You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:02 UTC

[rocketmq-streams] 03/16: merge 1.0

This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git

commit 9d3ae58bf1f301a7f31d04256ddbe04ec94899b5
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 11:58:39 2022 +0800

    merge 1.0
---
 pom.xml                                            |   1 +
 rocketmq-streams-connectors/pom.xml                |  47 ++++
 .../streams/connectors/IBoundedSource.java         |  32 +++
 .../streams/connectors/IBoundedSourceReader.java   |  26 ++
 .../streams/connectors/IScheduleCallback.java      |  24 ++
 .../connectors/balance/AbstractBalance.java        | 207 ++++++++++++++
 .../streams/connectors/balance/IBalanceTask.java   |  24 ++
 .../streams/connectors/balance/ISourceBalance.java |  60 ++++
 .../streams/connectors/balance/SplitChanged.java   |  55 ++++
 .../connectors/balance/impl/LeaseBalanceImpl.java  | 144 ++++++++++
 .../streams/connectors/model/PullMessage.java      |  50 ++++
 .../streams/connectors/model/ReaderStatus.java     | 120 ++++++++
 .../streams/connectors/reader/DBScanReader.java    | 269 ++++++++++++++++++
 .../streams/connectors/reader/ISplitReader.java    |  96 +++++++
 .../connectors/reader/SplitCloseFuture.java        |  83 ++++++
 .../connectors/source/AbstractPullSource.java      | 312 +++++++++++++++++++++
 .../source/CycleDynamicMultipleDBScanSource.java   | 213 ++++++++++++++
 .../source/DynamicMultipleDBScanSource.java        | 190 +++++++++++++
 .../streams/connectors/source/IPullSource.java     |  60 ++++
 .../connectors/source/MutilBatchTaskSource.java    | 158 +++++++++++
 .../streams/connectors/source/SourceInstance.java  |  37 +++
 .../source/filter/AbstractPatternFilter.java       |  38 +++
 .../source/filter/BoundedPatternFilter.java        |  53 ++++
 .../source/filter/CyclePatternFilter.java          | 173 ++++++++++++
 .../connectors/source/filter/CyclePeriod.java      | 222 +++++++++++++++
 .../connectors/source/filter/CycleSchedule.java    | 236 ++++++++++++++++
 .../source/filter/CycleScheduleFilter.java         |  37 +++
 .../source/filter/DataFormatPatternFilter.java     | 106 +++++++
 .../connectors/source/filter/PatternFilter.java    |  41 +++
 .../window/offset/WindowMaxValueProcessor.java     |   2 +-
 30 files changed, 3115 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 9da26dcd..7caa0cbc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
         <module>rocketmq-streams-channel-http</module>
         <module>rocketmq-streams-state</module>
         <module>rocketmq-streams-examples</module>
+        <module>rocketmq-streams-connectors</module>
         <module>rocketmq-streams-channel-syslog</module>
         <module>rocketmq-streams-channel-es</module>
         <module>rocketmq-streams-channel-kafka</module>
