You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:16 UTC

[rocketmq-connect] 06/39: 【ISSUE #278】Define and Implement the RmqConnector and RmqSourceTask. (#381)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit c1a51cc8e2223ea43534a9010a86b6930d391675
Author: chuenfaiy <ch...@163.com>
AuthorDate: Tue Aug 13 11:40:22 2019 +0800

    【ISSUE #278】Define and Implement the RmqConnector and RmqSourceTask. (#381)
    
    * init commit
    
    * [Fix]fix some bugs in connectors and fastjson.
    
    * [Update]update the version of replicator
    
    * [Update]remove the repetitive properties
    
    * [Update]Add the license at the head of TaskTopicInfo class and TaskConfigEnum class
    
    * [Update]update the project name from  to
    
    * [Update]update the language of README.md
    
    * [Update]change the title of README.md
    
    * [Update]change the package name from  to
    
    * [Update]update the  starting position
    
    * [Update]update the  starting position
    
    * [Fix]fix the bug of  instanceName setting
---
 README.md                                          |  18 ++--
 pom.xml                                            |   4 +-
 .../{connector => replicator}/RmqConstants.java    |   4 +-
 .../RmqSourceReplicator.java}                      | 119 ++++++++++-----------
 .../{connector => replicator}/RmqSourceTask.java   |  14 +--
 .../common/ConstDefine.java                        |   2 +-
 .../{connector => replicator}/common/Utils.java    |   2 +-
 .../config/ConfigDefine.java                       |   2 +-
 .../config/ConfigUtil.java                         |   2 +-
 .../{connector => replicator}/config/DataType.java |   2 +-
 .../config/TaskConfig.java                         |   2 +-
 .../config/TaskConfigEnum.java                     |   2 +-
 .../config/TaskDivideConfig.java                   |   2 +-
 .../config/TaskTopicInfo.java                      |   2 +-
 .../schema/FieldName.java                          |   2 +-
 .../strategy/DivideStrategyEnum.java               |   2 +-
 .../strategy/DivideTaskByQueue.java                |   8 +-
 .../strategy/DivideTaskByTopic.java                |   5 +-
 .../strategy/TaskDivideStrategy.java               |   5 +-
 19 files changed, 95 insertions(+), 104 deletions(-)

diff --git a/README.md b/README.md
index 0b8bef7..77bcb94 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,13 @@
 # rocketmq-replicator
 
-# 启动参数选择
+# boot-parameter
 
-参数 | 类型 |是否必须 |说明|示例值
+parameter | type | must | description | sample value
 ---|---|---|---|---|
-source-rocketmq | 字符串 | 是 | 源rocketmq集群namesrv地址 | 192.168.1.2:9876 |
-target-rocketmq | 字符串 | 是 | 源rocketmq集群namesrv地址 | 192.168.1.2:9876 |
-replicator-store-topic | 字符串 | 是 | replicator存储topic,需要在runtime的mq集群提前创建 | replicator-store-topic |
-task-divide-strategy | 整型 | 否 | 任务切割策略,可以按照主题和队列来切割,目前只支持主题切割且主题对应值为0 | 0 |
-white-list | 字符串 | 是 | 复制主题白名单,多个topic之间使用逗号分隔 | topic-1,topic-2 |
-task-parallelism | 整型 | 否 | 任务并行度,默认值为1,当topic数大于task数时,一个task将负责多个topic | 2 |
-source-record-converter | 字符串 | 是 | 源数据解析器,目前使用的是Json解析器 | io.openmessaging.connect.runtime.converter.JsonConverter |
\ No newline at end of file
+source-rocketmq | String | Yes | namesrv address of source rocketmq cluster | 192.168.1.2:9876 |
+target-rocketmq | String | Yes | namesrv address of target rocketmq cluster | 192.168.1.2:9876 |
+replicator-store-topic | String | Yes | topic name to store all source messages | replicator-store-topic |
+task-divide-strategy | Integer | No | task dividing strategy, default value is 0 for dividing by topic | 0 |
+white-list | String | Yes | topic white list and multiple fields are separated by commas | topic-1,topic-2 |
+task-parallelism | String | No | task parallelism,default value is 1,one task will be responsible for multiple topics for the value greater than 1 | 2 |
+source-record-converter | String | Yes | source data parser | io.openmessaging.connect.runtime.converter.JsonConverter |
diff --git a/pom.xml b/pom.xml
index 37bf78f..d283ca9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,8 +22,8 @@
     <modelVersion>4.0.0</modelVersion>
 
     <groupId>org.apache.rocketmq</groupId>
