You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:13 UTC

[rocketmq-connect] 03/39: Define and Implement the RmqConnector and RmqSourceTask. (#343)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 6ddd7e447b529185970c388541399d4b5a576bb2
Author: chuenfaiy <ch...@163.com>
AuthorDate: Fri Jul 26 21:29:10 2019 +0800

    Define and Implement the RmqConnector and RmqSourceTask. (#343)
    
    
    * [Fix]fix some bugs in connectors and fastjson.
    
    * [Update]update the version of replicator
    
    * [Update]remove the repetitive properties
    
    * [Update]Add the license at the head of TaskTopicInfo class and TaskConfigEnum class
---
 .gitignore                                         |   1 -
 README.md                                          |  13 ++
 pom.xml                                            |  12 +-
 .../rocketmq/connector/RmqSourceConnector.java     | 131 +++++++++++++++------
 .../apache/rocketmq/connector/RmqSourceTask.java   | 120 +++++++++++--------
 .../common/{Utils.java => ConstDefine.java}        |  10 +-
 .../apache/rocketmq/connector/common/Utils.java    |   8 +-
 .../rocketmq/connector/config/ConfigDefine.java    |  34 ++++--
 .../rocketmq/connector/config/TaskConfig.java      |  45 +++----
 .../Utils.java => config/TaskConfigEnum.java}      |  25 +++-
 .../{TaskConfig.java => TaskDivideConfig.java}     |  75 ++++++------
 .../config/{TaskConfig.java => TaskTopicInfo.java} |  44 +------
 .../connector/strategy/DivideTaskByQueue.java      |  12 +-
 .../connector/strategy/DivideTaskByTopic.java      |  30 +++--
 .../connector/strategy/TaskDivideStrategy.java     |   3 +-
 15 files changed, 307 insertions(+), 256 deletions(-)

diff --git a/.gitignore b/.gitignore
index 36716aa..525eaaa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,3 @@
-*.iml
 *.class
 *.jar
 *dependency-reduced-pom.xml
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0b8bef7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+# rocketmq-replicator
+
+# 启动参数选择
+
+参数 | 类型 |是否必须 |说明|示例值
+---|---|---|---|---|
+source-rocketmq | 字符串 | 是 | 源rocketmq集群namesrv地址 | 192.168.1.2:9876 |
+target-rocketmq | 字符串 | 是 | 源rocketmq集群namesrv地址 | 192.168.1.2:9876 |
+replicator-store-topic | 字符串 | 是 | replicator存储topic,需要在runtime的mq集群提前创建 | replicator-store-topic |
+task-divide-strategy | 整型 | 否 | 任务切割策略,可以按照主题和队列来切割,目前只支持主题切割且主题对应值为0 | 0 |
+white-list | 字符串 | 是 | 复制主题白名单,多个topic之间使用逗号分隔 | topic-1,topic-2 |
+task-parallelism | 整型 | 否 | 任务并行度,默认值为1,当topic数大于task数时,一个task将负责多个topic | 2 |
+source-record-converter | 字符串 | 是 | 源数据解析器,目前使用的是Json解析器 | io.openmessaging.connect.runtime.converter.JsonConverter |
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 13a5af4..37bf78f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
 
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-connector</artifactId>
-    <version>1.0-SNAPSHOT</version>
+    <version>0.0.1-SNAPSHOT</version>
 
     <properties>
         <rocketmq.version>4.4.0</rocketmq.version>
@@ -72,16 +72,6 @@
             <version>0.9.6</version>
         </dependency>
         <dependency>
-            <groupId>io.openmessaging</groupId>
-            <artifactId>openmessaging-connect</artifactId>
-            <version>0.1.0-beta</version>
-        </dependency>
-        <dependency>
-            <groupId>io.openmessaging</groupId>
-            <artifactId>openmessaging-connector</artifactId>
-            <version>0.1.0-beta</version>
-        </dependency>
-        <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
             <version>1.2.51</version>
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
index c919fa3..2b5bda4 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
@@ -19,13 +19,16 @@ package org.apache.rocketmq.connector;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connector.common.ConstDefine;
+import org.apache.rocketmq.connector.common.Utils;
 import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.TaskDivideConfig;
 import org.apache.rocketmq.connector.strategy.DivideStrategyEnum;
 import org.apache.rocketmq.connector.strategy.DivideTaskByQueue;
 import org.apache.rocketmq.connector.strategy.DivideTaskByTopic;
@@ -35,48 +38,96 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 public class RmqSourceConnector extends SourceConnector {
 
     private static final Logger log = LoggerFactory.getLogger(RmqSourceConnector.class);
 
+    private boolean syncDLQ = false;
+
+    private boolean syncRETRY = false;
+
     private KeyValue config;
 
     private Map<String, List<MessageQueue>> topicRouteMap;
 
-    private final TaskDivideStrategy taskDivideStrategy;
+    private TaskDivideStrategy taskDivideStrategy;
+
+    private Set<String> whiteList;
+
+    private volatile boolean started = false;
+
+    private volatile boolean configValid = false;
+
+    private DefaultMQAdminExt defaultMQAdminExt;
+
+    private int taskParallelism = 1;
 
     public RmqSourceConnector() {
 
         topicRouteMap = new HashMap<String, List<MessageQueue>>();
-
-        if (this.config.getInt(ConfigDefine.TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_TOPIC.ordinal()) {
-            taskDivideStrategy = new DivideTaskByTopic();
-        } else {
-            taskDivideStrategy = new DivideTaskByQueue();
-        }
+        whiteList = new HashSet<String>();
     }
 
     public String verifyAndSetConfig(KeyValue config) {
 
+        // check the need key.
         for(String requestKey : ConfigDefine.REQUEST_CONFIG){
             if(!config.containsKey(requestKey)){
                 return "Request config key: " + requestKey;
             }
         }
+
+        // check the whitelist, whitelist is required.
+        String whileListStr = this.config.getString(ConfigDefine.CONN_WHITE_LIST);
+        String[] wl = whileListStr.trim().split(",");
+        if (wl.length <= 0) return "White list must be not empty.";
+        else {
+            for (String t: wl) {
+                this.whiteList.add(t.trim());
+            }
+        }
+
+        if (this.config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
+                this.config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+            this.taskDivideStrategy = new DivideTaskByQueue();
+        } else {
+            this.taskDivideStrategy = new DivideTaskByTopic();
+        }
+
+        if (config.containsKey(ConfigDefine.CONN_TASK_PARALLELISM)) {
+            this.taskParallelism = this.config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
+        }
+
         this.config = config;
+        this.configValid = true;
         return "";
     }
 
     public void start() {
+      
+        if (configValid) {
+            RPCHook rpcHook = null;
+            this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+            this.defaultMQAdminExt.setNamesrvAddr(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ));
+            this.defaultMQAdminExt.setInstanceName(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+            try {
+                defaultMQAdminExt.start();
+            } catch (MQClientException e) {
+                log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+            }
+            started = true;
+        }
     }
 
     public void stop() {
-
+        if (started) {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+            started = false;
+        }
     }
 
     public void pause() {
@@ -88,26 +139,24 @@ public class RmqSourceConnector extends SourceConnector {
     }
 
     public Class<? extends Task> taskClass() {
-        return null;
+      
+        return RmqSourceTask.class;
     }
 
     public List<KeyValue> taskConfigs() {
-
-        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, this.config.getString(ConfigDefine.SOURCE_RMQ));
-        RPCHook rpcHook = null;
-        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
-        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
-        try {
-            defaultMQAdminExt.start();
-            TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
-            for (String topic: topicList.getTopicList()) {
-                if (!topic.equals(ConfigDefine.STORE_TOPIC)) {
-                    TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-                    if (!topicRouteMap.containsKey(topic)) {
-                        topicRouteMap.put(topic, new ArrayList<MessageQueue>());
-                    }
-                    for (QueueData qd: topicRouteData.getQueueDatas()) {
-                        if (PermName.isReadable(qd.getPerm())) {
+      
+        if (started && configValid) {
+            try {
+                for (String topic : this.whiteList) {
+                    if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
+                            (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
+                            !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
+
+                        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                        if (!topicRouteMap.containsKey(topic)) {
+                            topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+                        }
+                        for (QueueData qd : topicRouteData.getQueueDatas()) {
                             for (int i = 0; i < qd.getReadQueueNums(); i++) {
                                 MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                                 topicRouteMap.get(topic).add(mq);
@@ -115,15 +164,21 @@ public class RmqSourceConnector extends SourceConnector {
                         }
                     }
                 }
+            } catch (Exception e) {
+                log.error("Fetch topic list error.", e);
             }
-        } catch (Exception e) {
-            log.error("fetch topic list error: ", e);
-        } finally {
-            defaultMQAdminExt.shutdown();
-        }
 
-        return this.taskDivideStrategy.divide(this.topicRouteMap,
-                this.config.getString(ConfigDefine.SOURCE_RMQ), this.config.getString(ConfigDefine.STORE_TOPIC));
+            TaskDivideConfig tdc = new TaskDivideConfig(
+                    this.config.getString(ConfigDefine.CONN_SOURCE_RMQ),
+                    this.config.getString(ConfigDefine.CONN_STORE_TOPIC),
+                    this.config.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+                    DataType.COMMON_MESSAGE.ordinal(),
+                    this.taskParallelism
+            );
+            return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+        } else {
+            return new ArrayList<KeyValue>();
+        }
     }
 }
 
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
index d71dc87..4fca698 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.connector.common.Utils;
 import org.apache.rocketmq.connector.config.ConfigUtil;
 import org.apache.rocketmq.connector.config.DataType;
 import org.apache.rocketmq.connector.config.TaskConfig;
+import org.apache.rocketmq.connector.config.TaskTopicInfo;
 import org.apache.rocketmq.connector.schema.FieldName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@ public class RmqSourceTask extends SourceTask {
     private final String taskId;
     private final TaskConfig config;
     private final DefaultMQPullConsumer consumer;
+    private volatile boolean started = false;
 
     private Map<MessageQueue, Long> mqOffsetMap;
     public RmqSourceTask() {
@@ -67,50 +69,68 @@ public class RmqSourceTask extends SourceTask {
 
     public void start(KeyValue config) {
         ConfigUtil.load(config, this.config);
-        this.consumer.setConsumerGroup(Utils.createGroupName(this.config.getSourceTopic()));
+        this.consumer.setConsumerGroup(this.taskId);
         this.consumer.setNamesrvAddr(this.config.getSourceRocketmq());
+        List<TaskTopicInfo> topicList = JSONObject.parseArray(this.config.getTaskTopicList(), TaskTopicInfo.class);
+
         try {
             this.consumer.start();
-            Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(this.config.getSourceTopic());
-            if (!this.config.getQueueId().equals("")) {
-                for (MessageQueue mq: mqs) {
-                    if (Integer.valueOf(this.config.getQueueId()) == mq.getQueueId()) {
+            for (TaskTopicInfo tti: topicList) {
+                Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getSourceTopic());
+                if (!tti.getQueueId().equals("")) {
+                    // divide task by queue
+                    for (MessageQueue mq: mqs) {
+                        if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
+                            ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
+                                    ByteBuffer.wrap(RmqConstants.getPartition(
+                                            mq.getTopic(),
+                                            mq.getBrokerName(),
+                                            String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
+
+                            if (null != positionInfo && positionInfo.array().length > 0) {
+                                String positionJson = new String(positionInfo.array(), "UTF-8");
+                                JSONObject jsonObject = JSONObject.parseObject(positionJson);
+                                this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
+                            } else {
+                                this.config.setNextPosition(0L);
+                            }
+                            mqOffsetMap.put(mq, this.config.getNextPosition());
+                        }
+                    }
+                } else {
+                    // divide task by topic
+                    for (MessageQueue mq: mqs) {
                         ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
-                                ByteBuffer.wrap(RmqConstants.getPartition(mq.getTopic(),
-                                        mq.getBrokerName(), String.valueOf(mq.getQueueId()))
-                                        .getBytes("UTF-8")));
+                                ByteBuffer.wrap(RmqConstants.getPartition(
+                                        mq.getTopic(),
+                                        mq.getBrokerName(),
+                                        String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
 
                         if (null != positionInfo && positionInfo.array().length > 0) {
                             String positionJson = new String(positionInfo.array(), "UTF-8");
                             JSONObject jsonObject = JSONObject.parseObject(positionJson);
                             this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
+                        } else {
+                            this.config.setNextPosition(0L);
                         }
                         mqOffsetMap.put(mq, this.config.getNextPosition());
                     }
                 }
-            } else {
-                for (MessageQueue mq: mqs) {
-                    ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
-                            ByteBuffer.wrap(RmqConstants.getPartition(mq.getTopic(),
-                                    mq.getBrokerName(), String.valueOf(mq.getQueueId()))
-                                    .getBytes("UTF-8")));
-
-                    if (null != positionInfo && positionInfo.array().length > 0) {
-                        String positionJson = new String(positionInfo.array(), "UTF-8");
-                        JSONObject jsonObject = JSONObject.parseObject(positionJson);
-                        this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
-                    }
-                    mqOffsetMap.put(mq, this.config.getNextPosition());
-                }
             }
-
+            started = true;
         } catch (Exception e) {
-            log.error("consumer of task {} start failed. ", this.taskId, e);
+            log.error("Consumer of task {} start failed.", this.taskId, e);
         }
     }
 
     public void stop() {
-        this.consumer.shutdown();
+      
+        if (started) {
+            if (this.consumer != null) {
+                this.consumer.shutdown();
+            }
+            started = false;
+        }
     }
 
     public void pause() {
@@ -124,46 +144,48 @@ public class RmqSourceTask extends SourceTask {
     private Collection<SourceDataEntry> pollCommonMessage() {
 
         List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
-
-        try {
-            for (MessageQueue mq : this.mqOffsetMap.keySet()) {
-                PullResult pullResult = consumer.pull(mq, "*", this.mqOffsetMap.get(mq), 32);
-                switch (pullResult.getPullStatus()) {
-                    case FOUND: {
-                        this.mqOffsetMap.put(mq, pullResult.getNextBeginOffset());
-                        JSONObject jsonObject = new JSONObject();
-                        jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
-
-                        List<MessageExt> msgs = pullResult.getMsgFoundList();
-                        for (MessageExt m : msgs) {
+        if (started) {
+            try {
+                for (MessageQueue mq : this.mqOffsetMap.keySet()) {
+                    PullResult pullResult = consumer.pull(mq, "*",
+                            this.mqOffsetMap.get(mq), 32);
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND: {
+                            this.mqOffsetMap.put(mq, pullResult.getNextBeginOffset());
+                            JSONObject jsonObject = new JSONObject();
+                            jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
+                            List<MessageExt> msgs = pullResult.getMsgFoundList();
                             Schema schema = new Schema();
                             schema.setDataSource(this.config.getSourceRocketmq());
-                            schema.setName(this.config.getSourceTopic());
+                            schema.setName(mq.getTopic());
                             schema.setFields(new ArrayList<Field>());
-                            schema.getFields().add(new Field(0, FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
+                            schema.getFields().add(new Field(0,
+                                    FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
 
                             DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
                             dataEntryBuilder.timestamp(System.currentTimeMillis())
                                     .queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
-                            dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(m));
+                            dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
                             SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
-                                    ByteBuffer.wrap(RmqConstants.getPartition(this.config.getSourceTopic(),
-                                            this.config.getBrokerName(),
-                                            this.config.getQueueId()).getBytes("UTF-8")),
+                                    ByteBuffer.wrap(RmqConstants.getPartition(
+                                            mq.getTopic(),
+                                            mq.getBrokerName(),
+                                            String.valueOf(mq.getQueueId())).getBytes("UTF-8")),
                                     ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
                             );
                             res.add(sourceDataEntry);
+                            break;
                         }
-                        break;
+                        default:
+                            break;
                     }
-                    default:
-                        break;
                 }
+            } catch (Exception e) {
+                log.error("Rocketmq replicator task poll error, current config: {}", JSON.toJSONString(config), e);
             }
-        } catch (Exception e) {
-            log.error("Rocketmq connector task poll error, current config: {}", JSON.toJSONString(config), e);
+        } else {
+            log.warn("Rocketmq replicator task is not started.");
         }
-
         return res;
     }
 
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
similarity index 70%
copy from src/main/java/org/apache/rocketmq/connector/common/Utils.java
copy to src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
index 71f538c..6e1eeac 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
@@ -16,13 +16,9 @@
  */
 package org.apache.rocketmq.connector.common;
 
-public class Utils {
+public class ConstDefine {
 
-    public static String createGroupName(String prefix) {
-        return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
-    }
+    public static String TASK_GROUP_NAME_PREFIX = "REPLICATOR-TASK";
 
-    public static String createTaskId(String prefix) {
-        return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
-    }
+    public static String REPLICATOR_ADMIN_PREFIX = "REPLICATOR-ADMIN";
 }
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
index 71f538c..f5822c6 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
@@ -19,10 +19,14 @@ package org.apache.rocketmq.connector.common;
 public class Utils {
 
     public static String createGroupName(String prefix) {
-        return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+        return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
+    }
+
+    public static String createGroupName(String prefix, String postfix) {
+        return new StringBuilder().append(prefix).append("@").append(postfix).toString();
     }
 
     public static String createTaskId(String prefix) {
-        return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+        return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
index 64e9c94..13f8960 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
@@ -21,28 +21,38 @@ import java.util.Set;
 
 public class ConfigDefine {
 
-    public static final String SOURCE_RMQ = "sourceRocketmq";
+    public static final String CONN_SOURCE_RMQ = "source-rocketmq";
 
-    public static final String STORE_TOPIC = "storeTopic";
+    public static final String CONN_STORE_TOPIC = "replicator-store-topic";
 
-    public static final String TARGET_RMQ = "targetRocketmq";
+    public static final String CONN_TARGET_RMQ = "target-rocketmq";
 
-    public static final String DATA_TYPE = "dataType";
+    public static final String CONN_SOURCE_GROUP = "source-group";
 
-    public static final String QUEUE_ID = "queueId";
+    public static final String CONN_DATA_TYPE = "data-type";
 
-    public static final String TASK_DIVIDE_STRATEGY = "taskDivideStrategy";
+    public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
 
-    public static final String BROKER_NAME = "brokerName";
+    public static final String CONN_BROKER_NAME = "broker-name";
 
-    public static final String SOURCE_TOPIC = "sourceTopic";
+    public static final String CONN_SOURCE_TOPIC = "source-topic";
 
+    public static final String CONN_WHITE_LIST = "white-list";
+
+    public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+
+    public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+
+    /**
+     * The required key for all configurations.
+     */
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
         {
-            add("sourceRocketmq");
-            add("targetRocketmq");
-            add("storeTopic");
-            add("taskDivideStrategy");
+            add(CONN_SOURCE_RMQ);
+            add(CONN_TARGET_RMQ);
+            add(CONN_STORE_TOPIC);
+            add(CONN_WHITE_LIST);
+            add(CONN_SOURCE_RECORD_CONVERTER);
         }
     };
 }
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
index 7bba3fb..d480b90 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
@@ -19,27 +19,26 @@ package org.apache.rocketmq.connector.config;
 public class TaskConfig {
 
     private String storeTopic;
-    private String sourceTopic;
+    private String sourceGroup;
     private String sourceRocketmq;
     private Integer dataType;
-    private String brokerName;
-    private String queueId;
     private Long nextPosition;
+    private String taskTopicList;
 
-    public String getStoreTopic() {
-        return storeTopic;
+    public String getSourceGroup() {
+        return sourceGroup;
     }
 
-    public void setStoreTopic(String storeTopic) {
-        this.storeTopic = storeTopic;
+    public void setSourceGroup(String sourceGroup) {
+        this.sourceGroup = sourceGroup;
     }
 
-    public String getSourceTopic() {
-        return sourceTopic;
+    public String getStoreTopic() {
+        return storeTopic;
     }
 
-    public void setSourceTopic(String sourceTopic) {
-        this.sourceTopic = sourceTopic;
+    public void setStoreTopic(String storeTopic) {
+        this.storeTopic = storeTopic;
     }
 
     public String getSourceRocketmq() {
@@ -62,22 +61,6 @@ public class TaskConfig {
         this.dataType = dataType;
     }
 
-    public String getBrokerName() {
-        return brokerName;
-    }
-
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
-    }
-
-    public String getQueueId() {
-        return queueId;
-    }
-
-    public void setQueueId(String queueId) {
-        this.queueId = queueId;
-    }
-
     public Long getNextPosition() {
         return nextPosition;
     }
@@ -85,4 +68,12 @@ public class TaskConfig {
     public void setNextPosition(Long nextPosition) {
         this.nextPosition = nextPosition;
     }
+
+    public String getTaskTopicList() {
+        return taskTopicList;
+    }
+
+    public void setTaskTopicList(String taskTopicList) {
+        this.taskTopicList = taskTopicList;
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
similarity index 59%
copy from src/main/java/org/apache/rocketmq/connector/common/Utils.java
copy to src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
index 71f538c..d8aa08a 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
@@ -14,15 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.connector.common;
+package org.apache.rocketmq.connector.config;
 
-public class Utils {
+public enum TaskConfigEnum {
 
-    public static String createGroupName(String prefix) {
-        return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+    TASK_ID("taskId"),
+    TASK_SOURCE_GROUP("sourceGroup"),
+    TASK_SOURCE_ROCKETMQ("sourceRocketmq"),
+    TASK_SOURCE_TOPIC("sourceTopic"),
+    TASK_STORE_ROCKETMQ("storeTopic"),
+    TASK_DATA_TYPE("dataType"),
+    TASK_BROKER_NAME("brokerName"),
+    TASK_QUEUE_ID("queueId"),
+    TASK_NEXT_POSITION("nextPosition"),
+    TASK_TOPIC_INFO("taskTopicList");
+
+    private String key;
+
+    TaskConfigEnum(String key) {
+        this.key = key;
     }
 
-    public static String createTaskId(String prefix) {
-        return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+    public String getKey() {
+        return key;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
similarity index 53%
copy from src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
copy to src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
index 7bba3fb..7f904b3 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
@@ -16,73 +16,64 @@
  */
 package org.apache.rocketmq.connector.config;
 
-public class TaskConfig {
+public class TaskDivideConfig {
+
+    private String sourceNamesrvAddr;
 
     private String storeTopic;
-    private String sourceTopic;
-    private String sourceRocketmq;
-    private Integer dataType;
-    private String brokerName;
-    private String queueId;
-    private Long nextPosition;
 
-    public String getStoreTopic() {
-        return storeTopic;
-    }
+    private String srcRecordConverter;
 
-    public void setStoreTopic(String storeTopic) {
-        this.storeTopic = storeTopic;
-    }
+    private int dataType;
 
-    public String getSourceTopic() {
-        return sourceTopic;
-    }
+    private int taskParallelism;
 
-    public void setSourceTopic(String sourceTopic) {
-        this.sourceTopic = sourceTopic;
+    public TaskDivideConfig(String sourceNamesrvAddr, String storeTopic, String srcRecordConverter,
+                            int dataType, int taskParallelism) {
+        this.sourceNamesrvAddr = sourceNamesrvAddr;
+        this.storeTopic = storeTopic;
+        this.srcRecordConverter = srcRecordConverter;
+        this.dataType = dataType;
+        this.taskParallelism = taskParallelism;
     }
 
-    public String getSourceRocketmq() {
-        return sourceRocketmq;
+    public String getSourceNamesrvAddr() {
+        return sourceNamesrvAddr;
     }
 
-    public void setSourceRocketmq(String sourceRocketmq) {
-        this.sourceRocketmq = sourceRocketmq;
+    public void setSourceNamesrvAddr(String sourceNamesrvAddr) {
+        this.sourceNamesrvAddr = sourceNamesrvAddr;
     }
 
-    public int getDataType() {
-        return dataType;
-    }
-
-    public void setDataType(int dataType) {
-        this.dataType = dataType;
+    public String getStoreTopic() {
+        return storeTopic;
     }
 
-    public void setDataType(Integer dataType) {
-        this.dataType = dataType;
+    public void setStoreTopic(String storeTopic) {
+        this.storeTopic = storeTopic;
     }
 
-    public String getBrokerName() {
-        return brokerName;
+    public String getSrcRecordConverter() {
+        return srcRecordConverter;
     }
 
-    public void setBrokerName(String brokerName) {
-        this.brokerName = brokerName;
+    public void setSrcRecordConverter(String srcRecordConverter) {
+        this.srcRecordConverter = srcRecordConverter;
     }
 
-    public String getQueueId() {
-        return queueId;
+    public int getDataType() {
+        return dataType;
     }
 
-    public void setQueueId(String queueId) {
-        this.queueId = queueId;
+    public void setDataType(int dataType) {
+        this.dataType = dataType;
     }
 
-    public Long getNextPosition() {
-        return nextPosition;
+    public int getTaskParallelism() {
+        return taskParallelism;
     }
 
-    public void setNextPosition(Long nextPosition) {
-        this.nextPosition = nextPosition;
+    public void setTaskParallelism(int taskParallelism) {
+        this.taskParallelism = taskParallelism;
     }
 }
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
similarity index 61%
copy from src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
copy to src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
index 7bba3fb..e1f47d5 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
@@ -16,22 +16,16 @@
  */
 package org.apache.rocketmq.connector.config;
 
-public class TaskConfig {
+public class TaskTopicInfo {
 
-    private String storeTopic;
     private String sourceTopic;
-    private String sourceRocketmq;
-    private Integer dataType;
     private String brokerName;
     private String queueId;
-    private Long nextPosition;
 
-    public String getStoreTopic() {
-        return storeTopic;
-    }
-
-    public void setStoreTopic(String storeTopic) {
-        this.storeTopic = storeTopic;
+    public TaskTopicInfo(String sourceTopic, String brokerName, String queueId) {
+        this.sourceTopic = sourceTopic;
+        this.brokerName = brokerName;
+        this.queueId = queueId;
     }
 
     public String getSourceTopic() {
@@ -42,26 +36,6 @@ public class TaskConfig {
         this.sourceTopic = sourceTopic;
     }
 
-    public String getSourceRocketmq() {
-        return sourceRocketmq;
-    }
-
-    public void setSourceRocketmq(String sourceRocketmq) {
-        this.sourceRocketmq = sourceRocketmq;
-    }
-
-    public int getDataType() {
-        return dataType;
-    }
-
-    public void setDataType(int dataType) {
-        this.dataType = dataType;
-    }
-
-    public void setDataType(Integer dataType) {
-        this.dataType = dataType;
-    }
-
     public String getBrokerName() {
         return brokerName;
     }
@@ -77,12 +51,4 @@ public class TaskConfig {
     public void setQueueId(String queueId) {
         this.queueId = queueId;
     }
-
-    public Long getNextPosition() {
-        return nextPosition;
-    }
-
-    public void setNextPosition(Long nextPosition) {
-        this.nextPosition = nextPosition;
-    }
 }
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
index b6eae8f..caa69f3 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
@@ -21,27 +21,19 @@ import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.connector.config.ConfigDefine;
 import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.TaskDivideConfig;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 public class DivideTaskByQueue extends TaskDivideStrategy {
-    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
 
         for (String t: topicRouteMap.keySet()) {
             for (MessageQueue mq: topicRouteMap.get(t)) {
-                KeyValue keyValue = new DefaultKeyValue();
-                keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
-                keyValue.put(ConfigDefine.SOURCE_RMQ, source);
-                keyValue.put(ConfigDefine.STORE_TOPIC, t);
-                keyValue.put(ConfigDefine.BROKER_NAME, mq.getBrokerName());
-                keyValue.put(ConfigDefine.QUEUE_ID, String.valueOf(mq.getQueueId()));
-                keyValue.put(ConfigDefine.SOURCE_TOPIC, t);
-                keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
-                config.add(keyValue);
             }
         }
 
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
index 7c96b28..53699a9 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
@@ -16,31 +16,39 @@
  */
 package org.apache.rocketmq.connector.strategy;
 
+import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.ConfigDefine;
-import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.*;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-
+        int parallelism = tdc.getTaskParallelism();
+        int id = -1;
+        Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
         for (String t: topicRouteMap.keySet()) {
+            int ind = ++id%parallelism;
+            if (!taskTopicList.containsKey(ind)) {
+                taskTopicList.put(ind, new ArrayList<TaskTopicInfo>());
+            }
+            taskTopicList.get(ind).add(new TaskTopicInfo(t, "", ""));
+        }
+
+        for (int i=0; i<parallelism; i++) {
             KeyValue keyValue = new DefaultKeyValue();
-            keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
-            keyValue.put(ConfigDefine.SOURCE_RMQ, source);
-            keyValue.put(ConfigDefine.STORE_TOPIC, t);
-            keyValue.put(ConfigDefine.BROKER_NAME, "");
-            keyValue.put(ConfigDefine.QUEUE_ID, "");
-            keyValue.put(ConfigDefine.SOURCE_TOPIC, t);
-            keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
+            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+            keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(taskTopicList.get(i)));
             config.add(keyValue);
         }
 
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
index f847cb1..e80d092 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
@@ -18,11 +18,12 @@ package org.apache.rocketmq.connector.strategy;
 
 import io.openmessaging.KeyValue;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.config.TaskDivideConfig;
 
 import java.util.List;
 import java.util.Map;
 
 public abstract class TaskDivideStrategy {
 
-    public abstract List<KeyValue> divide(Map<String, List<MessageQueue>> topicMap, String source, String storeTopic);
+    public abstract List<KeyValue> divide(Map<String, List<MessageQueue>> topicMap, TaskDivideConfig tdc);
 }