You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ka...@apache.org on 2022/06/21 03:55:02 UTC
[rocketmq-streams] 03/16: merge 1.0
This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch snapshot-1.0.4
in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 9d3ae58bf1f301a7f31d04256ddbe04ec94899b5
Author: 维章 <un...@gmail.com>
AuthorDate: Mon May 23 11:58:39 2022 +0800
merge 1.0
---
pom.xml | 1 +
rocketmq-streams-connectors/pom.xml | 47 ++++
.../streams/connectors/IBoundedSource.java | 32 +++
.../streams/connectors/IBoundedSourceReader.java | 26 ++
.../streams/connectors/IScheduleCallback.java | 24 ++
.../connectors/balance/AbstractBalance.java | 207 ++++++++++++++
.../streams/connectors/balance/IBalanceTask.java | 24 ++
.../streams/connectors/balance/ISourceBalance.java | 60 ++++
.../streams/connectors/balance/SplitChanged.java | 55 ++++
.../connectors/balance/impl/LeaseBalanceImpl.java | 144 ++++++++++
.../streams/connectors/model/PullMessage.java | 50 ++++
.../streams/connectors/model/ReaderStatus.java | 120 ++++++++
.../streams/connectors/reader/DBScanReader.java | 269 ++++++++++++++++++
.../streams/connectors/reader/ISplitReader.java | 96 +++++++
.../connectors/reader/SplitCloseFuture.java | 83 ++++++
.../connectors/source/AbstractPullSource.java | 312 +++++++++++++++++++++
.../source/CycleDynamicMultipleDBScanSource.java | 213 ++++++++++++++
.../source/DynamicMultipleDBScanSource.java | 190 +++++++++++++
.../streams/connectors/source/IPullSource.java | 60 ++++
.../connectors/source/MutilBatchTaskSource.java | 158 +++++++++++
.../streams/connectors/source/SourceInstance.java | 37 +++
.../source/filter/AbstractPatternFilter.java | 38 +++
.../source/filter/BoundedPatternFilter.java | 53 ++++
.../source/filter/CyclePatternFilter.java | 173 ++++++++++++
.../connectors/source/filter/CyclePeriod.java | 222 +++++++++++++++
.../connectors/source/filter/CycleSchedule.java | 236 ++++++++++++++++
.../source/filter/CycleScheduleFilter.java | 37 +++
.../source/filter/DataFormatPatternFilter.java | 106 +++++++
.../connectors/source/filter/PatternFilter.java | 41 +++
.../window/offset/WindowMaxValueProcessor.java | 2 +-
30 files changed, 3115 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index 9da26dcd..7caa0cbc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
<module>rocketmq-streams-channel-http</module>
<module>rocketmq-streams-state</module>
<module>rocketmq-streams-examples</module>
+ <module>rocketmq-streams-connectors</module>
<module>rocketmq-streams-channel-syslog</module>
<module>rocketmq-streams-channel-es</module>
<module>rocketmq-streams-channel-kafka</module>
diff --git a/rocketmq-streams-connectors/pom.xml b/rocketmq-streams-connectors/pom.xml
new file mode 100644
index 00000000..d544e3d5
--- /dev/null
+++ b/rocketmq-streams-connectors/pom.xml
@@ -0,0 +1,47 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams</artifactId>
+ <version>1.0.2-SNAPSHOT</version>
+ </parent>
+ <artifactId>rocketmq-streams-connectors</artifactId>
+ <packaging>jar</packaging>
+ <name>ROCKETMQ STREAMS :: connectors</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-lease</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-schedule</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-streams-commons</artifactId>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java
new file mode 100644
index 00000000..1a383433
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSource.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors;
+
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+/**
+ * @description
+ */
+public interface IBoundedSource{
+
+ /**
+ * reader完成时调用
+ * @param iSplit
+ */
+ void boundedFinishedCallBack(ISplit iSplit);
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java
new file mode 100644
index 00000000..03995c31
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IBoundedSourceReader.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors;
+
+import org.apache.rocketmq.streams.common.interfaces.ILifeCycle;
+
+/**
+ * @description
+ */
+public interface IBoundedSourceReader extends ILifeCycle {
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java
new file mode 100644
index 00000000..88fb2cfd
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/IScheduleCallback.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors;
+
+import java.util.Date;
+
+public interface IScheduleCallback {
+
+ boolean canFire(Date date);
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
new file mode 100644
index 00000000..5c2b266f
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/AbstractBalance.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.connectors.source.SourceInstance;
+
+public abstract class AbstractBalance implements ISourceBalance {
+
+ protected int balanceCount = 0;
+
+ @Override
+ public SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits) {
+ balanceCount++;
+ heartBeat();
+ List<SourceInstance> sourceInstances = fetchSourceInstances();
+ List<ISplit> workingSplits = fetchWorkingSplits(allSplits);
+ SplitChanged splitChanged = getAdditionSplits(allSplits, sourceInstances, workingSplits, ownerSplits);
+ if (splitChanged != null) {
+ return splitChanged;
+ }
+ splitChanged = getRemoveSplits(allSplits, sourceInstances, workingSplits, ownerSplits);
+ return splitChanged;
+ }
+
+ protected void heartBeat() {
+ holdLockSourceInstance();
+ }
+
+ /**
+ * get all dispatch splits
+ *
+ * @return
+ */
+ protected abstract List<ISplit> fetchWorkingSplits(List<ISplit> allSplitS);
+
+ /**
+ * get all instacne for the source
+ *
+ * @return
+ */
+ protected abstract List<SourceInstance> fetchSourceInstances();
+
+ /**
+ * lock the source ,the lock is globel,only one source instance can get it in same time
+ *
+ * @return
+ */
+ protected abstract boolean holdLockSourceInstance();
+
+ /**
+ * unlock
+ */
+ protected abstract void unlockSourceInstance();
+
+ /**
+ * juge need add split,根据调度策略选择
+ * 每次最大增加的分片数,根据调度次数决定
+ *
+ * @param allSplits
+ * @param sourceInstances
+ * @param workingSplits
+ * @return
+ */
+ protected SplitChanged getAdditionSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
+ List<ISplit> workingSplits, List<ISplit> ownerSplits) {
+ SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size());
+ if (splitChanged == null) {
+ return null;
+ }
+ if (splitChanged.isNewSplit == false) {
+ return null;
+ }
+ if (splitChanged.splitCount <= 0) {
+ return null;
+ }
+ List<ISplit> noWorkingSplits = getNoWorkingSplits(allSplits, workingSplits);
+ List<ISplit> newSplits = new ArrayList<>();
+ for (int i = 0; i < noWorkingSplits.size(); i++) {
+ boolean success = holdLockSplit(noWorkingSplits.get(i));
+ if (success) {
+ newSplits.add(noWorkingSplits.get(i));
+ if (newSplits.size() >= splitChanged.splitCount) {
+ break;
+ }
+ }
+ }
+ splitChanged.setChangedSplits(newSplits);
+ return splitChanged;
+
+ }
+
+ protected List<ISplit> getNoWorkingSplits(List<ISplit> allSplits, List<ISplit> workingSplits) {
+ Set<String> workingSplitIds = new HashSet<>();
+ for (ISplit split : workingSplits) {
+ workingSplitIds.add(split.getQueueId());
+ }
+ List<ISplit> splits = new ArrayList<>();
+ for (ISplit split : allSplits) {
+ if (!workingSplitIds.contains(split.getQueueId())) {
+ splits.add(split);
+ }
+ }
+ return splits;
+ }
+
+ /**
+ * 获取需要删除的分片
+ *
+ * @param allSplits
+ * @param sourceInstances
+ * @param workingSplits
+ * @return
+ */
+ protected SplitChanged getRemoveSplits(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
+ List<ISplit> workingSplits, List<ISplit> ownerSplits) {
+ SplitChanged splitChanged = getChangedSplitCount(allSplits, sourceInstances, workingSplits.size(), ownerSplits.size());
+ if (splitChanged == null) {
+ return null;
+ }
+ if (splitChanged.isNewSplit == true) {
+ return null;
+ }
+
+ if (splitChanged.splitCount <= 0) {
+ return null;
+ }
+ //List<ISplit> ownerSplits=source.ownerSplits();
+ List<ISplit> removeSplits = new ArrayList();
+ for (int i = 0; i < splitChanged.splitCount; i++) {
+ removeSplits.add(ownerSplits.get(i));
+ }
+ splitChanged.setChangedSplits(removeSplits);
+ return splitChanged;
+ }
+
+ /**
+ * 获取需要变动的分片个数,新增或删除
+ * 分配策略,只有有未分配的分片时才会分配新分片,为了减少分片切换,前面几次尽可能少分,后面越来越多
+ *
+ * @return 需要本实例有变化的分配,新增或删除
+ */
+ protected SplitChanged getChangedSplitCount(List<ISplit> allSplits, List<SourceInstance> sourceInstances,
+ int splitCountInWorking, int ownerSplitCount) {
+ //int ownerSplitCount=source.ownerSplits().size();
+ int instanceCount = sourceInstances.size();
+ if (instanceCount == 0) {
+ instanceCount = 1;
+ }
+ int allSplitCount = allSplits.size();
+ int minSplitCount = allSplitCount / instanceCount;
+ int maxSplitCount = minSplitCount + (allSplitCount % instanceCount == 0 ? 0 : 1);
+ //已经是最大分片数了
+ if (ownerSplitCount == maxSplitCount) {
+ return null;
+ }
+ if (ownerSplitCount > maxSplitCount) {
+ int changeSplitCount = ownerSplitCount - maxSplitCount;
+ return new SplitChanged(changeSplitCount, false);
+ }
+ //分片已经全部在处理,当前分片也符合最小分片分配策略,不需要重新分配
+ if (splitCountInWorking == allSplitCount && ownerSplitCount >= minSplitCount) {
+ return null;
+ }
+ //如果还有未分配的分片,且当前实例还有分片的可行性,则分配分片
+ if (splitCountInWorking < allSplitCount && ownerSplitCount < maxSplitCount) {
+ int changeSplitCount = Math.min(maxSplitCount - ownerSplitCount, getMaxSplitCountInOneBalance());
+
+ return new SplitChanged(changeSplitCount, true);
+ }
+ return null;
+ }
+
+ @Override
+ public int getBalanceCount() {
+ return balanceCount;
+ }
+
+ /**
+ * 每次负载均衡最大的分片个数,目的是前几次,少分配分配,可能有实例在启动中,以免频繁切换分片,到后面实例都启动了,斤可能多分配分片
+ *
+ * @return
+ */
+ private int getMaxSplitCountInOneBalance() {
+ int balanceCount = getBalanceCount();
+ return (int) Math.pow(2, balanceCount - 1);
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java
new file mode 100644
index 00000000..5275fe48
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/IBalanceTask.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+/**
+ * @description
+ */
+public interface IBalanceTask extends Runnable {
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java
new file mode 100644
index 00000000..b012b323
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/ISourceBalance.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+import java.util.List;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+public interface ISourceBalance {
+
+ /**
+ * 做负载均衡
+
+ * @return
+ */
+ SplitChanged doBalance(List<ISplit> allSplits, List<ISplit> ownerSplits);
+
+ /**
+ * 从启动开始,做了多少次均衡
+ * @return
+ */
+ int getBalanceCount();
+
+
+
+ boolean getRemoveSplitLock();
+
+ void unLockRemoveSplitLock();
+
+ /**
+ * lock the split and hold it util the instance is shutdown or remove split
+ * @param split
+ * @return
+ */
+ boolean holdLockSplit(ISplit split);
+
+ /**
+ * unlock split lock
+ * @param split
+ */
+ void unlockSplit(ISplit split);
+
+
+ void setSourceIdentification(String sourceIdentification);
+
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
new file mode 100644
index 00000000..c01c1519
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/SplitChanged.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance;
+
+import java.util.List;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+public class SplitChanged {
+
+ protected int splitCount;//变动多分片个数
+ protected boolean isNewSplit;//是否新增,false是删除
+ protected List<ISplit> changedSplits;
+ public SplitChanged(int splitCount,boolean isNewSplit){
+ this.splitCount=splitCount;
+ this.isNewSplit=isNewSplit;
+ }
+
+ public int getSplitCount() {
+ return splitCount;
+ }
+
+ public void setSplitCount(int splitCount) {
+ this.splitCount = splitCount;
+ }
+
+ public boolean isNewSplit() {
+ return isNewSplit;
+ }
+
+ public void setNewSplit(boolean newSplit) {
+ isNewSplit = newSplit;
+ }
+
+ public List<ISplit> getChangedSplits() {
+ return changedSplits;
+ }
+
+ public void setChangedSplits(List<ISplit> changedSplits) {
+ this.changedSplits = changedSplits;
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
new file mode 100644
index 00000000..dc504e5d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/balance/impl/LeaseBalanceImpl.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.balance.impl;
+
+import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.model.ServiceName;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
+import org.apache.rocketmq.streams.connectors.balance.AbstractBalance;
+import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
+import org.apache.rocketmq.streams.connectors.source.SourceInstance;
+import org.apache.rocketmq.streams.lease.LeaseComponent;
+import org.apache.rocketmq.streams.lease.model.LeaseInfo;
+import org.apache.rocketmq.streams.lease.service.ILeaseService;
+
+@AutoService(ISourceBalance.class)
+@ServiceName(LeaseBalanceImpl.DB_BALANCE_NAME)
+public class LeaseBalanceImpl extends AbstractBalance {
+
+ private static final Log logger = LogFactory.getLog(LeaseBalanceImpl.class);
+
+ public static final String DB_BALANCE_NAME = "db_balance";
+ private static final String REMOVE_SPLIT_LOCK_NAME = "lock_remove_split";
+ private static final String SOURCE_LOCK_PREFIX = "SOURCE_";
+ private static final String SPLIT_LOCK_PREFIX = "SPLIT_";
+ protected transient LeaseComponent leaseComponent = LeaseComponent.getInstance();
+ protected transient String sourceIdentification;
+
+ protected int lockTimeSecond = 5;
+
+ public LeaseBalanceImpl(String sourceIdentification) {
+
+ this.sourceIdentification = sourceIdentification;
+ }
+
+ public LeaseBalanceImpl() {
+
+ }
+
+ @Override
+ protected List<ISplit> fetchWorkingSplits(List<ISplit> allSplits) {
+ List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SPLIT_LOCK_PREFIX + this.sourceIdentification, null);
+ logger.info(String.format("lease SPLIT_LOCK_PREFIX is %s, sourceIdentification is %s. ", SPLIT_LOCK_PREFIX, sourceIdentification));
+ if (leaseInfos == null) {
+ return new ArrayList<>();
+ }
+
+ Map<String, ISplit> allSplitMap = new HashMap<>();
+ for (ISplit split : allSplits) {
+ allSplitMap.put(split.getQueueId(), split);
+ }
+ List<ISplit> splits = new ArrayList<>();
+ for (LeaseInfo leaseInfo : leaseInfos) {
+ String leaseName = leaseInfo.getLeaseName();
+ String splitId = MapKeyUtil.getLast(leaseName);
+ splits.add(allSplitMap.get(splitId));
+ }
+ logger.info(String.format("working split is %s", Arrays.toString(splits.toArray())));
+ return splits;
+ }
+
+ @Override
+ protected List<SourceInstance> fetchSourceInstances() {
+ List<LeaseInfo> leaseInfos = leaseComponent.getService().queryLockedInstanceByNamePrefix(SOURCE_LOCK_PREFIX + sourceIdentification, null);
+ if (leaseInfos == null) {
+ return new ArrayList<>();
+ }
+ List<SourceInstance> sourceInstances = new ArrayList<>();
+ for (LeaseInfo leaseInfo : leaseInfos) {
+ String leaseName = leaseInfo.getLeaseName();
+ sourceInstances.add(new SourceInstance(leaseName));
+ }
+ return sourceInstances;
+ }
+
+ @Override
+ protected boolean holdLockSourceInstance() {
+ return holdLock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId());
+ }
+
+ @Override
+ protected void unlockSourceInstance() {
+ leaseComponent.getService().unlock(SOURCE_LOCK_PREFIX + sourceIdentification, RuntimeUtil.getDipperInstanceId());
+ }
+
+ @Override
+ public boolean holdLockSplit(ISplit split) {
+ return holdLock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId());
+ }
+
+ @Override
+ public void unlockSplit(ISplit split) {
+ leaseComponent.getService().unlock(SPLIT_LOCK_PREFIX + this.sourceIdentification, split.getQueueId());
+
+ }
+
+ @Override
+ public boolean getRemoveSplitLock() {
+ return holdLock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
+ }
+
+ @Override
+ public void unLockRemoveSplitLock() {
+ leaseComponent.getService().unlock(this.sourceIdentification, REMOVE_SPLIT_LOCK_NAME);
+ }
+
+ public String getSourceIdentification() {
+ return sourceIdentification;
+ }
+
+ @Override
+ public void setSourceIdentification(String sourceIdentification) {
+ this.sourceIdentification = sourceIdentification;
+ }
+
+ protected boolean holdLock(String name, String lockName) {
+ ILeaseService leaseService = leaseComponent.getService();
+ boolean success = leaseService.holdLock(name, lockName, lockTimeSecond);
+ return success;
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java
new file mode 100644
index 00000000..9bf34803
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/PullMessage.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.model;
+
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+
+public class PullMessage<T> {
+ protected T message;
+ protected MessageOffset messageOffset;
+
+ public T getMessage() {
+ return message;
+ }
+
+ public void setMessage(T message) {
+ this.message = message;
+ }
+
+ public MessageOffset getMessageOffset() {
+ return messageOffset;
+ }
+
+ public void setMessageOffset(MessageOffset messageOffset) {
+ this.messageOffset = messageOffset;
+ }
+ /**
+ * 获取offset字符串,通过.把主offset和子offset串接在一起
+ * @return
+ */
+ public String getOffsetStr(){
+ return this.messageOffset.getOffsetStr();
+ }
+ public String getMainOffset() {
+ return messageOffset.getMainOffset();
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
new file mode 100644
index 00000000..a4889b5f
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/model/ReaderStatus.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.model;
+
+import java.util.Date;
+import java.util.List;
+import org.apache.rocketmq.streams.common.model.Entity;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+/**
+ * @description
+ */
+public class ReaderStatus extends Entity {
+
+ /**
+ * 查询单个readerStatus
+ */
+ static final String queryReaderStatusByUK = "select * from reader_status where source_name = '%s' and reader_name = '%s' and is_finished = 1";
+
+ static final String queryReaderStatusList = "select * from reader_status where source_name = '%s' and is_finished = 1";
+
+ static final String clearReaderStatus = "update reader_status set gmt_modified = now(), is_finished = -1 where source_name = '%s' and reader_name = '%s'";
+
+ String sourceName;
+
+ String readerName;
+
+ int isFinished;
+
+ int totalReader;
+
+ public String getReaderName() {
+ return readerName;
+ }
+
+ public void setReaderName(String readerName) {
+ this.readerName = readerName;
+ }
+
+ public int getIsFinished() {
+ return isFinished;
+ }
+
+ public void setIsFinished(int isFinished) {
+ this.isFinished = isFinished;
+ }
+
+ public int getTotalReader() {
+ return totalReader;
+ }
+
+ public void setTotalReader(int totalReader) {
+ this.totalReader = totalReader;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public void setSourceName(String sourceName) {
+ this.sourceName = sourceName;
+ }
+
+ @Override
+ public String toString() {
+ return "ReaderStatus{" +
+ "id=" + id +
+ ", gmtCreate=" + gmtCreate +
+ ", gmtModified=" + gmtModified +
+ ", sourceName='" + sourceName + '\'' +
+ ", readerName='" + readerName + '\'' +
+ ", isFinished=" + isFinished +
+ ", totalReader=" + totalReader +
+ '}';
+ }
+
+ public static ReaderStatus queryReaderStatusByUK(String sourceName, String readerName) {
+ String sql = String.format(queryReaderStatusByUK, sourceName, readerName);
+ ReaderStatus readerStatus = ORMUtil.queryForObject(sql, null, ReaderStatus.class);
+ return readerStatus;
+ }
+
+ public static List<ReaderStatus> queryReaderStatusListBySourceName(String sourceName) {
+ String sql = String.format(queryReaderStatusList, sourceName);
+ List<ReaderStatus> readerStatusList = ORMUtil.queryForList(sql, null, ReaderStatus.class);
+ return readerStatusList;
+ }
+
+ public static void clearReaderStatus(String sourceName, String readerName) {
+ String sql = String.format(clearReaderStatus, sourceName, readerName);
+ ORMUtil.executeSQL(sql, null);
+ }
+
+ public static ReaderStatus create(String sourceName, String readerName, int isFinished, int totalReader) {
+
+ ReaderStatus readerStatus = new ReaderStatus();
+ readerStatus.setSourceName(sourceName);
+ readerStatus.setReaderName(readerName);
+ readerStatus.setIsFinished(isFinished);
+ readerStatus.setTotalReader(totalReader);
+ readerStatus.setGmtCreate(new Date());
+ readerStatus.setGmtModified(new Date());
+ return readerStatus;
+
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
new file mode 100644
index 00000000..268e891e
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/DBScanReader.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.reader;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.component.AbstractComponent;
+import org.apache.rocketmq.streams.common.context.MessageOffset;
+import org.apache.rocketmq.streams.common.utils.ThreadUtil;
+import org.apache.rocketmq.streams.connectors.IBoundedSource;
+import org.apache.rocketmq.streams.connectors.IBoundedSourceReader;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
+import org.apache.rocketmq.streams.connectors.source.CycleDynamicMultipleDBScanSource;
+import org.apache.rocketmq.streams.db.driver.DriverBuilder;
+import org.apache.rocketmq.streams.db.driver.JDBCDriver;
+import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;
+
+/**
+ * @description
+ */
+public class DBScanReader implements ISplitReader, IBoundedSourceReader, Serializable {
+
+ private static final long serialVersionUID = 8172403250050893288L;
+ private static final Log logger = LogFactory.getLog(DBScanReader.class);
+ static final String sqlTemplate = "select * from %s where id >= %d and id < %d";
+
+ //是否完成了source的call back调用
+ transient volatile boolean isFinishedCall = false;
+ ISource iSource;
+ String url;
+ String userName;
+ String password;
+ String tableName;
+ int batchSize;
+ long offset;
+ long offsetStart;
+ long offsetEnd;
+ long maxOffset;
+ long minOffset;
+ ISplit iSplit;
+ transient List<PullMessage> pullMessages;
+ volatile boolean interrupt = false;
+ volatile boolean isClosed = false;
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public ISplit getISplit() {
+ return iSplit;
+ }
+
+ public void setISplit(ISplit iSplit) {
+ this.iSplit = iSplit;
+ }
+
+ public DBScanReader() {
+
+ }
+
+ transient ThreadLocal<JDBCDriver> threadLocal = new ThreadLocal<JDBCDriver>() {
+
+ @Override
+ public JDBCDriver initialValue() {
+ logger.info(String.format("%s initial jdbcDriver. ", Thread.currentThread().getName()));
+ return DriverBuilder.createDriver(AbstractComponent.DEFAULT_JDBC_DRIVER, url, userName, password);
+ }
+
+ };
+
+ @Override
+ public void open(ISplit split) {
+ this.iSplit = split;
+ JDBCDriver jdbcDriver = threadLocal.get();
+ Map<String, Object> range = jdbcDriver.queryOneRow("select min(id) as min_id, max(id) as max_id from " + tableName);
+ minOffset = Long.parseLong(String.valueOf(range.get("min_id")));
+ maxOffset = Long.parseLong(String.valueOf(range.get("max_id")));
+ offsetStart = minOffset;
+ offset = minOffset;
+ logger.info(String.format("table %s min id [ %d ], max id [ %d ]", tableName, minOffset, maxOffset));
+ pullMessages = new ArrayList<>();
+ }
+
+ @Override
+ public boolean next() {
+ if (interrupt) {
+ return false;
+ }
+ if (isFinished()) {
+ finish();
+ ThreadUtil.sleep(10 * 1000);
+ return false;
+ }
+ JDBCDriver jdbcDriver = threadLocal.get();
+ offsetEnd = offsetStart + batchSize;
+ String batchQuery = String.format(sqlTemplate, tableName, offsetStart, offsetEnd);
+ logger.debug(String.format("execute sql : %s", batchQuery));
+ List<Map<String, Object>> resultData = jdbcDriver.queryForList(batchQuery);
+ offsetStart = offsetEnd;
+ pullMessages.clear();
+ for (Map<String, Object> r : resultData) {
+ PullMessage msg = new PullMessage();
+ JSONObject data = JSONObject.parseObject(JSON.toJSONString(r));
+ msg.setMessage(data);
+ offset = offset > Long.parseLong(data.getString("id")) ? offset : Long.parseLong(data.getString("id"));
+ msg.setMessageOffset(new MessageOffset(String.valueOf(offset), true));
+ pullMessages.add(msg);
+ }
+ return offsetStart - batchSize <= maxOffset;
+ }
+
+ @Override
+ public List<PullMessage> getMessage() {
+// logger.info(String.format("output messages %d", pullMessages.size()));
+ return pullMessages;
+ }
+
+ @Override
+ public SplitCloseFuture close() {
+// interrupt = true;
+ isClosed = true;
+ threadLocal.remove();
+ pullMessages = null;
+ return new SplitCloseFuture(this, iSplit);
+ }
+
+ @Override
+ public void seek(String cursor) {
+ if (cursor == null || cursor.trim().equals("")) {
+ cursor = "0";
+ }
+ offset = Long.parseLong(cursor);
+ if (offset < minOffset) {
+ offset = minOffset;
+ }
+ offsetStart = offset;
+ logger.info(String.format("split %s seek %d.", iSplit.getQueueId(), offset));
+ }
+
+ @Override
+ public String getProgress() {
+ return String.valueOf(offset);
+ }
+
+ @Override
+ public long getDelay() {
+ return maxOffset - offset;
+ }
+
+ @Override
+ public long getFetchedDelay() {
+ return 0;
+ }
+
+ @Override
+ public boolean isClose() {
+ return isClosed;
+ }
+
+ @Override
+ public ISplit getSplit() {
+ return iSplit;
+ }
+
+ @Override
+ public boolean isInterrupt() {
+ return interrupt;
+ }
+
+ @Override
+ public boolean interrupt() {
+ interrupt = true;
+ return true;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return offsetStart > maxOffset;
+ }
+
+ @Override
+ public void finish() {
+ if (isFinishedCall) {
+ return;
+ }
+ pullMessages = null;
+ updateReaderStatus();
+ IBoundedSource tmp = (IBoundedSource) iSource;
+ tmp.boundedFinishedCallBack(this.iSplit);
+ isFinishedCall = true;
+ }
+
+ public ISource getISource() {
+ return iSource;
+ }
+
+ public void setISource(ISource iSource) {
+ this.iSource = iSource;
+ }
+
+ private final void updateReaderStatus() {
+ String sourceName = CycleDynamicMultipleDBScanSource.createKey(this.getISource());
+ int finish = Integer.valueOf(1);
+ int total = ((CycleDynamicMultipleDBScanSource) iSource).getTotalReader();
+ ReaderStatus readerStatus = ReaderStatus.create(sourceName, iSplit.getQueueId(), finish, total);
+ logger.info(String.format("create reader status %s.", readerStatus));
+ ORMUtil.batchReplaceInto(readerStatus);
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
new file mode 100644
index 00000000..6b377cff
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/ISplitReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.reader;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+
+public interface ISplitReader extends Serializable {
+
+ /**
+ * Open.
+ *
+ * @param split the split
+ * @throws IOException the io exception
+ */
+ void open(ISplit split);
+
+ /**
+ * Next boolean.
+ *
+ * @return the boolean
+ * @throws IOException the io exception
+ * @throws InterruptedException the interrupted exception
+ */
+ boolean next();
+
+ /**
+ * Gets message.
+ *
+ * @return the message
+ */
+ List<PullMessage> getMessage();
+
+ /**
+ * Close.
+ *
+ * @throws IOException the io exception
+ */
+ SplitCloseFuture close();
+
+ /**
+ * Seek.
+ *
+ * @param cursor the cursor
+ * @throws IOException the io exception
+ */
+ void seek(String cursor);
+
+ /**
+ * Gets progress.
+ *
+ * @return the progress
+ * @throws IOException the io exception
+ */
+ String getProgress();
+
+ /**
+ * Get message delay (millseconds)
+ *
+ * @return delay
+ */
+ long getDelay();
+
+ /**
+ * Get message delay (millseconds) from being fetched
+ *
+ * @return delay
+ */
+ long getFetchedDelay();
+
+ boolean isClose();
+
+ ISplit getSplit();
+
+ boolean isInterrupt();
+
+ boolean interrupt();
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
new file mode 100644
index 00000000..b28748b8
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/reader/SplitCloseFuture.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.reader;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+public class SplitCloseFuture implements Future<Boolean> {
+
+ protected ISplitReader reader;
+ protected ISplit split;
+
+ public SplitCloseFuture(ISplitReader reader, ISplit split) {
+ this.reader = reader;
+ this.split = split;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return reader.isClose();
+ }
+
+ @Override
+ public Boolean get() throws InterruptedException, ExecutionException {
+ synchronized (reader) {
+ reader.wait();
+ }
+ return reader.isClose();
+ }
+
+ @Override
+ public Boolean get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ synchronized (reader) {
+ long time = timeout;
+ if (unit == TimeUnit.SECONDS) {
+ time = time * 1000;
+ } else if (unit == TimeUnit.MINUTES) {
+ time = time * 1000 * 60;
+ } else if (unit == TimeUnit.HOURS) {
+ time = time * 1000 * 60 * 60;
+ } else {
+ throw new RuntimeException("can not support this timeout, expect less hour " + timeout + " the unit is " + unit);
+ }
+ reader.wait(time);
+ }
+ return reader.isClose();
+ }
+
+ public ISplitReader getReader() {
+ return reader;
+ }
+
+ public ISplit getSplit() {
+ return split;
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
new file mode 100644
index 00000000..9aedc95a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/AbstractPullSource.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.threadpool.ThreadPoolFactory;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.connectors.balance.ISourceBalance;
+import org.apache.rocketmq.streams.connectors.balance.SplitChanged;
+import org.apache.rocketmq.streams.connectors.balance.impl.LeaseBalanceImpl;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
+import org.apache.rocketmq.streams.serviceloader.ServiceLoaderComponent;
+
+public abstract class AbstractPullSource extends AbstractSource implements IPullSource<AbstractSource> {
+
+ private static final Log logger = LogFactory.getLog(AbstractPullSource.class);
+
+ protected transient ISourceBalance balance;// balance interface
+ protected transient ScheduledExecutorService balanceExecutor;//schdeule balance
+ protected transient Map<String, ISplitReader> splitReaders = new HashMap<>();//owner split readers
+ protected transient Map<String, ISplit> ownerSplits = new HashMap<>();//working splits by the source instance
+
+ //可以有多种实现,通过名字选择不同的实现
+ protected String balanceName = LeaseBalanceImpl.DB_BALANCE_NAME;
+ //balance schedule time
+ protected int balanceTimeSecond = 10;
+ protected long pullIntervalMs;
+ protected transient CheckPointManager checkPointManager = new CheckPointManager();
+ protected transient boolean shutDown=false;
+ @Override
+ protected boolean startSource() {
+ ServiceLoaderComponent serviceLoaderComponent = ServiceLoaderComponent.getInstance(ISourceBalance.class);
+ balance = (ISourceBalance) serviceLoaderComponent.getService().loadService(balanceName);
+ balance.setSourceIdentification(MapKeyUtil.createKey(getNameSpace(), getConfigureName()));
+ balanceExecutor = new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder().namingPattern("balance-task-%d").daemon(true).build());
+ List<ISplit> allSplits = fetchAllSplits();
+ SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values()));
+ doSplitChanged(splitChanged);
+ balanceExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ logger.info("balance running..... current splits is " + ownerSplits);
+ List<ISplit> allSplits = fetchAllSplits();
+ SplitChanged splitChanged = balance.doBalance(allSplits, new ArrayList(ownerSplits.values()));
+ doSplitChanged(splitChanged);
+ }
+ }, balanceTimeSecond, balanceTimeSecond, TimeUnit.SECONDS);
+
+ startWorks();
+ return true;
+ }
+
+ private void startWorks() {
+ ExecutorService workThreads= ThreadPoolFactory.createThreadPool(maxThread);
+ long start=System.currentTimeMillis();
+ while (!shutDown) {
+ Iterator<Map.Entry<String, ISplitReader>> it = splitReaders.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, ISplitReader> entry=it.next();
+ String splitId=entry.getKey();
+ ISplit split=ownerSplits.get(splitId);
+ ISplitReader reader=entry.getValue();
+ ReaderRunner runner=new ReaderRunner(split,reader);
+ workThreads.execute(runner);
+ }
+ try {
+ long sleepTime=this.pullIntervalMs-(System.currentTimeMillis()-start);
+ if(sleepTime>0){
+ Thread.sleep(sleepTime);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ @Override
+ public Map<String, ISplit> getAllSplitMap() {
+ List<ISplit> splits = fetchAllSplits();
+ if (splits == null) {
+ return new HashMap<>();
+ }
+ Map<String, ISplit> splitMap = new HashMap<>();
+ for (ISplit split : splits) {
+ splitMap.put(split.getQueueId(), split);
+ }
+ return splitMap;
+ }
+
+ protected void doSplitChanged(SplitChanged splitChanged) {
+ if (splitChanged == null) {
+ return;
+ }
+ if (splitChanged.getSplitCount() == 0) {
+ return;
+ }
+ if (splitChanged.isNewSplit()) {
+ doSplitAddition(splitChanged.getChangedSplits());
+ } else {
+ doSplitRelease(splitChanged.getChangedSplits());
+ }
+ }
+
+ protected void doSplitAddition(List<ISplit> changedSplits) {
+ if (changedSplits == null) {
+ return;
+ }
+ Set<String> splitIds = new HashSet<>();
+ for (ISplit split : changedSplits) {
+ splitIds.add(split.getQueueId());
+ }
+ addNewSplit(splitIds);
+ for (ISplit split : changedSplits) {
+ ISplitReader reader = createSplitReader(split);
+ reader.open(split);
+ reader.seek(loadSplitOffset(split));
+ splitReaders.put(split.getQueueId(), reader);
+ this.ownerSplits.put(split.getQueueId(), split);
+// logger.info("start next");
+// Thread thread = new Thread(new Runnable() {
+//
+// thread.setName("reader-task-" + reader.getSplit().getQueueId());
+// thread.start();
+ }
+
+ }
+
+ @Override
+ public String loadSplitOffset(ISplit split) {
+ String offset = null;
+ CheckPoint<String> checkPoint = checkPointManager.recover(this, split);
+ if (checkPoint != null) {
+ offset = JSON.parseObject(checkPoint.getData()).getString("offset");
+ }
+ return offset;
+ }
+
+ protected abstract ISplitReader createSplitReader(ISplit split);
+
+ protected void doSplitRelease(List<ISplit> changedSplits) {
+ boolean success = balance.getRemoveSplitLock();
+ if (!success) {
+ return;
+ }
+ try {
+ List<SplitCloseFuture> closeFutures = new ArrayList<>();
+ for (ISplit split : changedSplits) {
+ ISplitReader reader = this.splitReaders.get(split.getQueueId());
+ if (reader == null) {
+ continue;
+ }
+ SplitCloseFuture future = reader.close();
+ closeFutures.add(future);
+ }
+ for (SplitCloseFuture future : closeFutures) {
+ try {
+ if(!future.isDone()){
+ future.get();
+ }
+ this.splitReaders.remove(future.getSplit().getQueueId());
+ this.ownerSplits.remove(future.getSplit().getQueueId());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+
+ } finally {
+ balance.unLockRemoveSplitLock();
+ }
+
+ }
+
+
+ protected class ReaderRunner implements Runnable{
+ long mLastCheckTime = System.currentTimeMillis();
+ protected ISplit split;
+ protected ISplitReader reader;
+
+ public ReaderRunner(ISplit split,ISplitReader reader){
+ this.split=split;
+ this.reader=reader;
+ }
+
+ @Override
+ public void run() {
+ logger.info("start running");
+ if (reader.isInterrupt() == false) {
+ if (reader.next()) {
+ List<PullMessage> messages = reader.getMessage();
+ if (messages != null) {
+ for (PullMessage pullMessage : messages) {
+ String queueId = split.getQueueId();
+ String offset = pullMessage.getOffsetStr();
+ JSONObject msg = createJson(pullMessage.getMessage());
+ Message message = createMessage(msg, queueId, offset, false);
+ message.getHeader().setOffsetIsLong(pullMessage.getMessageOffset().isLongOfMainOffset());
+ executeMessage(message);
+ }
+ }
+ reader.notifyAll();
+ }
+ long curTime = System.currentTimeMillis();
+ if (curTime - mLastCheckTime > getCheckpointTime()) {
+ sendCheckpoint(reader.getSplit().getQueueId());
+ mLastCheckTime = curTime;
+ }
+
+
+ }else {
+ Set<String> removeSplits = new HashSet<>();
+ removeSplits.add(reader.getSplit().getQueueId());
+ removeSplit(removeSplits);
+ balance.unlockSplit(split);
+ reader.close();
+ synchronized (reader) {
+ reader.notifyAll();
+ }
+ }
+
+ }
+
+ }
+
+ @Override
+ public boolean supportNewSplitFind() {
+ return true;
+ }
+
+ @Override
+ public boolean supportRemoveSplitFind() {
+ return true;
+ }
+
+ @Override
+ public boolean supportOffsetRest() {
+ return true;
+ }
+
+ @Override
+ public Long getPullIntervalMs() {
+ return pullIntervalMs;
+ }
+
+ public String getBalanceName() {
+ return balanceName;
+ }
+
+ public void setBalanceName(String balanceName) {
+ this.balanceName = balanceName;
+ }
+
+ public int getBalanceTimeSecond() {
+ return balanceTimeSecond;
+ }
+
+ public void setBalanceTimeSecond(int balanceTimeSecond) {
+ this.balanceTimeSecond = balanceTimeSecond;
+ }
+
+ public void setPullIntervalMs(long pullIntervalMs) {
+ this.pullIntervalMs = pullIntervalMs;
+ }
+
+ @Override
+ public List<ISplit> ownerSplits() {
+ return new ArrayList(ownerSplits.values());
+ }
+
+}
\ No newline at end of file
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
new file mode 100644
index 00000000..561b48f2
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/CycleDynamicMultipleDBScanSource.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import com.alibaba.fastjson.JSONObject;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.source.AbstractSource;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.channel.source.systemmsg.ChangeTableNameMessage;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.context.Message;
+import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
+import org.apache.rocketmq.streams.common.utils.ThreadUtil;
+import org.apache.rocketmq.streams.connectors.IBoundedSource;
+import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.source.filter.CycleSchedule;
+import org.apache.rocketmq.streams.connectors.source.filter.CycleScheduleFilter;
+import org.apache.rocketmq.streams.db.CycleSplit;
+
+/**
+ * @description
+ */
+public class CycleDynamicMultipleDBScanSource extends DynamicMultipleDBScanSource implements IBoundedSource, Serializable {
+
+ private static final long serialVersionUID = 6840988298037061128L;
+ private static final Log logger = LogFactory.getLog(CycleDynamicMultipleDBScanSource.class);
+
+ Map<String, Boolean> initReaderMap = new ConcurrentHashMap<>();
+ CycleSchedule.Cycle cycle;
+ transient AtomicInteger size = new AtomicInteger(0);
+
+ public CycleDynamicMultipleDBScanSource() {
+ super();
+ }
+
+ public CycleDynamicMultipleDBScanSource(CycleSchedule.Cycle cycle) {
+ super();
+ this.cycle = cycle;
+ }
+
+ public AtomicInteger getSize() {
+ return size;
+ }
+
+ public void setSize(AtomicInteger size) {
+ this.size = size;
+ }
+
+ /**
+ * @return
+ */
+ //todo
+ @Override
+ public synchronized List<ISplit> fetchAllSplits() {
+
+ if (this.filter == null) {
+ filter = new CycleScheduleFilter(cycle.getAllPattern());
+ }
+
+ //如果还是当前周期, 已经完成全部分区的加载, 则不在加载
+ if (size.get() == cycle.getCycleCount()) {
+ return splits;
+ }
+ String sourceName = createKey(this);
+ List<String> tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
+
+ logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
+ Iterator<String> it = tableNames.iterator();
+ while (it.hasNext()) {
+ String s = it.next();
+ String suffix = s.replace(logicTableName + "_", "");
+ if (filter.filter(sourceName, logicTableName, suffix)) {
+ logger.info(String.format("filter add %s", s));
+ CycleSplit split = new CycleSplit();
+ split.setLogicTableName(logicTableName);
+ split.setSuffix(suffix);
+ split.setCyclePeriod(cycle.getCycleDateStr());
+ String splitId = split.getQueueId();
+ if (initReaderMap.get(splitId) == null) {
+ initReaderMap.put(splitId, false);
+ splits.add(split);
+ size.incrementAndGet();
+ }
+ } else {
+ logger.info(String.format("filter remove %s", s));
+ it.remove();
+ }
+ }
+
+ this.tableNames = tableNames;
+ return splits;
+ }
+
+ public Map<String, Boolean> getInitReaderMap() {
+ return initReaderMap;
+ }
+
+ public void setInitReaderMap(Map<String, Boolean> initReaderMap) {
+ this.initReaderMap = initReaderMap;
+ }
+
+ @Override
+ public void finish() {
+ super.finish();
+ for (Map.Entry<String, Boolean> entry : initReaderMap.entrySet()) {
+ String key = entry.getKey();
+ Boolean value = entry.getValue();
+ if (value == false) {
+ logger.error(String.format("split[%s] reader is not finish, exit with error. ", key));
+ }
+ }
+ this.initReaderMap.clear();
+ this.initReaderMap = null;
+ splits.clear();
+ splits = null;
+ }
+
+ @Override
+ public boolean isFinished() {
+ List<ReaderStatus> readerStatuses = ReaderStatus.queryReaderStatusListBySourceName(createKey(this));
+ if (readerStatuses == null) {
+ return false;
+ }
+ return readerStatuses.size() == size.get();
+ }
+
+ @Override
+ protected ISplitReader createSplitReader(ISplit iSplit) {
+ return super.createSplitReader(iSplit);
+ }
+
+ private void sendChangeTableNameMessage() {
+ logger.info(String.format("start send change table name message."));
+ ChangeTableNameMessage changeTableNameMessage = new ChangeTableNameMessage();
+ changeTableNameMessage.setScheduleCycle(cycle.getCycleDateStr());
+ Message message = createMessage(new JSONObject(), null, null, false);
+ message.setSystemMessage(changeTableNameMessage);
+ message.getHeader().setSystemMessage(true);
+ executeMessage(message);
+ logger.info(String.format("finish send change table name message."));
+ }
+
+ @Override
+ public synchronized void boundedFinishedCallBack(ISplit iSplit) {
+ this.initReaderMap.put(iSplit.getQueueId(), true);
+ logger.info(String.format("current map is %s, key is %s. ", initReaderMap, iSplit.getQueueId()));
+ if (statusCheckerStart.compareAndSet(false, true)) {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!isFinished()) {
+ ThreadUtil.sleep(3 * 1000);
+ }
+ logger.info(String.format("source will be closed."));
+ sendChangeTableNameMessage(); //下发修改name的消息
+ ThreadUtil.sleep(1 * 1000);
+ finish();
+ }
+
+ });
+ thread.setName(createKey(this) + "_callback");
+ thread.start();
+ }
+ }
+
+ public CycleSchedule.Cycle getCycle() {
+ return cycle;
+ }
+
+ public void setCycle(CycleSchedule.Cycle cycle) {
+ this.cycle = cycle;
+ }
+
+ @Override
+ public String createCheckPointName() {
+ return super.createCheckPointName();
+ }
+
+ public synchronized int getTotalReader() {
+ return size.get();
+ }
+
+ public static String createKey(ISource iSource) {
+ AbstractSource source = (AbstractSource) iSource;
+ CycleSchedule.Cycle cycle = ((CycleDynamicMultipleDBScanSource) iSource).getCycle();
+ return MapKeyUtil.createKey(source.getNameSpace(), source.getGroupName(), source.getConfigureName(), source.getTopic(), cycle.getCycleDateStr());
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
new file mode 100644
index 00000000..ea2a118b
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/DynamicMultipleDBScanSource.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.metadata.MetaDataUtils;
+import org.apache.rocketmq.streams.connectors.reader.DBScanReader;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.source.filter.DataFormatPatternFilter;
+import org.apache.rocketmq.streams.connectors.source.filter.PatternFilter;
+import org.apache.rocketmq.streams.db.DynamicMultipleDBSplit;
+
+/**
+ * @description DynamicMultipleDBScanSource
+ */
+public class DynamicMultipleDBScanSource extends AbstractPullSource implements Serializable {
+
+ private static final long serialVersionUID = 3987103552547019739L;
+ private static final Log logger = LogFactory.getLog(DynamicMultipleDBScanSource.class);
+ public static final int DEFAULT_BATCH_SIZE = 50;
+ public static final int MAX_BATCH_SIZE = 100;
+
+ String url;
+ String userName;
+ String password;
+ String logicTableName;
+ String suffix;
+ int batchSize;
+ List<String> tableNames;
+ List<ISplit> splits;
+ transient volatile AtomicBoolean statusCheckerStart = new AtomicBoolean(false);
+
+ //todo
+ transient PatternFilter filter;
+
+ public DynamicMultipleDBScanSource() {
+ splits = new ArrayList<>();
+ }
+
+ @Override
+ protected boolean initConfigurable() {
+ setTopic(logicTableName);
+ return super.initConfigurable();
+ }
+
+ @Override
+ protected boolean isNotDataSplit(String queueId) {
+ return tableNames.contains(queueId);
+ }
+
+ @Override
+ protected ISplitReader createSplitReader(ISplit split) {
+
+ DBScanReader reader = new DBScanReader();
+ reader.setISplit(split);
+ reader.setUrl(url);
+ reader.setUserName(userName);
+ reader.setPassword(password);
+ reader.setTableName(String.valueOf(split.getQueue()));
+ int local = batchSize <= 0 ? DEFAULT_BATCH_SIZE : batchSize;
+ local = local > MAX_BATCH_SIZE ? MAX_BATCH_SIZE : local;
+ reader.setBatchSize(local);
+ reader.setISource(this);
+ logger.info(String.format("create reader for split %s", split.getQueueId()));
+ return reader;
+ }
+
+ @Override
+ public List<ISplit> fetchAllSplits() {
+
+ if (filter == null) {
+ filter = new DataFormatPatternFilter();
+ }
+
+// String sourceName = createKey(this);
+
+ tableNames = MetaDataUtils.listTableNameByPattern(url, userName, password, logicTableName + "%");
+
+ logger.info(String.format("load all logic table : %s", Arrays.toString(tableNames.toArray())));
+
+ for (String s : tableNames) {
+ String suffix = s.replace(logicTableName + "_", "");
+ if (filter.filter(null, logicTableName, suffix)) {
+ logger.info(String.format("filter add %s", s));
+ DynamicMultipleDBSplit split = new DynamicMultipleDBSplit();
+ split.setLogicTableName(logicTableName);
+ split.setSuffix(suffix);
+ splits.add(split);
+ } else {
+ logger.info(String.format("filter remove %s", s));
+ }
+
+ }
+ return splits;
+ }
+
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getLogicTableName() {
+ return logicTableName;
+ }
+
+ public void setLogicTableName(String logicTableName) {
+ this.logicTableName = logicTableName;
+ }
+
+ public String getSuffix() {
+ return suffix;
+ }
+
+ public void setSuffix(String suffix) {
+ this.suffix = suffix;
+ }
+
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public List<String> getTableNames() {
+ return tableNames;
+ }
+
+ public void setTableNames(List<String> tableNames) {
+ this.tableNames = tableNames;
+ }
+
+ public List<ISplit> getSplits() {
+ return splits;
+ }
+
+ public void setSplits(List<ISplit> splits) {
+ this.splits = splits;
+ }
+
+ public PatternFilter getFilter() {
+ return filter;
+ }
+
+ public void setFilter(PatternFilter filter) {
+ this.filter = filter;
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
new file mode 100644
index 00000000..6733911d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/IPullSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.streams.common.channel.source.ISource;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+
+/**
+ * poll message,need balance
+ */
+public interface IPullSource<T extends ISource> extends ISource<T> {
+
+ /**
+ * 拥有的分片格式
+ *
+ * @return
+ */
+ Collection<ISplit> ownerSplits();
+
+ /**
+ * get all split for the source
+ *
+ * @return
+ */
+ List<ISplit> fetchAllSplits();
+
+ /**
+ * get all split for the source
+ *
+ * @return
+ */
+ Map<String, ISplit> getAllSplitMap();
+
+ Long getPullIntervalMs();
+
+ /**
+ * get cusor from store
+ *
+ * @return
+ */
+ String loadSplitOffset(ISplit split);
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java
new file mode 100644
index 00000000..d82ba32a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/MutilBatchTaskSource.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+import com.alibaba.fastjson.JSON;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.channel.split.ISplit;
+import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
+import org.apache.rocketmq.streams.common.topology.ChainPipeline;
+import org.apache.rocketmq.streams.common.topology.model.Pipeline;
+import org.apache.rocketmq.streams.common.topology.task.TaskAssigner;
+import org.apache.rocketmq.streams.common.utils.DipperThreadLocalUtil;
+import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
+import org.apache.rocketmq.streams.connectors.model.PullMessage;
+import org.apache.rocketmq.streams.connectors.reader.ISplitReader;
+import org.apache.rocketmq.streams.connectors.reader.SplitCloseFuture;
+
+public class MutilBatchTaskSource extends AbstractPullSource {
+
+ @Override protected ISplitReader createSplitReader(ISplit split) {
+ return new ISplitReader() {
+ protected transient ISplit split;
+ protected boolean isInterrupt;
+ protected boolean isClose;
+ protected transient AtomicLong offsetGenerator=new AtomicLong(1000000000);
+ @Override public void open(ISplit split) {
+ this.split=split;
+ }
+
+ @Override public boolean next() {
+ return true;
+ }
+
+ @Override public List<PullMessage> getMessage() {
+ PipelineSplit pipelineSplit=(PipelineSplit)split;
+ pipelineSplit.getQueue().startChannel();
+ return null;
+ }
+
+ @Override public SplitCloseFuture close() {
+ isClose=true;
+ return new SplitCloseFuture(this,split);
+ }
+
+ @Override public void seek(String cursor) {
+
+ }
+
+ @Override public String getProgress() {
+ return RuntimeUtil.getDipperInstanceId()+"_"+offsetGenerator.incrementAndGet();
+ }
+
+ @Override public long getDelay() {
+ return 0;
+ }
+
+ @Override public long getFetchedDelay() {
+ return getPullIntervalMs();
+ }
+
+ @Override public boolean isClose() {
+ return isClose;
+ }
+
+ @Override public ISplit getSplit() {
+ return split;
+ }
+
+ @Override public boolean isInterrupt() {
+ return isInterrupt;
+ }
+
+ @Override public boolean interrupt() {
+ isInterrupt=true;
+ return isInterrupt;
+ }
+ };
+ }
+
+ @Override protected boolean isNotDataSplit(String queueId) {
+ return false;
+ }
+
+ @Override public List<ISplit> fetchAllSplits() {
+
+ List<TaskAssigner> taskAssigners = configurableService.queryConfigurableByType(TaskAssigner.TYPE);
+ if (taskAssigners == null) {
+ return null;
+ }
+ String taskName = getConfigureName();
+ List<ISplit> splits=new ArrayList<>();
+ for (TaskAssigner taskAssigner : taskAssigners) {
+ if (!taskName.equals(taskAssigner.getTaskName())) {
+ continue;
+ }
+ String pipelineName = taskAssigner.getPipelineName();
+ if(pipelineName!=null){
+ ChainPipeline<?> pipeline = configurableService.queryConfigurable(Pipeline.TYPE, pipelineName);
+ if (pipeline != null) {
+ splits.add(new PipelineSplit(pipeline));
+ }
+ }
+ }
+ return splits;
+ }
+
+
+ protected class PipelineSplit implements ISplit<PipelineSplit, ChainPipeline>{
+ protected ChainPipeline chainPipeline;
+ public PipelineSplit(ChainPipeline chainPipeline){
+ this.chainPipeline=chainPipeline;
+ }
+
+ @Override public String getQueueId() {
+ return chainPipeline.getConfigureName();
+ }
+
+ @Override public ChainPipeline getQueue() {
+ return chainPipeline;
+ }
+
+ @Override public int compareTo(PipelineSplit o) {
+ return chainPipeline.getConfigureName().compareTo(o.getQueueId());
+ }
+
+ @Override public String toJson() {
+ return chainPipeline.toJson();
+ }
+
+ @Override public void toObject(String jsonString) {
+ ChainPipeline pipeline=new ChainPipeline();
+ pipeline.toObject(jsonString);
+ this.chainPipeline=pipeline;
+ }
+ }
+
+ @Override
+ public String loadSplitOffset(ISplit split) {
+ return null;
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java
new file mode 100644
index 00000000..c0da5b6e
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/SourceInstance.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source;
+
+/**
+ * i个消息队列的实例,一个实例i个
+ */
+public class SourceInstance {
+ protected String sourceInstanceId;
+
+
+ public SourceInstance(String sourceInstanceId){
+ this.sourceInstanceId=sourceInstanceId;
+ }
+
+ public String getSourceInstanceId() {
+ return sourceInstanceId;
+ }
+
+ public void setSourceInstanceId(String sourceInstanceId) {
+ this.sourceInstanceId = sourceInstanceId;
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java
new file mode 100644
index 00000000..0d5368a7
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/AbstractPatternFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+
+/**
+ * @description
+ */
+public abstract class AbstractPatternFilter implements PatternFilter, Serializable {
+
+ private static final long serialVersionUID = 6500945777421871431L;
+
+ PatternFilter next;
+
+ public abstract boolean filter(String sourceName, String logicTableName, String tableName);
+
+
+ @Override
+ public PatternFilter setNext(PatternFilter filter) {
+ this.next = filter;
+ return this;
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
new file mode 100644
index 00000000..c06de98d
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/BoundedPatternFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.rocketmq.streams.connectors.model.ReaderStatus;
+
+/**
+ * @description 过滤掉已经完成的reader
+ */
+@Deprecated
+public class BoundedPatternFilter extends AbstractPatternFilter implements Serializable {
+
+ static final Log logger = LogFactory.getLog(BoundedPatternFilter.class);
+
+ @Override
+ public boolean filter(String sourceName, String logicTableName, String tableName) {
+
+ ReaderStatus readerStatus = ReaderStatus.queryReaderStatusByUK(sourceName, logicTableName + "_" + tableName);
+ if (readerStatus != null) {
+ logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s. ", sourceName, logicTableName, tableName));
+ logger.info(String.format("query result %s", readerStatus.toString()));
+ return true;
+ }
+ if (next == null) {
+ return false;
+ }
+ return next.filter(sourceName, logicTableName, tableName);
+ }
+
+ @Override
+ public PatternFilter setNext(PatternFilter filter) {
+ super.setNext(filter);
+ return this;
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
new file mode 100644
index 00000000..3a0193f3
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePatternFilter.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * @description 用来做分区选取
+ */
+public class CyclePatternFilter extends AbstractPatternFilter implements Serializable {
+
+ private static final long serialVersionUID = -5151597286296228754L;
+
+ public static final int INIT_CYCLE_VERSION = 0;
+
+ CyclePeriod cyclePeriod;
+
+ Date curCycleDateTime; //当前调度周期时间
+
+ long cycleId;
+
+ String firstStartTime; //当前最小时间
+
+ List<String> allPatterns;
+
+ String expression;
+
+ boolean isInit;
+
+ //历史数据读取时使用,表示比起当前相差多少个调度周期
+ final long cycleDiff;
+
+ //todo expr解析
+ public CyclePatternFilter(String expr, Date date) throws ParseException {
+ expression = expr;
+ cycleId = INIT_CYCLE_VERSION;
+ cyclePeriod = CyclePeriod.getInstance(expression);
+ curCycleDateTime = calCycleDateTime(date);
+ allPatterns = new ArrayList<>();
+ isInit = true;
+ if(cyclePeriod.isHistory){
+ Date tmp = cyclePeriod.getHisDate();
+ cycleDiff = curCycleDateTime.getTime()/1000 * 1000 - tmp.getTime()/1000*1000;
+ }else{
+ cycleDiff = 0;
+ }
+ }
+
+
+ /**
+ *
+ * @return 返回date格式的调度周期时间
+ */
+ private Date calCycleDateTime(Date date){
+ return cyclePeriod.format(date);
+ }
+
+ private long calCycle(Date date){
+ Date tmp = calCycleDateTime(date);
+ if(tmp.getTime()/1000 == curCycleDateTime.getTime()/1000){
+ return cycleId;
+ }
+ return nextCycle(tmp);
+ }
+
+ private long nextCycle(Date date){
+ curCycleDateTime = date;
+ cycleId++;
+ calAllPattern();
+ return cycleId;
+ }
+
+ private void calAllPattern(){
+ allPatterns.clear();
+ for(int i = 1; i <= cyclePeriod.getCycle(); i++){
+ long d = (curCycleDateTime.getTime()/1000)*1000 - i * cyclePeriod.getInterval() - cycleDiff;
+ String s = cyclePeriod.getDateFormat().format(new Date(d));
+ allPatterns.add(s);
+ }
+ firstStartTime = allPatterns.get(allPatterns.size() - 1);
+ }
+
+ public boolean isNextCycle(Date date){
+ if(isInit){
+ isInit = false;
+ calAllPattern();
+ return true;
+ }
+ long tmp = cycleId;
+ return calCycle(date) > tmp;
+ }
+
+ public List<String> getAllPatterns() {
+ return allPatterns;
+ }
+
+ public long getCycleId() {
+ return cycleId;
+ }
+
+ public Date getCurCycleDateTime(){
+ return curCycleDateTime;
+ }
+
+ public String getCurCycleDateTimeStr(){
+ return cyclePeriod.getDateFormat().format(curCycleDateTime);
+ }
+
+ public long getCycleDiff() {
+ return cycleDiff;
+ }
+
+ public long getCyclePeriodDiff(){
+ return cycleDiff/cyclePeriod.getInterval();
+ }
+
+ public int getCycle(){
+ return cyclePeriod.getCycle();
+ }
+
+ public String getFirstStartTime() {
+ return firstStartTime;
+ }
+
+ @Override
+ public boolean filter(String sourceName, String logicTableName, String tableName) {
+ return allPatterns.contains(tableName);
+ }
+
+
+
+ public static void main(String[] args) throws ParseException {
+
+ CyclePatternFilter cycle = new CyclePatternFilter("yyyyMMddHHmm - 15m", new Date());
+ System.out.println(cycle);
+
+ System.out.println(cycle.filter(null, null, "202109131650"));
+ System.out.println(cycle.filter(null, null, "20210902000000"));
+ System.out.println(cycle.filter(null, null, "20210908000000"));
+ System.out.println(cycle.filter(null, null, "20210910000000"));
+ System.out.println(cycle.filter(null, null, "20210909230000"));
+
+ System.out.println(new SimpleDateFormat("yyyyMMddHH").parse("2021090923"));
+ System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909230000"));
+ System.out.println(new SimpleDateFormat("yyyyMMddHHmmss").parse("20210909100000"));
+ System.out.println(new SimpleDateFormat("yyyyMMddhhmmss").parse("20210909100000"));
+
+
+
+
+ }
+
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
new file mode 100644
index 00000000..4e6cdd6a
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CyclePeriod.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @Description
+ */
+public enum CyclePeriod {
+
+ CYCLE_PERIOD_DATE() {
+ @Override
+ void argsParser(String expr) throws ParseException {
+ super.argsParser(expr);
+ interval = 24 * 3600 * 1000;
+ int length = expr.length();
+ if (length == 8 && checkFormat(expr, PatternFilter.yyyyMMdd)) {
+ format = PatternFilter.yyyyMMdd;
+ } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+ format = PatternFilter.yyyyMMddHHmmss;
+ } else {
+ throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
+ }
+ }
+
+ @Override
+ Date format(Date strDate) {
+ Date date = new Date(strDate.getTime());
+ date.setHours(0);
+ date.setMinutes(0);
+ date.setSeconds(0);
+ return date;
+ }
+
+ },
+ CYCLE_PERIOD_HOUR() {
+ @Override
+ void argsParser(String expr) throws ParseException {
+ super.argsParser(expr);
+ interval = 3600 * 1000;
+
+ int length = expr.length();
+ if (length == 10 && checkFormat(expr, PatternFilter.yyyyMMddHH)) {
+ format = PatternFilter.yyyyMMddHH;
+ } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+ format = PatternFilter.yyyyMMddHHmmss;
+ } else {
+ throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
+ }
+ }
+
+ @Override
+ Date format(Date strDate) {
+ Date date = new Date(strDate.getTime());
+ date.setMinutes(0);
+ date.setSeconds(0);
+ return date;
+ }
+
+ },
+ CYCLE_PERIOD_MINUTE() {
+ @Override
+ void argsParser(String expr) throws ParseException {
+ super.argsParser(expr);
+ interval = 60 * 1000;
+ int length = expr.length();
+ if (length == 12 && checkFormat(expr, PatternFilter.yyyyMMddHHmm)) {
+ format = PatternFilter.yyyyMMddHHmm;
+ } else if (length == 14 && checkFormat(expr, PatternFilter.yyyyMMddHHmmss)) {
+ format = PatternFilter.yyyyMMddHHmmss;
+ } else {
+ throw new RuntimeException(String.format("unsupported format : %s, only support yyyymmdd 、 yyyymmddhhmmss.", expr));
+ }
+ }
+
+ @Override
+ Date format(Date strDate) {
+ Date date = new Date(strDate.getTime());
+ date.setSeconds(0);
+ return date;
+ }
+
+ };
+
+ boolean isHistory = false;
+
+ long interval;
+
+ int cycle;
+
+ String format;
+
+ String hisDateString;
+
+ static final Log logger = LogFactory.getLog(CyclePeriod.class);
+
+ void argsParser(String expr) throws ParseException {
+ if (expr.matches("^\\d+$")) {
+ isHistory = true;
+ hisDateString = expr;
+ }
+ }
+
+ Date format(Date strDate) {
+ throw new RuntimeException(String.format("unsupported type.", strDate));
+ }
+
+ /**
+ * expr可能是yyyymmdd 或者 20210917
+ *
+ * @param expr
+ * @param format
+ * @return
+ */
+ final boolean checkFormat(String expr, String format) {
+
+ if (!isHistory) {
+ return expr.equalsIgnoreCase(format);
+ }
+
+ try {
+ new SimpleDateFormat(format).parse(expr);
+ return true;
+ } catch (ParseException e) {
+ logger.error(String.format("error format, expr is %s, format is %s.", expr, format));
+ e.printStackTrace();
+ return false;
+ }
+ }
+
+ public Date getHisDate() throws ParseException {
+ return getDateFormat().parse(hisDateString);
+ }
+
+ public SimpleDateFormat getDateFormat() {
+ return new SimpleDateFormat(format);
+ }
+
+ public long getInterval() {
+ return interval;
+ }
+
+ public boolean isHistory() {
+ return isHistory;
+ }
+
+ public void setHistory(boolean history) {
+ isHistory = history;
+ }
+
+ public void setInterval(long interval) {
+ this.interval = interval;
+ }
+
+ public int getCycle() {
+ return cycle;
+ }
+
+ public void setCycle(int cycle) {
+ this.cycle = cycle;
+ }
+
+ public void setFormat(String format) {
+ this.format = format;
+ }
+
+ public String getFormat() {
+ return format;
+ }
+
+ public String getHisDateString() {
+ return hisDateString;
+ }
+
+ public void setHisDateString(String hisDateString) {
+ this.hisDateString = hisDateString;
+ }
+
+ public static CyclePeriod getInstance(String expression) throws ParseException {
+
+ String[] str = expression.split("\\-");
+ assert str.length == 2 : String.format("expression error : %s. ", expression);
+ String expr = str[0].trim();
+ String tmp = str[1].trim().toLowerCase();
+ String cycleStr = tmp.substring(0, tmp.length() - 1);
+ int cycle = Integer.parseInt(cycleStr);
+ CyclePeriod cyclePeriod = null;
+ if (tmp.endsWith("d")) {
+ cyclePeriod = CYCLE_PERIOD_DATE;
+ } else if (tmp.endsWith("h")) {
+ cyclePeriod = CYCLE_PERIOD_HOUR;
+ } else if (tmp.endsWith("m")) {
+ cyclePeriod = CYCLE_PERIOD_MINUTE;
+ } else {
+ new RuntimeException(String.format("unsupported format : %s", expression));
+ }
+ cyclePeriod.argsParser(expr);
+ cyclePeriod.cycle = cycle;
+
+ return cyclePeriod;
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
new file mode 100644
index 00000000..ba9a2797
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleSchedule.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
+
+/**
+ * @description 用来做分区选取
+ */
+public class CycleSchedule implements Serializable {
+
+ private static final long serialVersionUID = -5151597286296228754L;
+ public static final int INIT_CYCLE_VERSION = 0;
+ private static CycleSchedule INSTANCE;
+ CyclePeriod cyclePeriod;
+ AtomicLong cycleId = new AtomicLong(0);
+ String expression;
+ boolean isInit;
+ //历史数据读取时使用,表示比起当前相差多少个调度周期
+ final long cycleDiff;
+
+ public CycleSchedule(String expr, Date date) throws ParseException {
+ Date local = subMs(date);
+ expression = expr;
+ cycleId.set(INIT_CYCLE_VERSION);
+ cyclePeriod = CyclePeriod.getInstance(expression);
+ isInit = true;
+ if (cyclePeriod.isHistory) {
+ Date curCycleDateTime = calCycleDateTime(local);
+ Date tmp = subMs(cyclePeriod.getHisDate());
+ cycleDiff = curCycleDateTime.getTime() - tmp.getTime();
+ } else {
+ cycleDiff = 0;
+ }
+ }
+
+ /**
+ * 去掉毫秒时间戳
+ *
+ * @param date
+ * @return
+ */
+ private Date subMs(Date date) {
+ long time = date.getTime() / 1000 * 1000;
+ return new Date(time);
+ }
+
+ /**
+ * @return 返回date格式的调度周期时间
+ */
+ private Date calCycleDateTime(Date date) {
+ return cyclePeriod.format(date);
+ }
+
+ public Cycle nextCycle(Date date) {
+ Date local = subMs(date);
+ local = cyclePeriod.format(local);
+ if (isInit) {
+ isInit = false;
+ } else {
+ cycleId.incrementAndGet();
+ }
+ List<String> ret = calAllPattern(local);
+ Cycle cycle = new Cycle();
+ cycle.setCycleId(cycleId.get());
+ cycle.setAllPattern(ret);
+ cycle.setCycleDateStr(calCycleDateStr(local));
+ cycle.setCycleCount(cyclePeriod.getCycle());
+ cycle.setCurDateStr(cyclePeriod.getDateFormat().format(local));
+ cycle.setCycleDiff(cycleDiff);
+ return cycle;
+ }
+
+ private String calCycleDateStr(Date date) {
+ long d = date.getTime() - cycleDiff;
+ Date d1 = new Date(d);
+ return cyclePeriod.getDateFormat().format(d1);
+ }
+
+ private List<String> calAllPattern(Date date) {
+ List<String> allPatterns = new ArrayList<>();
+ for (int i = 1; i <= cyclePeriod.getCycle(); i++) {
+ long d = date.getTime() - i * cyclePeriod.getInterval() - cycleDiff;
+ String s = cyclePeriod.getDateFormat().format(new Date(d));
+ allPatterns.add(s);
+ }
+ return allPatterns;
+ }
+
+ public CyclePeriod getCyclePeriod() {
+ return cyclePeriod;
+ }
+
+ public void setCyclePeriod(CyclePeriod cyclePeriod) {
+ this.cyclePeriod = cyclePeriod;
+ }
+
+ public AtomicLong getCycleId() {
+ return cycleId;
+ }
+
+ public void setCycleId(AtomicLong cycleId) {
+ this.cycleId = cycleId;
+ }
+
+ public String getExpression() {
+ return expression;
+ }
+
+ public void setExpression(String expression) {
+ this.expression = expression;
+ }
+
+ public boolean isInit() {
+ return isInit;
+ }
+
+ public void setInit(boolean init) {
+ isInit = init;
+ }
+
+ public long getCycleDiff() {
+ return cycleDiff;
+ }
+
+ public static CycleSchedule getInstance(String expr, Date date) {
+ if (INSTANCE == null) {
+ synchronized (CycleSchedule.class) {
+ if (INSTANCE == null) {
+ try {
+ INSTANCE = new CycleSchedule(expr, date);
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ return INSTANCE;
+ }
+
+ public static class Cycle extends BasedConfigurable implements Serializable {
+
+ private static final long serialVersionUID = 4842560538716388622L;
+ Long cycleId;
+ List<String> allPattern;
+ String cycleDateStr;
+ Integer cycleCount;
+ String curDateStr;
+ long cycleDiff;
+
+ public Integer getCycleCount() {
+ return cycleCount;
+ }
+
+ public void setCycleCount(Integer cycleCount) {
+ this.cycleCount = cycleCount;
+ }
+
+ public Cycle() {
+ }
+
+ public Long getCycleId() {
+ return cycleId;
+ }
+
+ public void setCycleId(Long cycleId) {
+ this.cycleId = cycleId;
+ }
+
+ public List<String> getAllPattern() {
+ return allPattern;
+ }
+
+ public void setAllPattern(List<String> allPattern) {
+ this.allPattern = allPattern;
+ }
+
+ public String getCycleDateStr() {
+ return cycleDateStr;
+ }
+
+ public void setCycleDateStr(String cycleDateStr) {
+ this.cycleDateStr = cycleDateStr;
+ }
+
+ public String getCurDateStr() {
+ return curDateStr;
+ }
+
+ public void setCurDateStr(String curDateStr) {
+ this.curDateStr = curDateStr;
+ }
+
+ public long getCycleDiff() {
+ return cycleDiff;
+ }
+
+ public void setCycleDiff(long cycleDiff) {
+ this.cycleDiff = cycleDiff;
+ }
+
+ @Override
+ public String toString() {
+ return "Cycle{" +
+ "cycleId=" + cycleId +
+ ", cycleDateStr='" + cycleDateStr + '\'' +
+ ", cycleCount=" + cycleCount +
+ ", curDateStr='" + curDateStr + '\'' +
+ ", cycleDiff=" + cycleDiff +
+ ", allPattern=" + Arrays.toString(allPattern.toArray()) +
+ '}';
+ }
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java
new file mode 100644
index 00000000..507739d8
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/CycleScheduleFilter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * @description
+ */
+public class CycleScheduleFilter extends AbstractPatternFilter implements Serializable {
+
+ List<String> allPattern;
+
+ public CycleScheduleFilter(List<String> allPattern){
+ this.allPattern = allPattern;
+ }
+
+ @Override
+ public boolean filter(String sourceName, String logicTableName, String tableName) {
+ return allPattern.contains(tableName);
+ }
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
new file mode 100644
index 00000000..0cdc0762
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/DataFormatPatternFilter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+import java.io.Serializable;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @description
+ */
+public class DataFormatPatternFilter extends AbstractPatternFilter implements Serializable {
+
+ private static final long serialVersionUID = 3604787588465242642L;
+
+ static final Log logger = LogFactory.getLog(DataFormatPatternFilter.class);
+
+ static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
+ static final String yyyyMMdd = "yyyyMMdd";
+ static final String yyyyMMddHH = "yyyyMMddHH";
+
+ SimpleDateFormat format1 = new SimpleDateFormat(yyyyMMdd);
+ SimpleDateFormat format2 = new SimpleDateFormat(yyyyMMddHH);
+ SimpleDateFormat format3 = new SimpleDateFormat(yyyyMMddHHmmss);
+
+ @Override
+ public boolean filter(String sourceName, String logicTableName, String tableNameSuffix) {
+
+ int len = tableNameSuffix.length();
+ boolean isFilter = false;
+
+ switch (len) {
+ case 8:
+ try {
+ format1.parse(tableNameSuffix);
+ isFilter = true;
+ } catch (ParseException e) {
+ e.printStackTrace();
+ isFilter = false;
+ }
+ break;
+ case 10:
+ try {
+ format2.parse(tableNameSuffix);
+ isFilter = true;
+ } catch (ParseException e) {
+ e.printStackTrace();
+ isFilter = false;
+ }
+ break;
+ case 14:
+ try {
+ format3.parse(tableNameSuffix);
+ isFilter = true;
+ } catch (ParseException e) {
+ e.printStackTrace();
+ isFilter = false;
+ }
+ break;
+ }
+
+ if (isFilter) {
+ logger.info(String.format("filter sourceName %s, logicTableName %s, suffix %s", sourceName, logicTableName, tableNameSuffix));
+ return true;
+ }
+ if (next != null) {
+ return next.filter(sourceName, logicTableName, tableNameSuffix);
+ }
+ return false;
+ }
+
+ @Override
+ public PatternFilter setNext(PatternFilter filter) {
+ super.setNext(filter);
+ return this;
+ }
+
+ public PatternFilter getNext() {
+ return next;
+ }
+
+ public static void main(String[] args) {
+ DataFormatPatternFilter filter = new DataFormatPatternFilter();
+// System.out.println(filter.filter("20200101"));
+// System.out.println(filter.filter("2020010101"));
+// System.out.println(filter.filter("20200101010101"));
+
+ }
+
+}
diff --git a/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java
new file mode 100644
index 00000000..42365007
--- /dev/null
+++ b/rocketmq-streams-connectors/src/main/java/org/apache/rocketmq/streams/connectors/source/filter/PatternFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.streams.connectors.source.filter;
+
+/**
+ * @description
+ */
+public interface PatternFilter {
+
+ String yyyyMMddHHmmss = "yyyyMMddHHmmss";
+ String yyyyMMdd = "yyyyMMdd";
+ String yyyyMMddHH = "yyyyMMddHH";
+ String yyyyMMddHHmm = "yyyyMMddHHmm";
+
+
+ /**
+ * 根据sourceName和tableName判断是否符合
+ * @param sourceName
+ * @param tableName
+ * @return
+ */
+ boolean filter(String sourceName, String logicTableName, String tableName);
+
+ PatternFilter setNext(PatternFilter filter);
+
+
+}
diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
index cf09bf48..4aa86ae4 100644
--- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
+++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/offset/WindowMaxValueProcessor.java
@@ -127,7 +127,7 @@ public class WindowMaxValueProcessor {
}
String keyPrefix = MapKeyUtil.createKey(name, splitId);
- String sql="select * from "+ ORMUtil.getTableName(WindowMaxValue.class)+ " where msg_key like '"+keyPrefix+"%'";
+ String sql = "select * from " + ORMUtil.getTableName(WindowMaxValue.class) + " where configure_name like '%" + name + "%' and partition like '%" + splitId + "%'";
List<WindowMaxValue> windowMaxValues = ORMUtil.queryForList(sql, null, WindowMaxValue.class);
if (windowMaxValues == null || windowMaxValues.size() == 0) {
return result;