-    <artifactId>rocketmq-connector</artifactId>
-    <version>0.0.1-SNAPSHOT</version>
+    <artifactId>rocketmq-replicator</artifactId>
+    <version>0.1.0-SNAPSHOT</version>
 
     <properties>
         <rocketmq.version>4.4.0</rocketmq.version>
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqConstants.java b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
similarity index 90%
rename from src/main/java/org/apache/rocketmq/connector/RmqConstants.java
rename to src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
index 1278424..4994abe 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqConstants.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqConstants.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector;
+package org.apache.rocketmq.replicator;
 
 public class RmqConstants {
 
@@ -26,6 +26,8 @@ public class RmqConstants {
 
     public static final String NEXT_POSITION = "nextPosition";
 
+    public static final String SOURCE_INSTANCE_NAME = "REPLICATOR_SOURCE_CONSUMER";
+
     public static String getPartition(String topic, String broker, String queueId) {
         return new StringBuilder().append(broker).append(topic).append(queueId).toString();
     }
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
similarity index 54%
rename from src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
rename to src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 33eb958..c1f350f 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector;
+package org.apache.rocketmq.replicator;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
@@ -24,15 +24,15 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.connector.common.ConstDefine;
-import org.apache.rocketmq.connector.common.Utils;
-import org.apache.rocketmq.connector.config.ConfigDefine;
-import org.apache.rocketmq.connector.config.DataType;
-import org.apache.rocketmq.connector.config.TaskDivideConfig;
-import org.apache.rocketmq.connector.strategy.DivideStrategyEnum;
-import org.apache.rocketmq.connector.strategy.DivideTaskByQueue;
-import org.apache.rocketmq.connector.strategy.DivideTaskByTopic;
-import org.apache.rocketmq.connector.strategy.TaskDivideStrategy;
+import org.apache.rocketmq.replicator.common.ConstDefine;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
+import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
+import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
@@ -40,15 +40,15 @@ import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
-public class RmqSourceConnector extends SourceConnector {
+public class RmqSourceReplicator extends SourceConnector {
 
-    private static final Logger log = LoggerFactory.getLogger(RmqSourceConnector.class);
+    private static final Logger log = LoggerFactory.getLogger(RmqSourceReplicator.class);
 
     private boolean syncDLQ = false;
 
     private boolean syncRETRY = false;
 
-    private KeyValue config;
+    private KeyValue replicatorConfig;
 
     private Map<String, List<MessageQueue>> topicRouteMap;
 
@@ -56,15 +56,11 @@ public class RmqSourceConnector extends SourceConnector {
 
     private Set<String> whiteList;
 
-    private volatile boolean started = false;
-
     private volatile boolean configValid = false;
 
-    private DefaultMQAdminExt defaultMQAdminExt;
-
     private int taskParallelism = 1;
 
-    public RmqSourceConnector() {
+    public RmqSourceReplicator() {
 
         topicRouteMap = new HashMap<String, List<MessageQueue>>();
         whiteList = new HashSet<String>();
@@ -80,7 +76,7 @@ public class RmqSourceConnector extends SourceConnector {
         }
 
         // check the whitelist, whitelist is required.
-        String whileListStr = this.config.getString(ConfigDefine.CONN_WHITE_LIST);
+        String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST);
         String[] wl = whileListStr.trim().split(",");
         if (wl.length <= 0) return "White list must be not empty.";
         else {
@@ -89,46 +85,26 @@ public class RmqSourceConnector extends SourceConnector {
             }
         }
 
-        if (this.config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
-                this.config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+        if (config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
+                config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
             this.taskDivideStrategy = new DivideTaskByQueue();
         } else {
             this.taskDivideStrategy = new DivideTaskByTopic();
         }
 
         if (config.containsKey(ConfigDefine.CONN_TASK_PARALLELISM)) {
-            this.taskParallelism = this.config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
+            this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
         }
 
-        this.config = config;
+        this.replicatorConfig = config;
         this.configValid = true;
         return "";
     }
 
     public void start() {
-      
-        if (configValid) {
-            RPCHook rpcHook = null;
-            this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
-            this.defaultMQAdminExt.setNamesrvAddr(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ));
-            this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
-            this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ)));
-            try {
-                defaultMQAdminExt.start();
-            } catch (MQClientException e) {
-                log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
-            }
-            started = true;
-        }
     }
 
     public void stop() {
-        if (started) {
-            if (defaultMQAdminExt != null) {
-                defaultMQAdminExt.shutdown();
-            }
-            started = false;
-        }
     }
 
     public void pause() {
@@ -146,33 +122,52 @@ public class RmqSourceConnector extends SourceConnector {
 
     public List<KeyValue> taskConfigs() {
       
-        if (started && configValid) {
+        if (configValid) {
+
+            boolean adminStarted = false;
+            RPCHook rpcHook = null;
+            DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+            defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+            defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+            defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+
             try {
-                for (String topic : this.whiteList) {
-                    if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
-                            (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
-                            !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
-
-                        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-                        if (!topicRouteMap.containsKey(topic)) {
-                            topicRouteMap.put(topic, new ArrayList<MessageQueue>());
-                        }
-                        for (QueueData qd : topicRouteData.getQueueDatas()) {
-                            for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                                MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
-                                topicRouteMap.get(topic).add(mq);
+                defaultMQAdminExt.start();
+                adminStarted = true;
+            } catch (MQClientException e) {
+                log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+            }
+
+            if (adminStarted) {
+                try {
+                    for (String topic : this.whiteList) {
+                        if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
+                                (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
+                                !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
+
+                            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                            if (!topicRouteMap.containsKey(topic)) {
+                                topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+                            }
+                            for (QueueData qd : topicRouteData.getQueueDatas()) {
+                                for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+                                    topicRouteMap.get(topic).add(mq);
+                                }
                             }
                         }
                     }
+                } catch (Exception e) {
+                    log.error("Fetch topic list error.", e);
+                } finally {
+                    defaultMQAdminExt.shutdown();
                 }
-            } catch (Exception e) {
-                log.error("Fetch topic list error.", e);
             }
 
             TaskDivideConfig tdc = new TaskDivideConfig(
-                    this.config.getString(ConfigDefine.CONN_SOURCE_RMQ),
-                    this.config.getString(ConfigDefine.CONN_STORE_TOPIC),
-                    this.config.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+                    this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
+                    this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
+                    this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
                     DataType.COMMON_MESSAGE.ordinal(),
                     this.taskParallelism
             );
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
rename to src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 7b3b011..50a88bb 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector;
+package org.apache.rocketmq.replicator;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
@@ -25,12 +25,12 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.common.Utils;
-import org.apache.rocketmq.connector.config.ConfigUtil;
-import org.apache.rocketmq.connector.config.DataType;
-import org.apache.rocketmq.connector.config.TaskConfig;
-import org.apache.rocketmq.connector.config.TaskTopicInfo;
-import org.apache.rocketmq.connector.schema.FieldName;
+import org.apache.rocketmq.replicator.common.Utils;
+import org.apache.rocketmq.replicator.config.ConfigUtil;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
+import org.apache.rocketmq.replicator.schema.FieldName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java b/src/main/java/org/apache/rocketmq/replicator/common/ConstDefine.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
rename to src/main/java/org/apache/rocketmq/replicator/common/ConstDefine.java
index 6e1eeac..08d985b 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/ConstDefine.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.common;
+package org.apache.rocketmq.replicator.common;
 
 public class ConstDefine {
 
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/common/Utils.java
rename to src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 30fe44c..0e4766d 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.common;
+package org.apache.rocketmq.replicator.common;
 
 import java.util.ArrayList;
 import java.util.Collections;
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
rename to src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index 13f8960..3934c2f 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
 
 import java.util.HashSet;
 import java.util.Set;
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
similarity index 98%
rename from src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
rename to src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
index d2dd7da..5da92bc 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/ConfigUtil.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigUtil.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
 
 import io.openmessaging.KeyValue;
 
diff --git a/src/main/java/org/apache/rocketmq/connector/config/DataType.java b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/config/DataType.java
rename to src/main/java/org/apache/rocketmq/replicator/config/DataType.java
index 3e77a3a..75f772e 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/DataType.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/DataType.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
 
 public enum DataType {
 
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
index d480b90..e85280f 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfig.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
 
 public class TaskConfig {
 
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
similarity index 96%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
index fca4dcc..1516f7a 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskConfigEnum.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
 
 public enum TaskConfigEnum {
 
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
similarity index 98%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
index 7f904b3..e6a8144 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskDivideConfig.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
 
 public class TaskDivideConfig {
 
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
similarity index 97%
rename from src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
rename to src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index e1f47d5..f078028 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.config;
+package org.apache.rocketmq.replicator.config;
 
 public class TaskTopicInfo {
 
diff --git a/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
similarity index 95%
rename from src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
rename to src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
index 9f4dc47..913ffca 100644
--- a/src/main/java/org/apache/rocketmq/connector/schema/FieldName.java
+++ b/src/main/java/org/apache/rocketmq/replicator/schema/FieldName.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.schema;
+package org.apache.rocketmq.replicator.schema;
 
 public enum FieldName {
     COMMON_MESSAGE("MessageExt");
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideStrategyEnum.java
similarity index 94%
rename from src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/DivideStrategyEnum.java
index 9dc060f..fb46be3 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideStrategyEnum.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideStrategyEnum.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
 
 public enum DivideStrategyEnum {
 
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
similarity index 83%
rename from src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index caa69f3..77ed871 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -14,15 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.ConfigDefine;
-import org.apache.rocketmq.connector.config.DataType;
-import org.apache.rocketmq.connector.config.TaskDivideConfig;
-
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
similarity index 96%
rename from src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index 9b019f1..e667d57 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -14,14 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
 
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.*;
-
+import org.apache.rocketmq.replicator.config.*;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
similarity index 90%
rename from src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
rename to src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index e80d092..0e0ac99 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.strategy;
+package org.apache.rocketmq.replicator.strategy;
 
 import io.openmessaging.KeyValue;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.TaskDivideConfig;
-
+import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import java.util.List;
 import java.util.Map;