diff --git a/rocketmq-streams-connectors/pom.xml b/rocketmq-streams-connectors/pom.xml
new file mode 100644
index 00000000..d544e3d5
--- /dev/null
+++ b/rocketmq-streams-connectors/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rocketmq</groupId>
+        <artifactId>rocketmq-streams</artifactId>
+        <version>1.0.2-SNAPSHOT</version>
+    </parent>
+    <artifactId>rocketmq-streams-connectors</artifactId>
+    <packaging>jar</packaging>
+    <name>ROCKETMQ STREAMS :: connectors</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-lease</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-schedule</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-streams-commons</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java
new file mode 100644
index 00000000..1a383433
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+/**
+ * @description
+ */
+public interface IBoundedSource{
+
+    /**
+     * reader完成时调用
+     * @param iSplit
+     */
+    void boundedFinishedCallBack(ISplit iSplit);
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java
new file mode 100644
index 00000000..03995c31
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors;
+
+import org.apache.rocketmq.streams.common.interfaces.ILifeCycle;
+
+/**
+ * @description
+ */
+public interface IBoundedSourceReader extends ILifeCycle {
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java
new file mode 100644
index 00000000..88fb2cfd
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors;
+
+import java.util.Date;
+
+public interface IScheduleCallback {
+
+    boolean canFire(Date date);
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
new file mode 100644
index 00000000..5c2b266f
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.connectors.source.SourceInstance;
+
+public abstract class AbstractBalance implements ISourceBalance {
+
+    protected int balanceCount = 0;
+
+    @Override
+    public SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits) {
+        balanceCount++;
+        heartBeat();
+        List<SourceInstance> sourceInstances = fetchSourceInstances();
+        List<ISplit> workingSplits = fetchWorkingSplits(allSplits);
+        SplitChanged splitChanged = getAdditionSplits(allSplits, sourceInstances, workingSplits, ownerSplits);
+        if (splitChanged != null) {
+            return splitChanged;
+        }
+        splitChanged = getRemoveSplits(allSplits, sourceInstances, workingSplits, ownerSplits);
+        return splitChanged;
+    }
+
+    protected void heartBeat() {
+        holdLockSourceInstance();
+    }
+
+    /**
+     * get all dispatch splits
+     *
+     * @return
+     */
+    protected abstract List<ISplit> fetchWorkingSplits(List<ISplit> allSplitS);
+
+    /**
+     * get all instacne for the source
+     *
+     * @return
+     */
+    protected abstract List<SourceInstance> fetchSourceInstances();
+
+    /**
+     * lock the source ,the lock is globel,only one source instance can get it in same time
+     *
+     * @return
+     */
+    protected abstract boolean holdLockSourceInstance();
+
+    /**
+     * unlock
+     */
+    protected abstract void unlockSourceInstance();
+
+    /**
+     * juge need add split,根据调度策略选择
+     * 每次最大增加的分片数,根据调度次数决定
+     *
+     * @param allSplits
+     * @param sourceInstances
+     * @param workingSplits
+     * @return
+     */
+    protected SplitChanged getAdditionSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
+        List<ISplit> workingSplits, List<ISplit> ownerSplits) {
+        SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size());
+        if (splitChanged == null) {
+            return null;
+        }
+        if (splitChanged.isNewSplit == false) {
+            return null;
+        }
+        if (splitChanged.splitCount <= 0) {
+            return null;
+        }
+        List<ISplit> noWorkingSplits = getNoWorkingSplits(allSplits, workingSplits);
+        List<ISplit> newSplits = new ArrayList<>();
+        for (int i = 0; i < noWorkingSplits.size(); i++) {
+            boolean success = holdLockSplit(noWorkingSplits.get(i));
+            if (success) {
+                newSplits.add(noWorkingSplits.get(i));
+                if (newSplits.size() >= splitChanged.splitCount) {
+                    break;
+                }
+            }
+        }
+        splitChanged.setChangedSplits(newSplits);
+        return splitChanged;
+
+    }
+
+    protected List<ISplit> getNoWorkingSplits(List<ISplit> allSplits, List<ISplit> workingSplits) {
+        Set<String> workingSplitIds = new HashSet<>();
+        for (ISplit split : workingSplits) {
+            workingSplitIds.add(split.getQueueId());
+        }
+        List<ISplit> splits = new ArrayList<>();
+        for (ISplit split : allSplits) {
+            if (!workingSplitIds.contains(split.getQueueId())) {
+                splits.add(split);
+            }
+        }
+        return splits;
+    }
+
+    /**
+     * 获取需要删除的分片
+     *
+     * @param allSplits
+     * @param sourceInstances
+     * @param workingSplits
+     * @return
+     */
+    protected SplitChanged getRemoveSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
+        List<ISplit> workingSplits, List<ISplit> ownerSplits) {
+        SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size());
+        if (splitChanged == null) {
+            return null;
+        }
+        if (splitChanged.isNewSplit == true) {
+            return null;
+        }
+
+        if (splitChanged.splitCount <= 0) {
+            return null;
+        }
+        //List<ISplit> ownerSplits=source.ownerSplits();
+        List<ISplit> removeSplits = new ArrayList();
+        for (int i = 0; i < splitChanged.splitCount; i++) {
+            removeSplits.add(ownerSplits.get(i));
+        }
+        splitChanged.setChangedSplits(removeSplits);
+        return splitChanged;
+    }
+
+    /**
+     * 获取需要变动的分片个数,新增或删除
+     * 分配策略,只有有未分配的分片时才会分配新分片,为了减少分片切换,前面几次尽可能少分,后面越来越多
+     *
+     * @return 需要本实例有变化的分配,新增或删除
+     */
+    protected SplitChanged getChangedSplitCount(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
+        int splitCountInWorking, int ownerSplitCount) {
+        //int ownerSplitCount=source.ownerSplits().size();
+        int instanceCount = sourceInstances.size();
+        if (instanceCount == 0) {
+            instanceCount = 1;
+        }
+        int allSplitCount = allSplits.size();
+        int minSplitCount = allSplitCount / instanceCount;
+        int maxSplitCount = minSplitCount + (allSplitCount % instanceCount == 0 ? 0 : 1);
+        //已经是最大分片数了
+        if (ownerSplitCount == maxSplitCount) {
+            return null;
+        }
+        if (ownerSplitCount > maxSplitCount) {
+            int changeSplitCount = ownerSplitCount - maxSplitCount;
+            return new SplitChanged(changeSplitCount, false);
+        }
+        //分片已经全部在处理,当前分片也符合最小分片分配策略,不需要重新分配
+        if (splitCountInWorking == allSplitCount && ownerSplitCount >= minSplitCount) {
+            return null;
+        }
+        //如果还有未分配的分片,且当前实例还有分片的可行性,则分配分片
+        if (splitCountInWorking < allSplitCount && ownerSplitCount < maxSplitCount) {
+            int changeSplitCount = Math.min(maxSplitCount - ownerSplitCount, getMaxSplitCountInOneBalance());
+
+            return new SplitChanged(changeSplitCount, true);
+        }
+        return null;
+    }
+
+    @Override
+    public int getBalanceCount() {
+        return balanceCount;
+    }
+
+    /**
+     * 每次负载均衡最大的分片个数,目的是前几次,少分配分配,可能有实例在启动中,以免频繁切换分片,到后面实例都启动了,斤可能多分配分片
+     *
+     * @return
+     */
+    private int getMaxSplitCountInOneBalance() {
+        int balanceCount = getBalanceCount();
+        return (int) Math.pow(2, balanceCount - 1);
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java
new file mode 100644
index 00000000..5275fe48
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+/**
+ * @description
+ */
+public interface IBalanceTask extends Runnable {
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java
new file mode 100644
index 00000000..b012b323
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+import java.util.List;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+public interface ISourceBalance {
+
+    /**
+     * 做负载均衡
+
+     * @return
+     */
+    SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits);
+
+    /**
+     * 从启动开始,做了多少次均衡
+     * @return
+     */
+    int getBalanceCount();
+
+
+
+    boolean getRemoveSplitLock();
+
+    void unLockRemoveSplitLock();
+
+    /**
+     * lock the split and hold it util the instance is shutdown or remove split
+     * @param split
+     * @return
+     */
+    boolean holdLockSplit(ISplit split);
+
+    /**
+     * unlock split lock
+     * @param split
+     */
+    void unlockSplit(ISplit split);
+
+
+    void setSourceIdentification(String sourceIdentification);
+
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
new file mode 100644
index 00000000..c01c1519
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+import java.util.List;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+public class SplitChanged {
+
+    protected int splitCount;//变动多分片个数
+    protected boolean isNewSplit;//是否新增,false是删除
+    protected List<ISplit> changedSplits;
+    public SplitChanged(int splitCount,boolean isNewSplit){
+        this.splitCount=splitCount;
+        this.isNewSplit=isNewSplit;
+    }
+
+    public int getSplitCount() {
+        return splitCount;
+    }
+
+    public void setSplitCount(int splitCount) {
+        this.splitCount = splitCount;
+    }
+
+    public boolean isNewSplit() {
+        return isNewSplit;
+    }
+
+    public void setNewSplit(boolean newSplit) {
+        isNewSplit = newSplit;
+    }
+
+    public List<ISplit> getChangedSplits() {
+        return changedSplits;
+    }
+
+    public void setChangedSplits(List<ISplit> changedSplits) {
+        this.changedSplits = changedSplits;
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
new file mode 100644
index 00000000..dc504e5d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance.impl;
+
+import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
+import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
+import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
+import org.apache.rocketmq.streams.connectors.source.SourceInstance;
+import org.apache.rocketmq.streams.lease.LeaseComponent;
+import org.apache.rocketmq.streams.lease.model.LeaseInfo;
+import org.apache.rocketmq.streams.lease.service.ILeaseService;
+
+@AutoService(ISourceBalance.class)
+@ServiceName(LeaseBalanceImpl.DB_BALANCE_NAME)
+public class LeaseBalanceImpl extends AbstractBalance {
+
+    private static final Log logger = LogFactory.getLog(LeaseBalanceImpl.class);
+
+    public static final String DB_BALANCE_NAME = "db_balance";
+    private static final String REMOVE_SPLIT_LOCK_NAME = "lock_remove_split";
+    private static final String SOURCE_LOCK_PREFIX = "SOURCE_";
+    private static final String SPLIT_LOCK_PREFIX = "SPLIT_";
+    protected transient LeaseComponent leaseComponent = LeaseComponent.getInstance();
+    protected transient String sourceIdentification;
+
+    protected int lockTimeSecond = 5;
+
+    public LeaseBalanceImpl(String sourceIdentification) {
+
+        this.sourceIdentification = sourceIdentification;
+    }
+
+    public LeaseBalanceImpl() {
+
+    }
+
+    @Override
+    protected List<ISplit> fetchWorkingSplits(List<ISplit> allSplits) {
+        List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SPLIT_LOCK_PREFIX + this.sourceIdentification, null);
+        logger.info(String.format("lease SPLIT_LOCK_PREFIX is %s, sourceIdentification is %s. ", SPLIT_LOCK_PREFIX, sourceIdentification));
+        if (leaseInfos == null) {
+            return new ArrayList<>();
+        }
+
+        Map<String, ISplit> allSplitMap = new HashMap<>();
+        for (ISplit split : allSplits) {
+            allSplitMap.put(split.getQueueId(), split);
+        }
+        List<ISplit> splits = new ArrayList<>();
+        for (LeaseInfo leaseInfo : leaseInfos) {
+            String leaseName = leaseInfo.getLeaseName();
+            String splitId = MapKeyUtil.getLast(leaseName);
+            splits.add(allSplitMap.get(splitId));
+        }
+        logger.info(String.format("working split is %s", Arrays.toString(splits.toArray())));
+        return splits;
+    }
+
+    @Override
+    protected List<SourceInstance> fetchSourceInstances() {
+        List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SOURCE_LOCK_PREFIX + sourceIdentification, null);
+        if (leaseInfos == null) {
+            return new ArrayList<>();
+        }
+        List<SourceInstance> sourceInstances = new ArrayList<>();
+        for (LeaseInfo leaseInfo : leaseInfos) {
+            String leaseName = leaseInfo.getLeaseName();
+            sourceInstances.add(new SourceInstance(leaseName));
+        }
+        return sourceInstances;
+    }
+
+    @Override
+    protected boolean holdLockSourceInstance() {
+        return holdLock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId());
+    }
+
+    @Override
+    protected void unlockSourceInstance() {
+        leaseComponent.getService().unlock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId());
+    }
+
+    @Override
+    public boolean holdLockSplit(ISplit split) {
+        return holdLock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId());
+    }
+
+    @Override
+    public void unlockSplit(ISplit split) {
+        leaseComponent.getService().unlock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId());
+
+    }
+
+    @Override
+    public boolean getRemoveSplitLock() {
+        return holdLock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
+    }
+
+    @Override
+    public void unLockRemoveSplitLock() {
+        leaseComponent.getService().unlock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
+    }
+
+    public String getSourceIdentification() {
+        return sourceIdentification;
+    }
+
+    @Override
+    public void setSourceIdentification(String sourceIdentification) {
+        this.sourceIdentification = sourceIdentification;
+    }
+
+    protected boolean holdLock(String name, String lockName) {
+        ILeaseService leaseService = leaseComponent.getService();
+        boolean success = leaseService.holdLock(name, lockName, lockTimeSecond);
+        return success;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java
new file mode 100644
index 00000000..9bf34803
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.model;
+
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+
+public class PullMessage<T> {
+    protected T message;
+    protected MessageOffset messageOffset;
+
+    public T getMessage() {
+        return message;
+    }
+
+    public void setMessage(T message) {
+        this.message = message;
+    }
+
+    public MessageOffset getMessageOffset() {
+        return messageOffset;
+    }
+
+    public void setMessageOffset(MessageOffset messageOffset) {
+        this.messageOffset = messageOffset;
+    }
+    /**
+     * 获取offset字符串,通过.把主offset和子offset串接在一起
+     * @return
+     */
+    public String getOffsetStr(){
+       return this.messageOffset.getOffsetStr();
+    }
+    public String getMainOffset() {
+        return messageOffset.getMainOffset();
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
new file mode 100644
index 00000000..a4889b5f
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.model;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+/**
+ * @description
+ */
+public class ReaderStatus extends Entity {
+
+    /**
+     * 查询单个readerStatus
+     */
+    static final String queryReaderStatusByUK = "select * from reader_status where source_name = '%s' and reader_name = '%s' and is_finished = 1";
+
+    static final String queryReaderStatusList = "select * from reader_status where source_name = '%s' and is_finished = 1";
+
+    static final String clearReaderStatus = "update reader_status set gmt_modified = now(), is_finished = -1 where source_name = '%s' and reader_name = '%s'";
+
+    String sourceName;
+
+    String readerName;
+
+    int isFinished;
+
+    int totalReader;
+
+    public String getReaderName() {
+        return readerName;
+    }
+
+    public void setReaderName(String readerName) {
+        this.readerName = readerName;
+    }
+
+    public int getIsFinished() {
+        return isFinished;
+    }
+
+    public void setIsFinished(int isFinished) {
+        this.isFinished = isFinished;
+    }
+
+    public int getTotalReader() {
+        return totalReader;
+    }
+
+    public void setTotalReader(int totalReader) {
+        this.totalReader = totalReader;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    @Override
+    public String toString() {
+        return "ReaderStatus{" +
+            "id=" + id +
+            ", gmtCreate=" + gmtCreate +
+            ", gmtModified=" + gmtModified +
+            ", sourceName='" + sourceName + '\'' +
+            ", readerName='" + readerName + '\'' +
+            ", isFinished=" + isFinished +
+            ", totalReader=" + totalReader +
+            '}';
+    }
+
+    public static ReaderStatus queryReaderStatusByUK(String sourceName, String readerName) {
+        String sql = String.format(queryReaderStatusByUK, sourceName, readerName);
+        ReaderStatus readerStatus = ORMUtil.queryForObject(sql, null, ReaderStatus.class);
+        return readerStatus;
+    }
+
+    public static List<ReaderStatus> queryReaderStatusListBySourceName(String sourceName) {
+        String sql = String.format(queryReaderStatusList, sourceName);
+        List<ReaderStatus> readerStatusList = ORMUtil.queryForList(sql, null, ReaderStatus.class);
+        return readerStatusList;
+    }
+
+    public static void clearReaderStatus(String sourceName, String readerName) {
+        String sql = String.format(clearReaderStatus, sourceName, readerName);
+        ORMUtil.executeSQL(sql, null);
+    }
+
+    public static ReaderStatus create(String sourceName, String readerName, int isFinished, int totalReader) {
+
+        ReaderStatus readerStatus = new ReaderStatus();
+        readerStatus.setSourceName(sourceName);
+        readerStatus.setReaderName(readerName);
+        readerStatus.setIsFinished(isFinished);
+        readerStatus.setTotalReader(totalReader);
+        readerStatus.setGmtCreate(new Date());
+        readerStatus.setGmtModified(new Date());
+        return readerStatus;
+
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
new file mode 100644
index 00000000..268e891e
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.reader;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+import org.apache.rocketmq.streams.common.utils.ThreadUtil;
+import org.apache.rocketmq.streams.connectors.IBoundedSource;
+import org.apache.rocketmq.streams.connectors.IBoundedSourceReader;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
+import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+/**
+ * @description
+ */
+public class DBScanReader implements ISplitReader, IBoundedSourceReader, Serializable {
+
+    private static final long serialVersionUID = 8172403250050893288L;
+    private static final Log logger = LogFactory.getLog(DBScanReader.class);
+    static final String sqlTemplate = "select * from %s where id >= %d and id < %d";
+
+    //是否完成了source的call back调用
+    transient volatile boolean isFinishedCall = false;
+    ISource iSource;
+    String url;
+    String userName;
+    String password;
+    String tableName;
+    int batchSize;
+    long offset;
+    long offsetStart;
+    long offsetEnd;
+    long maxOffset;
+    long minOffset;
+    ISplit iSplit;
+    transient List<PullMessage> pullMessages;
+    volatile boolean interrupt = false;
+    volatile boolean isClosed = false;
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getTableName() {
+        return tableName;
+    }
+
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public ISplit getISplit() {
+        return iSplit;
+    }
+
+    public void setISplit(ISplit iSplit) {
+        this.iSplit = iSplit;
+    }
+
+    public DBScanReader() {
+
+    }
+
+    transient ThreadLocal<JDBCDriver> threadLocal = new ThreadLocal<JDBCDriver>() {
+
+        @Override
+        public JDBCDriver initialValue() {
+            logger.info(String.format("%s initial jdbcDriver. ", Thread.currentThread().getName()));
+            return DriverBuilder.createDriver(AbstractComponent.DEFAULT_JDBC_DRIVER, url, userName, password);
+        }
+
+    };
+
+    @Override
+    public void open(ISplit split) {
+        this.iSplit = split;
+        JDBCDriver jdbcDriver = threadLocal.get();
+        Map<String, Object> range = jdbcDriver.queryOneRow("select min(id) as min_id, max(id) as max_id from " + tableName);
+        minOffset = Long.parseLong(String.valueOf(range.get("min_id")));
+        maxOffset = Long.parseLong(String.valueOf(range.get("max_id")));
+        offsetStart = minOffset;
+        offset = minOffset;
+        logger.info(String.format("table %s min id [ %d ],  max id [ %d ]", tableName, minOffset, maxOffset));
+        pullMessages = new ArrayList<>();
+    }
+
+    @Override
+    public boolean next() {
+        if (interrupt) {
+            return false;
+        }
+        if (isFinished()) {
+            finish();
+            ThreadUtil.sleep(10 * 1000);
+            return false;
+        }
+        JDBCDriver jdbcDriver = threadLocal.get();
+        offsetEnd = offsetStart + batchSize;
+        String batchQuery = String.format(sqlTemplate, tableName, offsetStart, offsetEnd);
+        logger.debug(String.format("execute sql : %s", batchQuery));
+        List<Map<String, Object>> resultData = jdbcDriver.queryForList(batchQuery);
+        offsetStart = offsetEnd;
+        pullMessages.clear();
+        for (Map<String, Object> r : resultData) {
+            PullMessage msg = new PullMessage();
+            JSONObject data = JSONObject.parseObject(JSON.toJSONString(r));
+            msg.setMessage(data);
+            offset = offset > Long.parseLong(data.getString("id")) ? offset : Long.parseLong(data.getString("id"));
+            msg.setMessageOffset(new MessageOffset(String.valueOf(offset), true));
+            pullMessages.add(msg);
+        }
+        return offsetStart - batchSize <= maxOffset;
+    }
+
+    @Override
+    public List<PullMessage> getMessage() {
+//        logger.info(String.format("output messages %d", pullMessages.size()));
+        return pullMessages;
+    }
+
+    @Override
+    public SplitCloseFuture close() {
+//        interrupt = true;
+        isClosed = true;
+        threadLocal.remove();
+        pullMessages = null;
+        return new SplitCloseFuture(this, iSplit);
+    }
+
+    @Override
+    public void seek(String cursor) {
+        if (cursor == null || cursor.trim().equals("")) {
+            cursor = "0";
+        }
+        offset = Long.parseLong(cursor);
+        if (offset < minOffset) {
+            offset = minOffset;
+        }
+        offsetStart = offset;
+        logger.info(String.format("split %s seek %d.", iSplit.getQueueId(), offset));
+    }
+
+    @Override
+    public String getProgress() {
+        return String.valueOf(offset);
+    }
+
+    @Override
+    public long getDelay() {
+        return maxOffset - offset;
+    }
+
+    @Override
+    public long getFetchedDelay() {
+        return 0;
+    }
+
+    @Override
+    public boolean isClose() {
+        return isClosed;
+    }
+
+    @Override
+    public ISplit getSplit() {
+        return iSplit;
+    }
+
+    @Override
+    public boolean isInterrupt() {
+        return interrupt;
+    }
+
+    @Override
+    public boolean interrupt() {
+        interrupt = true;
+        return true;
+    }
+
+    @Override
+    public boolean isFinished() {
+        return offsetStart > maxOffset;
+    }
+
+    @Override
+    public void finish() {
+        if (isFinishedCall) {
+            return;
+        }
+        pullMessages = null;
+        updateReaderStatus();
+        IBoundedSource tmp = (IBoundedSource) iSource;
+        tmp.boundedFinishedCallBack(this.iSplit);
+        isFinishedCall = true;
+    }
+
+    public ISource getISource() {
+        return iSource;
+    }
+
+    public void setISource(ISource iSource) {
+        this.iSource = iSource;
+    }
+
+    private final void updateReaderStatus() {
+        String sourceName = CycleDynamicMultipleDBScanSource.createKey(this.getISource());
+        int finish = Integer.valueOf(1);
+        int total = ((CycleDynamicMultipleDBScanSource) iSource).getTotalReader();
+        ReaderStatus readerStatus = ReaderStatus.create(sourceName, iSplit.getQueueId(), finish, total);
+        logger.info(String.format("create reader status %s.", readerStatus));
+        ORMUtil.batchReplaceInto(readerStatus);
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
new file mode 100644
index 00000000..6b377cff
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+
+public interface ISplitReader extends Serializable {
+
+    /**
+     * Open.
+     *
+     * @param split the split
+     * @throws IOException the io exception
+     */
+    void open(ISplit split);
+
+    /**
+     * Next boolean.
+     *
+     * @return the boolean
+     * @throws IOException          the io exception
+     * @throws InterruptedException the interrupted exception
+     */
+    boolean next();
+
+    /**
+     * Gets message.
+     *
+     * @return the message
+     */
+    List<PullMessage> getMessage();
+
+    /**
+     * Close.
+     *
+     * @throws IOException the io exception
+     */
+    SplitCloseFuture close();
+
+    /**
+     * Seek.
+     *
+     * @param cursor the cursor
+     * @throws IOException the io exception
+     */
+    void seek(String cursor);
+
+    /**
+     * Gets progress.
+     *
+     * @return the progress
+     * @throws IOException the io exception
+     */
+    String getProgress();
+
+    /**
+     * Get message delay (millseconds)
+     *
+     * @return delay
+     */
+    long getDelay();
+
+    /**
+     * Get message delay (millseconds) from being fetched
+     *
+     * @return delay
+     */
+    long getFetchedDelay();
+
+    boolean isClose();
+
+    ISplit getSplit();
+
+    boolean isInterrupt();
+
+    boolean interrupt();
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
new file mode 100644
index 00000000..b28748b8
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.reader;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+public class SplitCloseFuture implements Future<Boolean> {
+
+    protected ISplitReader reader;
+    protected ISplit split;
+
+    public SplitCloseFuture(ISplitReader reader, ISplit split) {
+        this.reader = reader;
+        this.split = split;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean isDone() {
+        return reader.isClose();
+    }
+
+    @Override
+    public Boolean get() throws InterruptedException, ExecutionException {
+        synchronized (reader) {
+            reader.wait();
+        }
+        return reader.isClose();
+    }
+
+    @Override
+    public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        synchronized (reader) {
+            long time = timeout;
+            if (unit == TimeUnit.SECONDS) {
+                time = time * 1000;
+            } else if (unit == TimeUnit.MINUTES) {
+                time = time * 1000 * 60;
+            } else if (unit == TimeUnit.HOURS) {
+                time = time * 1000 * 60 * 60;
+            } else {
+                throw new RuntimeException("can not support this timeout, expect less hour " + timeout + " the unit is " + unit);
+            }
+            reader.wait(time);
+        }
+        return reader.isClose();
+    }
+
+    public ISplitReader getReader() {
+        return reader;
+    }
+
+    public ISplit getSplit() {
+        return split;
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
new file mode 100644
index 00000000..9aedc95a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
+import org.apache.rocketmq.streams.connectors.balance.SplitChanged;
+import org.apache.rocketmq.streams.connectors.balance.impl.LeaseBalanceImpl;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
+import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
+
+public abstract class AbstractPullSource extends AbstractSource implements IPullSource<AbstractSource> {
+
+    private static final Log logger = LogFactory.getLog(AbstractPullSource.class);
+
+    protected transient ISourceBalance balance;// balance interface
+    protected transient ScheduledExecutorService balanceExecutor;//schdeule balance
+    protected transient Map<String, ISplitReader> splitReaders = new HashMap<>();//owner split readers
+    protected transient Map<String, ISplit> ownerSplits = new HashMap<>();//working splits by the source instance
+
+    //可以有多种实现,通过名字选择不同的实现
+    protected String balanceName = LeaseBalanceImpl.DB_BALANCE_NAME;
+    //balance schedule time
+    protected int balanceTimeSecond = 10;
+    protected long pullIntervalMs;
+    protected transient CheckPointManager checkPointManager = new CheckPointManager();
+    protected transient boolean shutDown=false;
+    @Override
+    protected boolean startSource() {
+        ServiceLoaderComponent serviceLoaderComponent = ServiceLoaderComponent.getInstance(ISourceBalance.class);
+        balance = (ISourceBalance) serviceLoaderComponent.getService().loadService(balanceName);
+        balance.setSourceIdentification(MapKeyUtil.createKey(getNameSpace(), getConfigureName()));
+        balanceExecutor = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("balance-task-%d").daemon(true).build());
+        List<ISplit> allSplits = fetchAllSplits();
+        SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values()));
+        doSplitChanged(splitChanged);
+        balanceExecutor.scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                logger.info("balance running..... current splits is " + ownerSplits);
+                List<ISplit> allSplits = fetchAllSplits();
+                SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values()));
+                doSplitChanged(splitChanged);
+            }
+        }, balanceTimeSecond, balanceTimeSecond, TimeUnit.SECONDS);
+
+        startWorks();
+        return true;
+    }
+
+    private void startWorks() {
+        ExecutorService workThreads= ThreadPoolFactory.createThreadPool(maxThread);
+        long start=System.currentTimeMillis();
+        while (!shutDown) {
+            Iterator<Map.Entry<String, ISplitReader>> it = splitReaders.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<String, ISplitReader> entry=it.next();
+                String splitId=entry.getKey();
+                ISplit split=ownerSplits.get(splitId);
+                ISplitReader reader=entry.getValue();
+                ReaderRunner runner=new ReaderRunner(split,reader);
+                workThreads.execute(runner);
+            }
+            try {
+                long sleepTime=this.pullIntervalMs-(System.currentTimeMillis()-start);
+                if(sleepTime>0){
+                    Thread.sleep(sleepTime);
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public Map<String, ISplit> getAllSplitMap() {
+        List<ISplit> splits = fetchAllSplits();
+        if (splits == null) {
+            return new HashMap<>();
+        }
+        Map<String, ISplit> splitMap = new HashMap<>();
+        for (ISplit split : splits) {
+            splitMap.put(split.getQueueId(), split);
+        }
+        return splitMap;
+    }
+
+    protected void doSplitChanged(SplitChanged splitChanged) {
+        if (splitChanged == null) {
+            return;
+        }
+        if (splitChanged.getSplitCount() == 0) {
+            return;
+        }
+        if (splitChanged.isNewSplit()) {
+            doSplitAddition(splitChanged.getChangedSplits());
+        } else {
+            doSplitRelease(splitChanged.getChangedSplits());
+        }
+    }
+
+    protected void doSplitAddition(List<ISplit> changedSplits) {
+        if (changedSplits == null) {
+            return;
+        }
+        Set<String> splitIds = new HashSet<>();
+        for (ISplit split : changedSplits) {
+            splitIds.add(split.getQueueId());
+        }
+        addNewSplit(splitIds);
+        for (ISplit split : changedSplits) {
+            ISplitReader reader = createSplitReader(split);
+            reader.open(split);
+            reader.seek(loadSplitOffset(split));
+            splitReaders.put(split.getQueueId(), reader);
+            this.ownerSplits.put(split.getQueueId(), split);
+//            logger.info("start next");
+//            Thread thread = new Thread(new Runnable() {
+//
+//            thread.setName("reader-task-" + reader.getSplit().getQueueId());
+//            thread.start();
+        }
+
+    }
+
+    @Override
+    public String loadSplitOffset(ISplit split) {
+        String offset = null;
+        CheckPoint<String> checkPoint = checkPointManager.recover(this, split);
+        if (checkPoint != null) {
+            offset = JSON.parseObject(checkPoint.getData()).getString("offset");
+        }
+        return offset;
+    }
+
+    protected abstract ISplitReader createSplitReader(ISplit split);
+
+    protected void doSplitRelease(List<ISplit> changedSplits) {
+        boolean success = balance.getRemoveSplitLock();
+        if (!success) {
+            return;
+        }
+        try {
+            List<SplitCloseFuture> closeFutures = new ArrayList<>();
+            for (ISplit split : changedSplits) {
+                ISplitReader reader = this.splitReaders.get(split.getQueueId());
+                if (reader == null) {
+                    continue;
+                }
+                SplitCloseFuture future = reader.close();
+                closeFutures.add(future);
+            }
+            for (SplitCloseFuture future : closeFutures) {
+                try {
+                    if(!future.isDone()){
+                        future.get();
+                    }
+                    this.splitReaders.remove(future.getSplit().getQueueId());
+                    this.ownerSplits.remove(future.getSplit().getQueueId());
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (ExecutionException e) {
+                    e.printStackTrace();
+                }
+            }
+
+        } finally {
+            balance.unLockRemoveSplitLock();
+        }
+
+    }
+
+
+    protected class ReaderRunner implements Runnable{
+        long mLastCheckTime = System.currentTimeMillis();
+        protected ISplit split;
+        protected ISplitReader reader;
+
+        public ReaderRunner(ISplit split,ISplitReader reader){
+            this.split=split;
+            this.reader=reader;
+        }
+
+        @Override
+        public void run() {
+            logger.info("start running");
+            if (reader.isInterrupt() == false) {
+                if (reader.next()) {
+                    List<PullMessage> messages = reader.getMessage();
+                    if (messages != null) {
+                        for (PullMessage pullMessage : messages) {
+                            String queueId = split.getQueueId();
+                            String offset = pullMessage.getOffsetStr();
+                            JSONObject msg = createJson(pullMessage.getMessage());
+                            Message message = createMessage(msg, queueId, offset, false);
+                            message.getHeader().setOffsetIsLong(pullMessage.getMessageOffset().isLongOfMainOffset());
+                            executeMessage(message);
+                        }
+                    }
+                    reader.notifyAll();
+                }
+                long curTime = System.currentTimeMillis();
+                if (curTime - mLastCheckTime > getCheckpointTime()) {
+                    sendCheckpoint(reader.getSplit().getQueueId());
+                    mLastCheckTime = curTime;
+                }
+
+
+            }else {
+                Set<String> removeSplits = new HashSet<>();
+                removeSplits.add(reader.getSplit().getQueueId());
+                removeSplit(removeSplits);
+                balance.unlockSplit(split);
+                reader.close();
+                synchronized (reader) {
+                    reader.notifyAll();
+                }
+            }
+
+        }
+
+    }
+
+    @Override
+    public boolean supportNewSplitFind() {
+        return true;
+    }
+
+    @Override
+    public boolean supportRemoveSplitFind() {
+        return true;
+    }
+
+    @Override
+    public boolean supportOffsetRest() {
+        return true;
+    }
+
+    @Override
+    public Long getPullIntervalMs() {
+        return pullIntervalMs;
+    }
+
+    public String getBalanceName() {
+        return balanceName;
+    }
+
+    public void setBalanceName(String balanceName) {
+        this.balanceName = balanceName;
+    }
+
+    public int getBalanceTimeSecond() {
+        return balanceTimeSecond;
+    }
+
+    public void setBalanceTimeSecond(int balanceTimeSecond) {
+        this.balanceTimeSecond = balanceTimeSecond;
+    }
+
+    public void setPullIntervalMs(long pullIntervalMs) {
+        this.pullIntervalMs = pullIntervalMs;
+    }
+
+    @Override
+    public List<ISplit> ownerSplits() {
+        return new ArrayList(ownerSplits.values());
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
new file mode 100644
index 00000000..561b48f2
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.channel.source.systemmsg.ChangeTableNameMessage;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.ThreadUtil;
+import org.apache.rocketmq.streams.connectors.IBoundedSource;
+import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
+import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter;
+import org.apache.rocketmq.streams.db.CycleSplit;
+
+/**
+ * @description
+ */
+public class CycleDynamicMultipleDBScanSource extends DynamicMultipleDBScanSource implements IBoundedSource, Serializable {
+
+    private static final long serialVersionUID = 6840988298037061128L;
+    private static final Log logger = LogFactory.getLog(CycleDynamicMultipleDBScanSource.class);
+
+    Map<String, Boolean> initReaderMap = new ConcurrentHashMap<>();
+    CycleSchedule.Cycle cycle;
+    transient AtomicInteger size = new AtomicInteger(0);
+
+    public CycleDynamicMultipleDBScanSource() {
+        super();
+    }
+
+    public CycleDynamicMultipleDBScanSource(CycleSchedule.Cycle cycle) {
+        super();
+        this.cycle = cycle;
+    }
+
+    public AtomicInteger getSize() {
+        return size;
+    }
+
+    public void setSize(AtomicInteger size) {
+        this.size = size;
+    }
+
+    /**
+     * @return
+     */
+    //todo
+    @Override
+    public synchronized List<ISplit> fetchAllSplits() {
+
+        if (this.filter == null) {
+            filter = new CycleScheduleFilter(cycle.getAllPattern());
+        }
+
+        //如果还是当前周期, 已经完成全部分区的加载, 则不在加载
+        if (size.get() == cycle.getCycleCount()) {
+            return splits;
+        }
+        String sourceName = createKey(this);
+        List<String> tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
+
+        logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
+        Iterator<String> it = tableNames.iterator();
+        while (it.hasNext()) {
+            String s = it.next();
+            String suffix = s.replace(logicTableName + "_", "");
+            if (filter.filter(sourceName, logicTableName, suffix)) {
+                logger.info(String.format("filter add %s", s));
+                CycleSplit split = new CycleSplit();
+                split.setLogicTableName(logicTableName);
+                split.setSuffix(suffix);
+                split.setCyclePeriod(cycle.getCycleDateStr());
+                String splitId = split.getQueueId();
+                if (initReaderMap.get(splitId) == null) {
+                    initReaderMap.put(splitId, false);
+                    splits.add(split);
+                    size.incrementAndGet();
+                }
+            } else {
+                logger.info(String.format("filter remove %s", s));
+                it.remove();
+            }
+        }
+
+        this.tableNames = tableNames;
+        return splits;
+    }
+
+    public Map<String, Boolean> getInitReaderMap() {
+        return initReaderMap;
+    }
+
+    public void setInitReaderMap(Map<String, Boolean> initReaderMap) {
+        this.initReaderMap = initReaderMap;
+    }
+
+    @Override
+    public void finish() {
+        super.finish();
+        for (Map.Entry<String, Boolean> entry : initReaderMap.entrySet()) {
+            String key = entry.getKey();
+            Boolean value = entry.getValue();
+            if (value == false) {
+                logger.error(String.format("split[%s] reader is not finish, exit with error. ", key));
+            }
+        }
+        this.initReaderMap.clear();
+        this.initReaderMap = null;
+        splits.clear();
+        splits = null;
+    }
+
+    @Override
+    public boolean isFinished() {
+        List<ReaderStatus> readerStatuses = ReaderStatus.queryReaderStatusListBySourceName(createKey(this));
+        if (readerStatuses == null) {
+            return false;
+        }
+        return readerStatuses.size() == size.get();
+    }
+
+    @Override
+    protected ISplitReader createSplitReader(ISplit iSplit) {
+        return super.createSplitReader(iSplit);
+    }
+
+    private void sendChangeTableNameMessage() {
+        logger.info(String.format("start send change table name message."));
+        ChangeTableNameMessage changeTableNameMessage = new ChangeTableNameMessage();
+        changeTableNameMessage.setScheduleCycle(cycle.getCycleDateStr());
+        Message message = createMessage(new JSONObject(), null, null, false);
+        message.setSystemMessage(changeTableNameMessage);
+        message.getHeader().setSystemMessage(true);
+        executeMessage(message);
+        logger.info(String.format("finish send change table name message."));
+    }
+
+    @Override
+    public synchronized void boundedFinishedCallBack(ISplit iSplit) {
+        this.initReaderMap.put(iSplit.getQueueId(), true);
+        logger.info(String.format("current map is %s, key is %s. ", initReaderMap, iSplit.getQueueId()));
+        if (statusCheckerStart.compareAndSet(false, true)) {
+            Thread thread = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (!isFinished()) {
+                        ThreadUtil.sleep(3 * 1000);
+                    }
+                    logger.info(String.format("source will be closed."));
+                    sendChangeTableNameMessage(); //下发修改name的消息
+                    ThreadUtil.sleep(1 * 1000);
+                    finish();
+                }
+
+            });
+            thread.setName(createKey(this) + "_callback");
+            thread.start();
+        }
+    }
+
+    public CycleSchedule.Cycle getCycle() {
+        return cycle;
+    }
+
+    public void setCycle(CycleSchedule.Cycle cycle) {
+        this.cycle = cycle;
+    }
+
+    @Override
+    public String createCheckPointName() {
+        return super.createCheckPointName();
+    }
+
+    public synchronized int getTotalReader() {
+        return size.get();
+    }
+
+    public static String createKey(ISource iSource) {
+        AbstractSource source = (AbstractSource) iSource;
+        CycleSchedule.Cycle cycle = ((CycleDynamicMultipleDBScanSource) iSource).getCycle();
+        return MapKeyUtil.createKey(source.getNameSpace(), source.getGroupName(), source.getConfigureName(), source.getTopic(), cycle.getCycleDateStr());
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
new file mode 100644
index 00000000..ea2a118b
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
+import org.apache.rocketmq.streams.connectors.reader.DBScanReader;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.source.filter.DataFormatPatternFilter;
+import org.apache.rocketmq.streams.connectors.source.filter.PatternFilter;
+import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit;
+
+/**
+ * @description DynamicMultipleDBScanSource
+ */
+public class DynamicMultipleDBScanSource extends AbstractPullSource implements Serializable {
+
+    private static final long serialVersionUID = 3987103552547019739L;
+    private static final Log logger = LogFactory.getLog(DynamicMultipleDBScanSource.class);
+    public static final int DEFAULT_BATCH_SIZE = 50;
+    public static final int MAX_BATCH_SIZE = 100;
+
+    String url;
+    String userName;
+    String password;
+    String logicTableName;
+    String suffix;
+    int batchSize;
+    List<String> tableNames;
+    List<ISplit> splits;
+    transient volatile AtomicBoolean statusCheckerStart = new AtomicBoolean(false);
+
+    //todo
+    transient PatternFilter filter;
+
+    public DynamicMultipleDBScanSource() {
+        splits = new ArrayList<>();
+    }
+
+    @Override
+    protected boolean initConfigurable() {
+        setTopic(logicTableName);
+        return super.initConfigurable();
+    }
+
+    @Override
+    protected boolean isNotDataSplit(String queueId) {
+        return tableNames.contains(queueId);
+    }
+
+    @Override
+    protected ISplitReader createSplitReader(ISplit split) {
+
+        DBScanReader reader = new DBScanReader();
+        reader.setISplit(split);
+        reader.setUrl(url);
+        reader.setUserName(userName);
+        reader.setPassword(password);
+        reader.setTableName(String.valueOf(split.getQueue()));
+        int local = batchSize <= 0 ? DEFAULT_BATCH_SIZE : batchSize;
+        local = local > MAX_BATCH_SIZE ? MAX_BATCH_SIZE : local;
+        reader.setBatchSize(local);
+        reader.setISource(this);
+        logger.info(String.format("create reader for split %s", split.getQueueId()));
+        return reader;
+    }
+
+    @Override
+    public List<ISplit> fetchAllSplits() {
+
+        if (filter == null) {
+            filter = new DataFormatPatternFilter();
+        }
+
+//        String sourceName = createKey(this);
+
+        tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
+
+        logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
+
+        for (String s : tableNames) {
+            String suffix = s.replace(logicTableName + "_", "");
+            if (filter.filter(null, logicTableName, suffix)) {
+                logger.info(String.format("filter add %s", s));
+                DynamicMultipleDBSplit split = new DynamicMultipleDBSplit();
+                split.setLogicTableName(logicTableName);
+                split.setSuffix(suffix);
+                splits.add(split);
+            } else {
+                logger.info(String.format("filter remove %s", s));
+            }
+
+        }
+        return splits;
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+    }
+
+    public String getUserName() {
+        return userName;
+    }
+
+    public void setUserName(String userName) {
+        this.userName = userName;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getLogicTableName() {
+        return logicTableName;
+    }
+
+    public void setLogicTableName(String logicTableName) {
+        this.logicTableName = logicTableName;
+    }
+
+    public String getSuffix() {
+        return suffix;
+    }
+
+    public void setSuffix(String suffix) {
+        this.suffix = suffix;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public List<String> getTableNames() {
+        return tableNames;
+    }
+
+    public void setTableNames(List<String> tableNames) {
+        this.tableNames = tableNames;
+    }
+
+    public List<ISplit> getSplits() {
+        return splits;
+    }
+
+    public void setSplits(List<ISplit> splits) {
+        this.splits = splits;
+    }
+
+    public PatternFilter getFilter() {
+        return filter;
+    }
+
+    public void setFilter(PatternFilter filter) {
+        this.filter = filter;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
new file mode 100644
index 00000000..6733911d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+/**
+ * poll message,need balance
+ */
+public interface IPullSource<T extends ISource> extends ISource<T> {
+
+    /**
+     * 拥有的分片格式
+     *
+     * @return
+     */
+    Collection<ISplit> ownerSplits();
+
+    /**
+     * get all split for the source
+     *
+     * @return
+     */
+    List<ISplit> fetchAllSplits();
+
+    /**
+     * get all split for the source
+     *
+     * @return
+     */
+    Map<String, ISplit> getAllSplitMap();
+
+    Long getPullIntervalMs();
+
+    /**
+     * get cusor from store
+     *
+     * @return
+     */
+    String loadSplitOffset(ISplit split);
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java
new file mode 100644
index 00000000..d82ba32a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import com.alibaba.fastjson.JSON;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
+import org.apache.rocketmq.streams.common.topology.ChainPipeline;
+import org.apache.rocketmq.streams.common.topology.model.Pipeline;
+import org.apache.rocketmq.streams.common.topology.task.TaskAssigner;
+import org.apache.rocketmq.streams.common.utils.DipperThreadLocalUtil;
+import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
+
+public class MutilBatchTaskSource extends AbstractPullSource {
+
+    @Override protected ISplitReader createSplitReader(ISplit split) {
+        return new ISplitReader() {
+            protected transient ISplit split;
+            protected boolean isInterrupt;
+            protected boolean isClose;
+            protected transient AtomicLong offsetGenerator=new AtomicLong(1000000000);
+            @Override public void open(ISplit split) {
+                this.split=split;
+            }
+
+            @Override public boolean next() {
+                return true;
+            }
+
+            @Override public List<PullMessage> getMessage() {
+                PipelineSplit pipelineSplit=(PipelineSplit)split;
+                pipelineSplit.getQueue().startChannel();
+                return null;
+            }
+
+            @Override public SplitCloseFuture close() {
+                isClose=true;
+                return new SplitCloseFuture(this,split);
+            }
+
+            @Override public void seek(String cursor) {
+
+            }
+
+            @Override public String getProgress() {
+                return RuntimeUtil.getDipperInstanceId()+"_"+offsetGenerator.incrementAndGet();
+            }
+
+            @Override public long getDelay() {
+                return 0;
+            }
+
+            @Override public long getFetchedDelay() {
+                return getPullIntervalMs();
+            }
+
+            @Override public boolean isClose() {
+                return isClose;
+            }
+
+            @Override public ISplit getSplit() {
+                return split;
+            }
+
+            @Override public boolean isInterrupt() {
+                return isInterrupt;
+            }
+
+            @Override public boolean interrupt() {
+                isInterrupt=true;
+                return isInterrupt;
+            }
+        };
+    }
+
+    @Override protected boolean isNotDataSplit(String queueId) {
+        return false;
+    }
+
+    @Override public List<ISplit> fetchAllSplits() {
+
+        List<TaskAssigner> taskAssigners = configurableService.queryConfigurableByType(TaskAssigner.TYPE);
+        if (taskAssigners == null) {
+            return null;
+        }
+        String taskName = getConfigureName();
+        List<ISplit> splits=new ArrayList<>();
+        for (TaskAssigner taskAssigner : taskAssigners) {
+            if (!taskName.equals(taskAssigner.getTaskName())) {
+                continue;
+            }
+            String pipelineName = taskAssigner.getPipelineName();
+            if(pipelineName!=null){
+                ChainPipeline<?> pipeline = configurableService.queryConfigurable(Pipeline.TYPE, pipelineName);
+                if (pipeline != null) {
+                    splits.add(new PipelineSplit(pipeline));
+                }
+            }
+        }
+        return splits;
+    }
+
+
+    protected class PipelineSplit implements ISplit<PipelineSplit, ChainPipeline>{
+        protected ChainPipeline chainPipeline;
+        public PipelineSplit(ChainPipeline chainPipeline){
+            this.chainPipeline=chainPipeline;
+        }
+
+        @Override public String getQueueId() {
+            return chainPipeline.getConfigureName();
+        }
+
+        @Override public ChainPipeline getQueue() {
+            return chainPipeline;
+        }
+
+        @Override public int compareTo(PipelineSplit o) {
+            return chainPipeline.getConfigureName().compareTo(o.getQueueId());
+        }
+
+        @Override public String toJson() {
+            return chainPipeline.toJson();
+        }
+
+        @Override public void toObject(String jsonString) {
+            ChainPipeline pipeline=new ChainPipeline();
+            pipeline.toObject(jsonString);
+            this.chainPipeline=pipeline;
+        }
+    }
+
+    @Override
+    public String loadSplitOffset(ISplit split) {
+        return null;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java
new file mode 100644
index 00000000..c0da5b6e
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+/**
+ * i个消息队列的实例,一个实例i个
+ */
+public class SourceInstance {
+    protected String sourceInstanceId;
+
+
+    public SourceInstance(String sourceInstanceId){
+        this.sourceInstanceId=sourceInstanceId;
+    }
+
+    public String getSourceInstanceId() {
+        return sourceInstanceId;
+    }
+
+    public void setSourceInstanceId(String sourceInstanceId) {
+        this.sourceInstanceId = sourceInstanceId;
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java
new file mode 100644
index 00000000..0d5368a7
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+
+/**
+ * @description
+ */
+public abstract class AbstractPatternFilter implements PatternFilter, Serializable {
+
+    private static final long serialVersionUID = 6500945777421871431L;
+
+    PatternFilter next;
+
+    public abstract boolean filter(String sourceName, String logicTableName, String tableName);
+
+
+    @Override
+    public PatternFilter setNext(PatternFilter filter) {
+        this.next = filter;
+        return this;
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
new file mode 100644
index 00000000..c06de98d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
+
+/**
+ * @description 过滤掉已经完成的reader
+ */
+@Deprecated
+public class BoundedPatternFilter extends AbstractPatternFilter implements Serializable {
+
+    static final Log logger = LogFactory.getLog(BoundedPatternFilter.class);
+
+    @Override
+    public boolean filter(String sourceName, String logicTableName, String tableName) {
+
+        ReaderStatus readerStatus = ReaderStatus.queryReaderStatusByUK(sourceName, logicTableName + "_" + tableName);
+        if (readerStatus != null) {
+            logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s. ", sourceName, logicTableName, tableName));
+            logger.info(String.format("query result %s", readerStatus.toString()));
+            return true;
+        }
+        if (next == null) {
+            return false;
+        }
+        return next.filter(sourceName, logicTableName, tableName);
+    }
+
+    @Override
+    public PatternFilter setNext(PatternFilter filter) {
+        super.setNext(filter);
+        return this;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
new file mode 100644
index 00000000..3a0193f3
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @description 用来做分区选取
+ */
+public class CyclePatternFilter extends AbstractPatternFilter implements Serializable {
+
+    private static final long serialVersionUID = -5151597286296228754L;
+
+    public static final int INIT_CYCLE_VERSION = 0;
+
+    CyclePeriod cyclePeriod;
+
+    Date curCycleDateTime; //当前调度周期时间
+
+    long cycleId;
+
+    String firstStartTime; //当前最小时间
+
+    List<String> allPatterns;
+
+    String expression;
+
+    boolean isInit;
+
+    //历史数据读取时使用,表示比起当前相差多少个调度周期
+    final long cycleDiff;
+
+    //todo expr解析
+    public CyclePatternFilter(String expr, Date date) throws ParseException {
+        expression = expr;
+        cycleId = INIT_CYCLE_VERSION;
+        cyclePeriod = CyclePeriod.getInstance(expression);
+        curCycleDateTime = calCycleDateTime(date);
+        allPatterns = new ArrayList<>();
+        isInit = true;
+        if(cyclePeriod.isHistory){
+            Date tmp = cyclePeriod.getHisDate();
+            cycleDiff = curCycleDateTime.getTime()/1000 * 1000 - tmp.getTime()/1000*1000;
+        }else{
+            cycleDiff = 0;
+        }
+    }
+
+
+    /**
+     *
+     * @return 返回date格式的调度周期时间
+     */
+    private Date calCycleDateTime(Date date){
+        return cyclePeriod.format(date);
+    }
+
+    private long calCycle(Date date){
+        Date tmp = calCycleDateTime(date);
+        if(tmp.getTime()/1000 == curCycleDateTime.getTime()/1000){
+            return cycleId;
+        }
+        return nextCycle(tmp);
+    }
+
+    private long nextCycle(Date date){
+        curCycleDateTime = date;
+        cycleId++;
+        calAllPattern();
+        return cycleId;
+    }
+
+    private void calAllPattern(){
+        allPatterns.clear();
+        for(int i = 1; i <= cyclePeriod.getCycle(); i++){
+            long d = (curCycleDateTime.getTime()/1000)*1000 - i * cyclePeriod.getInterval() - cycleDiff;
+            String s = cyclePeriod.getDateFormat().format(new Date(d));
+            allPatterns.add(s);
+        }
+        firstStartTime = allPatterns.get(allPatterns.size() - 1);
+    }
+
+    public boolean isNextCycle(Date date){
+        if(isInit){
+            isInit = false;
+            calAllPattern();
+            return true;
+        }
+        long tmp = cycleId;
+        return calCycle(date) > tmp;
+    }
+
+    public List<String> getAllPatterns() {
+        return allPatterns;
+    }
+
+    public long getCycleId() {
+        return cycleId;
+    }
+
+    public Date getCurCycleDateTime(){
+        return curCycleDateTime;
+    }
+
+    public String getCurCycleDateTimeStr(){
+        return cyclePeriod.getDateFormat().format(curCycleDateTime);
+    }
+
+    public long getCycleDiff() {
+        return cycleDiff;
+    }
+
+    public long getCyclePeriodDiff(){
+        return cycleDiff/cyclePeriod.getInterval();
+    }
+
+    public int getCycle(){
+        return cyclePeriod.getCycle();
+    }
+
+    public String getFirstStartTime() {
+        return firstStartTime;
+    }
+
+    @Override
+    public boolean filter(String sourceName, String logicTableName, String tableName) {
+        return allPatterns.contains(tableName);
+    }
+
+
+
+    public static void main(String[] args) throws ParseException {
+
+        CyclePatternFilter cycle = new CyclePatternFilter("yyyyMMddHHmm - 15m", new Date());
+        System.out.println(cycle);
+
+        System.out.println(cycle.filter(null, null, "202109131650"));
+        System.out.println(cycle.filter(null, null, "20210902000000"));
+        System.out.println(cycle.filter(null, null, "20210908000000"));
+        System.out.println(cycle.filter(null, null, "20210910000000"));
+        System.out.println(cycle.filter(null, null, "20210909230000"));
+
+        System.out.println(new SimpleDateFormat("yyyyMMddHH").parse("2021090923"));
+        System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909230000"));
+        System.out.println(new SimpleDateFormat("yyyyMMddHHmmss").parse("20210909100000"));
+        System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909100000"));
+
+
+
+
+    }
+
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
new file mode 100644
index 00000000..4e6cdd6a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @Description
+ */
+public enum CyclePeriod {
+
+    CYCLE_PERIOD_DATE() {
+        @Override
+        void argsParser(String expr) throws ParseException {
+            super.argsParser(expr);
+            interval = 24 * 3600 * 1000;
+            int length = expr.length();
+            if (length == 8 && checkFormat(expr, PatternFilter.yyyyMMdd)) {
+                format = PatternFilter.yyyyMMdd;
+            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+                format = PatternFilter.yyyyMMddHHmmss;
+            } else {
+                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
+            }
+        }
+
+        @Override
+        Date format(Date strDate) {
+            Date date = new Date(strDate.getTime());
+            date.setHours(0);
+            date.setMinutes(0);
+            date.setSeconds(0);
+            return date;
+        }
+
+    },
+    CYCLE_PERIOD_HOUR() {
+        @Override
+        void argsParser(String expr) throws ParseException {
+            super.argsParser(expr);
+            interval = 3600 * 1000;
+
+            int length = expr.length();
+            if (length == 10 && checkFormat(expr, PatternFilter.yyyyMMddHH)) {
+                format = PatternFilter.yyyyMMddHH;
+            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+                format = PatternFilter.yyyyMMddHHmmss;
+            } else {
+                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
+            }
+        }
+
+        @Override
+        Date format(Date strDate) {
+            Date date = new Date(strDate.getTime());
+            date.setMinutes(0);
+            date.setSeconds(0);
+            return date;
+        }
+
+    },
+    CYCLE_PERIOD_MINUTE() {
+        @Override
+        void argsParser(String expr) throws ParseException {
+            super.argsParser(expr);
+            interval = 60 * 1000;
+            int length = expr.length();
+            if (length == 12 && checkFormat(expr, PatternFilter.yyyyMMddHHmm)) {
+                format = PatternFilter.yyyyMMddHHmm;
+            } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+                format = PatternFilter.yyyyMMddHHmmss;
+            } else {
+                throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
+            }
+        }
+
+        @Override
+        Date format(Date strDate) {
+            Date date = new Date(strDate.getTime());
+            date.setSeconds(0);
+            return date;
+        }
+
+    };
+
+    boolean isHistory = false;
+
+    long interval;
+
+    int cycle;
+
+    String format;
+
+    String hisDateString;
+
+    static final Log logger = LogFactory.getLog(CyclePeriod.class);
+
+    void argsParser(String expr) throws ParseException {
+        if (expr.matches("^\\d+$")) {
+            isHistory = true;
+            hisDateString = expr;
+        }
+    }
+
+    Date format(Date strDate) {
+        throw new RuntimeException(String.format("unsupported type.", strDate));
+    }
+
+    /**
+     * expr可能是yyyymmdd 或者 20210917
+     *
+     * @param expr
+     * @param format
+     * @return
+     */
+    final boolean checkFormat(String expr, String format) {
+
+        if (!isHistory) {
+            return expr.equalsIgnoreCase(format);
+        }
+
+        try {
+            new SimpleDateFormat(format).parse(expr);
+            return true;
+        } catch (ParseException e) {
+            logger.error(String.format("error format, expr is %s, format is %s.", expr, format));
+            e.printStackTrace();
+            return false;
+        }
+    }
+
+    public Date getHisDate() throws ParseException {
+        return getDateFormat().parse(hisDateString);
+    }
+
+    public SimpleDateFormat getDateFormat() {
+        return new SimpleDateFormat(format);
+    }
+
+    public long getInterval() {
+        return interval;
+    }
+
+    public boolean isHistory() {
+        return isHistory;
+    }
+
+    public void setHistory(boolean history) {
+        isHistory = history;
+    }
+
+    public void setInterval(long interval) {
+        this.interval = interval;
+    }
+
+    public int getCycle() {
+        return cycle;
+    }
+
+    public void setCycle(int cycle) {
+        this.cycle = cycle;
+    }
+
+    public void setFormat(String format) {
+        this.format = format;
+    }
+
+    public String getFormat() {
+        return format;
+    }
+
+    public String getHisDateString() {
+        return hisDateString;
+    }
+
+    public void setHisDateString(String hisDateString) {
+        this.hisDateString = hisDateString;
+    }
+
+    public static CyclePeriod getInstance(String expression) throws ParseException {
+
+        String[] str = expression.split("\\-");
+        assert str.length == 2 : String.format("expression error : %s. ", expression);
+        String expr = str[0].trim();
+        String tmp = str[1].trim().toLowerCase();
+        String cycleStr = tmp.substring(0, tmp.length() - 1);
+        int cycle = Integer.parseInt(cycleStr);
+        CyclePeriod cyclePeriod = null;
+        if (tmp.endsWith("d")) {
+            cyclePeriod = CYCLE_PERIOD_DATE;
+        } else if (tmp.endsWith("h")) {
+            cyclePeriod = CYCLE_PERIOD_HOUR;
+        } else if (tmp.endsWith("m")) {
+            cyclePeriod = CYCLE_PERIOD_MINUTE;
+        } else {
+            new RuntimeException(String.format("unsupported format : %s", expression));
+        }
+        cyclePeriod.argsParser(expr);
+        cyclePeriod.cycle = cycle;
+
+        return cyclePeriod;
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
new file mode 100644
index 00000000..ba9a2797
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+
+/**
+ * @description 用来做分区选取
+ */
+public class CycleSchedule implements Serializable {
+
+    private static final long serialVersionUID = -5151597286296228754L;
+    public static final int INIT_CYCLE_VERSION = 0;
+    private static CycleSchedule INSTANCE;
+    CyclePeriod cyclePeriod;
+    AtomicLong cycleId = new AtomicLong(0);
+    String expression;
+    boolean isInit;
+    //历史数据读取时使用,表示比起当前相差多少个调度周期
+    final long cycleDiff;
+
+    public CycleSchedule(String expr, Date date) throws ParseException {
+        Date local = subMs(date);
+        expression = expr;
+        cycleId.set(INIT_CYCLE_VERSION);
+        cyclePeriod = CyclePeriod.getInstance(expression);
+        isInit = true;
+        if (cyclePeriod.isHistory) {
+            Date curCycleDateTime = calCycleDateTime(local);
+            Date tmp = subMs(cyclePeriod.getHisDate());
+            cycleDiff = curCycleDateTime.getTime() - tmp.getTime();
+        } else {
+            cycleDiff = 0;
+        }
+    }
+
+    /**
+     * 去掉毫秒时间戳
+     *
+     * @param date
+     * @return
+     */
+    private Date subMs(Date date) {
+        long time = date.getTime() / 1000 * 1000;
+        return new Date(time);
+    }
+
+    /**
+     * @return 返回date格式的调度周期时间
+     */
+    private Date calCycleDateTime(Date date) {
+        return cyclePeriod.format(date);
+    }
+
+    public Cycle nextCycle(Date date) {
+        Date local = subMs(date);
+        local = cyclePeriod.format(local);
+        if (isInit) {
+            isInit = false;
+        } else {
+            cycleId.incrementAndGet();
+        }
+        List<String> ret = calAllPattern(local);
+        Cycle cycle = new Cycle();
+        cycle.setCycleId(cycleId.get());
+        cycle.setAllPattern(ret);
+        cycle.setCycleDateStr(calCycleDateStr(local));
+        cycle.setCycleCount(cyclePeriod.getCycle());
+        cycle.setCurDateStr(cyclePeriod.getDateFormat().format(local));
+        cycle.setCycleDiff(cycleDiff);
+        return cycle;
+    }
+
+    private String calCycleDateStr(Date date) {
+        long d = date.getTime() - cycleDiff;
+        Date d1 = new Date(d);
+        return cyclePeriod.getDateFormat().format(d1);
+    }
+
+    private List<String> calAllPattern(Date date) {
+        List<String> allPatterns = new ArrayList<>();
+        for (int i = 1; i <= cyclePeriod.getCycle(); i++) {
+            long d = date.getTime() - i * cyclePeriod.getInterval() - cycleDiff;
+            String s = cyclePeriod.getDateFormat().format(new Date(d));
+            allPatterns.add(s);
+        }
+        return allPatterns;
+    }
+
+    public CyclePeriod getCyclePeriod() {
+        return cyclePeriod;
+    }
+
+    public void setCyclePeriod(CyclePeriod cyclePeriod) {
+        this.cyclePeriod = cyclePeriod;
+    }
+
+    public AtomicLong getCycleId() {
+        return cycleId;
+    }
+
+    public void setCycleId(AtomicLong cycleId) {
+        this.cycleId = cycleId;
+    }
+
+    public String getExpression() {
+        return expression;
+    }
+
+    public void setExpression(String expression) {
+        this.expression = expression;
+    }
+
+    public boolean isInit() {
+        return isInit;
+    }
+
+    public void setInit(boolean init) {
+        isInit = init;
+    }
+
+    public long getCycleDiff() {
+        return cycleDiff;
+    }
+
+    public static CycleSchedule getInstance(String expr, Date date) {
+        if (INSTANCE == null) {
+            synchronized (CycleSchedule.class) {
+                if (INSTANCE == null) {
+                    try {
+                        INSTANCE = new CycleSchedule(expr, date);
+                    } catch (ParseException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    public static class Cycle extends BasedConfigurable implements Serializable {
+
+        private static final long serialVersionUID = 4842560538716388622L;
+        Long cycleId;
+        List<String> allPattern;
+        String cycleDateStr;
+        Integer cycleCount;
+        String curDateStr;
+        long cycleDiff;
+
+        public Integer getCycleCount() {
+            return cycleCount;
+        }
+
+        public void setCycleCount(Integer cycleCount) {
+            this.cycleCount = cycleCount;
+        }
+
+        public Cycle() {
+        }
+
+        public Long getCycleId() {
+            return cycleId;
+        }
+
+        public void setCycleId(Long cycleId) {
+            this.cycleId = cycleId;
+        }
+
+        public List<String> getAllPattern() {
+            return allPattern;
+        }
+
+        public void setAllPattern(List<String> allPattern) {
+            this.allPattern = allPattern;
+        }
+
+        public String getCycleDateStr() {
+            return cycleDateStr;
+        }
+
+        public void setCycleDateStr(String cycleDateStr) {
+            this.cycleDateStr = cycleDateStr;
+        }
+
+        public String getCurDateStr() {
+            return curDateStr;
+        }
+
+        public void setCurDateStr(String curDateStr) {
+            this.curDateStr = curDateStr;
+        }
+
+        public long getCycleDiff() {
+            return cycleDiff;
+        }
+
+        public void setCycleDiff(long cycleDiff) {
+            this.cycleDiff = cycleDiff;
+        }
+
+        @Override
+        public String toString() {
+            return "Cycle{" +
+                "cycleId=" + cycleId +
+                ", cycleDateStr='" + cycleDateStr + '\'' +
+                ", cycleCount=" + cycleCount +
+                ", curDateStr='" + curDateStr + '\'' +
+                ", cycleDiff=" + cycleDiff +
+                ", allPattern=" + Arrays.toString(allPattern.toArray()) +
+                '}';
+        }
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java
new file mode 100644
index 00000000..507739d8
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @description
+ */
+public class CycleScheduleFilter extends AbstractPatternFilter implements Serializable {
+
+    List<String> allPattern;
+
+    public CycleScheduleFilter(List<String> allPattern){
+        this.allPattern = allPattern;
+    }
+
+    @Override
+    public boolean filter(String sourceName, String logicTableName, String tableName) {
+        return allPattern.contains(tableName);
+    }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
new file mode 100644
index 00000000..0cdc0762
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @description
+ */
+public class DataFormatPatternFilter extends AbstractPatternFilter implements Serializable {
+
+    private static final long serialVersionUID = 3604787588465242642L;
+
+    static final Log logger = LogFactory.getLog(DataFormatPatternFilter.class);
+
+    static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
+    static final String yyyyMMdd = "yyyyMMdd";
+    static final String yyyyMMddHH = "yyyyMMddHH";
+
+    SimpleDateFormat format1 = new SimpleDateFormat(yyyyMMdd);
+    SimpleDateFormat format2 = new SimpleDateFormat(yyyyMMddHH);
+    SimpleDateFormat format3 = new SimpleDateFormat(yyyyMMddHHmmss);
+
+    @Override
+    public boolean filter(String sourceName, String logicTableName, String tableNameSuffix) {
+
+        int len = tableNameSuffix.length();
+        boolean isFilter = false;
+
+        switch (len) {
+            case 8:
+                try {
+                    format1.parse(tableNameSuffix);
+                    isFilter = true;
+                } catch (ParseException e) {
+                    e.printStackTrace();
+                    isFilter = false;
+                }
+                break;
+            case 10:
+                try {
+                    format2.parse(tableNameSuffix);
+                    isFilter = true;
+                } catch (ParseException e) {
+                    e.printStackTrace();
+                    isFilter = false;
+                }
+                break;
+            case 14:
+                try {
+                    format3.parse(tableNameSuffix);
+                    isFilter = true;
+                } catch (ParseException e) {
+                    e.printStackTrace();
+                    isFilter = false;
+                }
+                break;
+        }
+
+        if (isFilter) {
+            logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s", sourceName, logicTableName, tableNameSuffix));
+            return true;
+        }
+        if (next != null) {
+            return next.filter(sourceName, logicTableName, tableNameSuffix);
+        }
+        return false;
+    }
+
+    @Override
+    public PatternFilter setNext(PatternFilter filter) {
+        super.setNext(filter);
+        return this;
+    }
+
+    public PatternFilter getNext() {
+        return next;
+    }
+
+    public static void main(String[] args) {
+        DataFormatPatternFilter filter = new DataFormatPatternFilter();
+//        System.out.println(filter.filter("20200101"));
+//        System.out.println(filter.filter("2020010101"));
+//        System.out.println(filter.filter("20200101010101"));
+
+    }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java
new file mode 100644
index 00000000..42365007
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+/**
+ * @description
+ */
+public interface PatternFilter {
+
+    String yyyyMMddHHmmss = "yyyyMMddHHmmss";
+    String yyyyMMdd = "yyyyMMdd";
+    String yyyyMMddHH = "yyyyMMddHH";
+    String yyyyMMddHHmm = "yyyyMMddHHmm";
+
+
+    /**
+     * 根据sourceName和tableName判断是否符合
+     * @param sourceName
+     * @param tableName
+     * @return
+     */
+    boolean filter(String sourceName, String logicTableName, String tableName);
+
+    PatternFilter setNext(PatternFilter filter);
+
+
+}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
index cf09bf48..4aa86ae4 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
@@ -127,7 +127,7 @@ public class WindowMaxValueProcessor {
         }
 
         String keyPrefix = MapKeyUtil.createKey(name, splitId);
-        String sql="select * from "+ ORMUtil.getTableName(WindowMaxValue.class)+ " where msg_key like '"+keyPrefix+"%'";
+        String sql = "select * from " + ORMUtil.getTableName(WindowMaxValue.class) + " where configure_name like '%" + name + "%' and partition like '%" + splitId + "%'";
         List<WindowMaxValue> windowMaxValues = ORMUtil.queryForList(sql, null, WindowMaxValue.class);
         if (windowMaxValues == null || windowMaxValues.size() == 0) {
             return result;