You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:16 UTC
[rocketmq-connect] 06/39: 【ISSUE #278】Define and Implement the RmqConnector and RmqSourceTask. (#381)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit c1a51cc8e2223ea43534a9010a86b6930d391675
Author: chuenfaiy <ch...@163.com>
AuthorDate: Tue Aug 13 11:40:22 2019 +0800
【ISSUE #278】Define and Implement the RmqConnector and RmqSourceTask. (#381)
* init commit
* [Fix]fix some bugs in connectors and fastjson.
* [Update]update the version of replicator
* [Update]remove the repetitive properties
* [Update]Add the license at the head of TaskTopicInfo class and TaskConfigEnum class
* [Update]update the project name from rocketmq-connector to rocketmq-replicator
* [Update]update the language of README.md
* [Update]change the title of README.md
* [Update]change the package name from org.apache.rocketmq.connector to org.apache.rocketmq.replicator
* [Update]update the starting position
* [Update]update the starting position
* [Fix]fix the bug of instanceName setting
---
README.md | 18 ++--
pom.xml | 4 +-
.../{connector => replicator}/RmqConstants.java | 4 +-
.../RmqSourceReplicator.java} | 119 ++++++++++-----------
.../{connector => replicator}/RmqSourceTask.java | 14 +--
.../common/ConstDefine.java | 2 +-
.../{connector => replicator}/common/Utils.java | 2 +-
.../config/ConfigDefine.java | 2 +-
.../config/ConfigUtil.java | 2 +-
.../{connector => replicator}/config/DataType.java | 2 +-
.../config/TaskConfig.java | 2 +-
.../config/TaskConfigEnum.java | 2 +-
.../config/TaskDivideConfig.java | 2 +-
.../config/TaskTopicInfo.java | 2 +-
.../schema/FieldName.java | 2 +-
.../strategy/DivideStrategyEnum.java | 2 +-
.../strategy/DivideTaskByQueue.java | 8 +-
.../strategy/DivideTaskByTopic.java | 5 +-
.../strategy/TaskDivideStrategy.java | 5 +-
19 files changed, 95 insertions(+), 104 deletions(-)
diff --git a/README.md b/README.md
index 0b8bef7..77bcb94 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,13 @@
# rocketmq-replicator
-# 启动参数选择
+# boot-parameter
-参数 | 类型 |是否必须 |说明|示例值
+parameter | type | must | description | sample value
---|---|---|---|---|
-source-rocketmq | 字符串 | 是 | 源rocketmq集群namesrv地址 | 192.168.1.2:9876 |
-target-rocketmq | 字符串 | 是 | 源rocketmq集群namesrv地址 | 192.168.1.2:9876 |
-replicator-store-topic | 字符串 | 是 | replicator存储topic,需要在runtime的mq集群提前创建 | replicator-store-topic |
-task-divide-strategy | 整型 | 否 | 任务切割策略,可以按照主题和队列来切割,目前只支持主题切割且主题对应值为0 | 0 |
-white-list | 字符串 | 是 | 复制主题白名单,多个topic之间使用逗号分隔 | topic-1,topic-2 |
-task-parallelism | 整型 | 否 | 任务并行度,默认值为1,当topic数大于task数时,一个task将负责多个topic | 2 |
-source-record-converter | 字符串 | 是 | 源数据解析器,目前使用的是Json解析器 | io.openmessaging.connect.runtime.converter.JsonConverter |
\ No newline at end of file
+source-rocketmq | String | Yes | namesrv address of source rocketmq cluster | 192.168.1.2:9876 |
+target-rocketmq | String | Yes | namesrv address of target rocketmq cluster | 192.168.1.2:9876 |
+replicator-store-topic | String | Yes | topic name to store all source messages; it must be created in advance on the runtime's RocketMQ cluster | replicator-store-topic |
+task-divide-strategy | Integer | No | task dividing strategy, default value is 0 for dividing by topic | 0 |
+white-list | String | Yes | topic white list; multiple topics are separated by commas | topic-1,topic-2 |
+task-parallelism | Integer | No | task parallelism, default value is 1; when there are more topics than tasks, one task will be responsible for multiple topics | 2 |
+source-record-converter | String | Yes | source data parser | io.openmessaging.connect.runtime.converter.JsonConverter |
diff --git a/pom.xml b/pom.xml
index 37bf78f..d283ca9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,8 +22,8 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-connector</artifactId>
- <version>0.0.1-SNAPSHOT</version>
+ <artifactId>rocketmq-replicator</artifactId>
+ <version>0.1.0-SNAPSHOT</version>
<properties>
<rocketmq.version>4.4.0</rocketmq.version>
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqConstants.java b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
similarity index 90%
rename from src/main/java/org/apache/rocketmq/connector/RmqConstants.java
rename to src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
index 1278424..4994abe 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqConstants.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector;
+package org.apache.rocketmq.replicator;
public class RmqConstants {
@@ -26,6 +26,8 @@ public class RmqConstants {
public static final String NEXT_POSITION = "nextPosition";
+ public static final String SOURCE_INSTANCE_NAME = "REPLICATOR_SOURCE_CONSUMER";
+
public static String getPartition(String topic, String broker, String queueId) {
return new StringBuilder().append(broker).append(topic).append(queueId).toString();
}
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
similarity index 54%
rename from src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
rename to src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 33eb958..c1f350f 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector;
+package org.apache.rocketmq.replicator;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
@@ -24,15 +24,15 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.connector.common.ConstDefine;
-import org.apache.rocketmq.connector.common.Utils;
-import org.apache.rocketmq.connector.config.ConfigDefine;
-import org.apache.rocketmq.connector.config.DataType;
-import org.apache.rocketmq.connector.config.TaskDivideConfig;
-import org.apache.rocketmq.connector.strategy.DivideStrategyEnum;
-import org.apache.rocketmq.connector.strategy.DivideTaskByQueue;
-import org.apache.rocketmq.connector.strategy.DivideTaskByTopic;
-import org.apache.rocketmq.connector.strategy.TaskDivideStrategy;
+import org.apache.rocketmq.replicator.common.ConstDefine;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
+import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
@@ -40,15 +40,15 @@ import org.slf4j.LoggerFactory;
import java.util.*;
-public class RmqSourceConnector extends SourceConnector {
+public class RmqSourceReplicator extends SourceConnector {
- private static final Logger log = LoggerFactory.getLogger(RmqSourceConnector.class);
+ private static final Logger log = LoggerFactory.getLogger(RmqSourceReplicator.class);
private boolean syncDLQ = false;
private boolean syncRETRY = false;
- private KeyValue config;
+ private KeyValue replicatorConfig;
private Map<String, List<MessageQueue>> topicRouteMap;
@@ -56,15 +56,11 @@ public class RmqSourceConnector extends SourceConnector {
private Set<String> whiteList;
- private volatile boolean started = false;
-
private volatile boolean configValid = false;
- private DefaultMQAdminExt defaultMQAdminExt;
-
private int taskParallelism = 1;
- public RmqSourceConnector() {
+ public RmqSourceReplicator() {
topicRouteMap = new HashMap<String, List<MessageQueue>>();
whiteList = new HashSet<String>();
@@ -80,7 +76,7 @@ public class RmqSourceConnector extends SourceConnector {
}
// check the whitelist, whitelist is required.
- String whileListStr = this.config.getString(ConfigDefine.CONN_WHITE_LIST);
+ String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST);
String[] wl = whileListStr.trim().split(",");
if (wl.length <= 0) return "White list must be not empty.";
else {
@@ -89,46 +85,26 @@ public class RmqSourceConnector extends SourceConnector {
}
}
- if (this.config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
- this.config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+ if (config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
+ config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
this.taskDivideStrategy = new DivideTaskByQueue();
} else {
this.taskDivideStrategy = new DivideTaskByTopic();
}
if (config.containsKey(ConfigDefine.CONN_TASK_PARALLELISM)) {
- this.taskParallelism = this.config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
+ this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
}
- this.config = config;
+ this.replicatorConfig = config;
this.configValid = true;
return "";
}
public void start() {
-
- if (configValid) {
- RPCHook rpcHook = null;
- this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
- this.defaultMQAdminExt.setNamesrvAddr(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ));
- this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
- this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ)));
- try {
- defaultMQAdminExt.start();
- } catch (MQClientException e) {
- log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
- }
- started = true;
- }
}
public void stop() {
- if (started) {
- if (defaultMQAdminExt != null) {
- defaultMQAdminExt.shutdown();
- }
- started = false;
- }
}
public void pause() {
@@ -146,33 +122,52 @@ public class RmqSourceConnector extends SourceConnector {
public List<KeyValue> taskConfigs() {
- if (started && configValid) {
+ if (configValid) {
+
+ boolean adminStarted = false;
+ RPCHook rpcHook = null;
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+ defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+
try {
- for (String topic : this.whiteList) {
- if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
- (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
- !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
-
- TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new ArrayList<MessageQueue>());
- }
- for (QueueData qd : topicRouteData.getQueueDatas()) {
- for (int i = 0; i < qd.getReadQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- topicRouteMap.get(topic).add(mq);
+ defaultMQAdminExt.start();
+ adminStarted = true;
+ } catch (MQClientException e) {
+ log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+ }
+
+ if (adminStarted) {
+ try {
+ for (String topic : this.whiteList) {
+ if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
+ (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
+ !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
+
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ topicRouteMap.get(topic).add(mq);
+ }
}
}
}
+ } catch (Exception e) {
+ log.error("Fetch topic list error.", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
}
- } catch (Exception e) {
- log.error("Fetch topic list error.", e);
}
TaskDivideConfig tdc = new TaskDivideConfig(
- this.config.getString(ConfigDefine.CONN_SOURCE_RMQ),
- this.config.getString(ConfigDefine.CONN_STORE_TOPIC),
- this.config.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+ this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
+ this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
+ this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
DataType.COMMON_MESSAGE.ordinal(),
this.taskParallelism
);
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
rename to src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 7b3b011..50a88bb 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector;
+package org.apache.rocketmq.replicator;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
@@ -25,12 +25,12 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.common.Utils;
-import org.apache.rocketmq.connector.config.ConfigUtil;
-import org.apache.rocketmq.connector.config.DataType;
-import org.apache.rocketmq.connector.config.TaskConfig;
-import org.apache.rocketmq.connector.config.TaskTopicInfo;
-import org.apache.rocketmq.connector.schema.FieldName;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigUtil;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+import org.apache.rocketmq.replicator.schema.FieldName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/replicator/common/ConstDefine.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
rename to src/main/java/org/apache/rocketmq/replicator/common/ConstDefine.java
index 6e1eeac..08d985b 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/ConstDefine.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.common;
+package org.apache.rocketmq.replicator.common;
public class ConstDefine {
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/common/Utils.java
rename to src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 30fe44c..0e4766d 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.common;
+package org.apache.rocketmq.replicator.common;
import java.util.ArrayList;
import java.util.Collections;
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
rename to src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index 13f8960..3934c2f 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
import java.util.HashSet;
import java.util.Set;
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
similarity index 98%
rename from src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
rename to src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
index d2dd7da..5da92bc 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
import io.openmessaging.KeyValue;
diff --git a/src/main/java/org/apache/rocketmq/connector/config/DataType.java b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/config/DataType.java
rename to src/main/java/org/apache/rocketmq/replicator/config/DataType.java
index 3e77a3a..75f772e 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/DataType.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
public enum DataType {
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
index d480b90..e85280f 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
public class TaskConfig {
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
similarity index 96%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
index fca4dcc..1516f7a 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
public enum TaskConfigEnum {
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
similarity index 98%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
index 7f904b3..e6a8144 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
public class TaskDivideConfig {
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index e1f47d5..f078028 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
public class TaskTopicInfo {
diff --git a/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
rename to src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
index 9f4dc47..913ffca 100644
--- a/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
+++ b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.schema;
+package org.apache.rocketmq.replicator.schema;
public enum FieldName {
COMMON_MESSAGE("MessageExt");
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideStrategyEnum.java
similarity index 94%
rename from src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/DivideStrategyEnum.java
index 9dc060f..fb46be3 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideStrategyEnum.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
public enum DivideStrategyEnum {
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
similarity index 83%
rename from src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index caa69f3..77ed871 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -14,15 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
import io.openmessaging.KeyValue;
-import io.openmessaging.internal.DefaultKeyValue;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.ConfigDefine;
-import org.apache.rocketmq.connector.config.DataType;
-import org.apache.rocketmq.connector.config.TaskDivideConfig;
-
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
similarity index 96%
rename from src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index 9b019f1..e667d57 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -14,14 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.*;
-
+import org.apache.rocketmq.replicator.config.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
similarity index 90%
rename from src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index e80d092..0e0ac99 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
import io.openmessaging.KeyValue;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.TaskDivideConfig;
-
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import java.util.List;
import java.util.Map;