Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/20 01:15:55 UTC

[rocketmq-connect] branch master updated: [ISSUE #286]kafka connector adapter:how map Rocketmq MessageQueue to Kafka TopicPartition (#301)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 68e24c49 [ISSUE #286]kafka connector adapter:how map Rocketmq MessageQueue to Kafka TopicPartition (#301)
68e24c49 is described below

commit 68e24c4925099be32c8ed680d2caf35267d9337b
Author: 欧夺标 <ou...@gmail.com>
AuthorDate: Tue Sep 20 09:15:50 2022 +0800

    [ISSUE #286]kafka connector adapter:how map Rocketmq MessageQueue to Kafka TopicPartition (#301)
    
    * rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT
    
    * rocketmq-connect-kafka-connector-adapter: support the kafka mongodb connector
    
    * kafka-connector-adapter: why need getTopicAndBrokerName
    
    * kafka-connector-adapter: why need getTopicAndBrokerName
    
    * kafka-connector-adapter: add the RecordPartitionTopicPartitionConvertor interface
    
    * kafka-connector-adapter: add the KafkaTopicPartitionMapper interface
    
    * kafka-connector-adapter: add the KafkaTopicPartitionMapper interface
    
    * kafka-connector-adapter: add the RocketmqRecordPartitionKafkaTopicPartitionMapper interface
    
    * kafka-connector-adapter: implement RocketmqRecordPartitionKafkaTopicPartitionMapper
    
    * kafka-connector-adapter: extract common logic into KafkaRocketmqTask
    
    * kafka-connector-adapter: extract the brokername id with a regex
    
    * kafka-connector-adapter: mongo documentation
    
    * kafka-connector-adapter: mongo documentation
    
    * kafka-connector-adapter: neo4j documentation
    
    * kafka-connector-adapter: mongodb documentation
    
    * kafka-connector-adapter: mongodb documentation
    
    * kafka-connector-adapter: mongodb documentation
    
    * kafka-connector-adapter: mongodb documentation
    
    * support for the new version
    
    Co-authored-by: ouduobiao <du...@alibaba-inc.com>
---
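Note on the mapping idea: the sink guides added in this commit set `assignEncodedPartition.assignments` to `broker-0:0,broker-1:1`, that is, one integer code per brokerName, and one commit bullet mentions extracting the brokername id with a regex. The sketch below only illustrates those two ideas. The class and method names are hypothetical and are not taken from this commit, the trailing-digits pattern is an assumption, and how the adapter combines a broker code with the queue id to form a Kafka partition is not shown in this part of the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not a class from this commit.
class BrokerCodeSketch {

    // Assumption: a broker id is the run of digits at the end of the brokerName.
    private static final Pattern TRAILING_DIGITS = Pattern.compile("(\\d+)$");

    // Parses a string such as "broker-0:0,broker-1:1" into brokerName -> code.
    static Map<String, Integer> parseAssignments(String assignments) {
        Map<String, Integer> codes = new LinkedHashMap<>();
        for (String entry : assignments.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            // Split on the last ':' so everything before it is the brokerName.
            int sep = trimmed.lastIndexOf(':');
            if (sep <= 0 || sep == trimmed.length() - 1) {
                throw new IllegalArgumentException("malformed assignment: " + trimmed);
            }
            String brokerName = trimmed.substring(0, sep).trim();
            int code = Integer.parseInt(trimmed.substring(sep + 1).trim());
            if (code < 0) {
                throw new IllegalArgumentException("negative code for broker: " + brokerName);
            }
            if (codes.containsKey(brokerName)) {
                throw new IllegalArgumentException("duplicate broker: " + brokerName);
            }
            codes.put(brokerName, code);
        }
        return codes;
    }

    // Looks up the code assigned to a brokerName and fails loudly if none exists.
    static int codeFor(Map<String, Integer> codes, String brokerName) {
        Integer code = codes.get(brokerName);
        if (code == null) {
            throw new IllegalArgumentException("no code assigned for broker: " + brokerName);
        }
        return code;
    }

    // Regex variant: derives the code from the digits that end the brokerName.
    static int codeFromBrokerName(String brokerName) {
        Matcher matcher = TRAILING_DIGITS.matcher(brokerName);
        if (!matcher.find()) {
            throw new IllegalArgumentException("no trailing id in brokerName: " + brokerName);
        }
        return Integer.parseInt(matcher.group(1));
    }

    public static void main(String[] args) {
        Map<String, Integer> codes = parseAssignments("broker-0:0,broker-1:1");
        System.out.println(codes);
        System.out.println(codeFor(codes, "broker-1"));
        System.out.println(codeFromBrokerName("broker-1"));
    }
}
```

An explicit assignment list keeps the code of each broker stable even if broker names follow no numeric pattern, whereas the regex variant needs no per-broker configuration but only works when names end in a number.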
 .../README.md                                      | 44 +++++++---
 .../how-to/kafka-mongo-connector.md                | 85 +++++++++++++++++++
 .../how-to/kafka-neo4j-connector.md                | 58 +++++++++++++
 .../connect/kafka/config/ConfigDefine.java         | 15 ++--
 .../kafka/connector/KafkaRocketmqConnector.java    | 63 ++++++++------
 .../connector/KafkaRocketmqSinkConnector.java      | 12 +--
 .../kafka/connector/KafkaRocketmqSinkTask.java     | 98 ++++------------------
 .../connector/KafkaRocketmqSourceConnector.java    | 12 +--
 .../kafka/connector/KafkaRocketmqSourceTask.java   | 89 +++-----------------
 .../connect/kafka/connector/KafkaRocketmqTask.java | 93 ++++++++++++++++++++
 .../connector/RocketmqKafkaSinkTaskContext.java    | 27 ++----
 ...mqRecordPartitionKafkaTopicPartitionMapper.java | 49 +++++++++++
 .../rocketmq/connect/kafka/util/ConfigUtil.java    | 29 +++++--
 ...mqRecordPartitionKafkaTopicPartitionMapper.java | 67 +++++++++++++++
 ...ocketmqBrokerNameKafkaTopicPartitionMapper.java | 54 ++++++++++++
 .../rocketmq/connect/kafka/util/RecordUtil.java    | 62 +++++---------
 ...mqRecordPartitionKafkaTopicPartitionMapper.java | 53 ++++++++++++
 ...mqRecordPartitionKafkaTopicPartitionMapper.java | 78 +++++++++++++++++
 18 files changed, 703 insertions(+), 285 deletions(-)

diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/README.md b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
index 01b7a63b..98522f87 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/README.md
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
@@ -4,7 +4,7 @@
 
 **Parameters**
 
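-Parameters fall into 3 categories: rocketmq connect runtime parameters, kafka-connector-adapter parameters, and parameters of the specific kafka connector
+Parameters fall into 2 categories: rocketmq connect runtime parameters and kafka-connector parameters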
 
 rocketmq connect runtime parameters:
 - **connector-class**: class name of the kafka-connector-adapter
@@ -13,17 +13,21 @@ rocketmq connect runtime parameters:
   
   For a SinkConnector, use org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector.
   
-- **connect-topicname**: the rocketmq topic to import data into or export data from
-- **tasks.num**: number of tasks to start
-  
+- **connector.class**: the rocketmq topic to import data into or export data from
+- **max.task**: number of tasks to start
+
+kafka-connector parameters go inside kafka.connector.configs and fall into 2 categories: kafka-connector-adapter parameters and parameters of the specific kafka connector
+
 kafka-connector-adapter parameters:
 - **connector.class**: class name of the kafka connector
 - **plugin.path**: plugin path of the kafka connector
 
 Parameters of the specific kafka connector:
 
-See the documentation of the specific kafka connector
+- See the documentation of the specific kafka connector
 
+When connector-class is org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector, one special parameter applies:
+- **rocketmq.recordPartition.kafkaTopicPartition.mapper**: the implementation that maps a recordPartition to a kafkaTopicPartition; allowed values are encodedTopic, assignEncodedPartition and regexEncodedPartition, and the default is encodedTopic
 
 # Quick start
 
@@ -67,13 +71,29 @@ touch /tmp/test-source-file.txt
 
 echo "Hello \r\nRocketMQ\r\n Connect" >> /tmp/test-source-file.txt
 
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","plugin.path":"/tmp/kafka-plugins","topic":"fileTopic","file":"/tmp/test-source-file.txt"}'
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
+	"connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector",
+	"kafka.connector.configs":{
+		"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
+		"plugin.path": "/tmp/kafka-plugins",
+		"topic": "fileTopic",
+		"file": "/tmp/test-source-file.txt"
+	}
+}'
 ```
 
 ## 5.Start the sink connector
 
 ```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","plugin.path":"/tmp/kafka-plugins","file":"/tmp/test-sink-file.txt"}'
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
+	"connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector",
+	"connect.topicnames": "fileTopic",
+	"kafka.connector.configs":{
+		"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
+		"plugin.path": "/tmp/kafka-plugins",
+		"file": "/tmp/test-sink-file.txt"
+	}
+}'
 
 cat /tmp/test-sink-file.txt
 ```
@@ -82,10 +102,12 @@ cat /tmp/test-sink-file.txt
 
 todo
 
-# How to run kafka-mongo-connector
+# How to run other kafka connectors
 
-todo
+## 1.Get the uber jar of the kafka connector
+The [kafka-connector-plugins](https://github.com/oudb/kafka-connector-plugins) repository collects uber jars for some common connectors
 
-# How to run kafka-jdbc-connector
+## 2.Run guides
+1.[mongodb](how-to/kafka-mongo-connector.md)
 
-todo
\ No newline at end of file
+2.[neo4j](how-to/kafka-neo4j-connector.md)
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-mongo-connector.md b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-mongo-connector.md
new file mode 100644
index 00000000..f17271f6
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-mongo-connector.md
@@ -0,0 +1,85 @@
+Assume rocketmq has 2 nodes: one with brokerName broker-0 and the other with brokerName broker-1
+
+## 1.Start mongodb
+```
+docker run \
+    --publish=27017:27017  --name mongo1\
+    mongo \
+    --replSet rs0
+    
+docker exec -it mongo1 bash
+
+cat << EOF > config-data.js 
+db = db.getSiblingDB("quickstart");
+db.createCollection("source");
+db.createCollection("sink");
+EOF
+
+
+cat << EOF > config-replica.js 
+rsconf = {
+  _id: "rs0",
+  members: [{ _id: 0, host: "127.0.0.1:27017", priority: 1.0 }],
+};
+rs.initiate(rsconf);
+rs.status();
+EOF
+
+
+mongosh  mongodb://127.0.0.1:27017 config-replica.js && sleep 10 && mongosh mongodb://127.0.0.1:27017 config-data.js
+
+```
+
+
+## 2.mongo source connector
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/mongo-source -d '{
+	"connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector",
+	"connect.topicname":"mongoTopic",
+	"kafka.connector.configs":{
+		"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
+		"plugin.path": "/kafka-connecor-plugins",
+		"connection.uri": "mongodb://127.0.0.1:27017/?replicaSet=rs0",
+		"database": "quickstart",
+		"collection": "sampleData",
+		"pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
+	}
+}'
+```
+
+## 3.mongo sink connector
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/mongo-sink -d '{
+   "connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector",
+   	"connect.topicnames": "mongoTopic",
+   	"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
+   	"plugin.path": "/kafka-connecor-plugins/mongodb-connector",
+   	"topics": "mongoTopic",
+	"rocketmq.recordPartition.kafkaTopicPartition.mapper": "assignEncodedPartition",
+	"assignEncodedPartition.assignments": "broker-0:0,broker-1:1",
+   	"connection.uri": "mongodb://127.0.0.1:27017/?replicaSet=rs0",
+   	"database": "quickstart",
+   	"collection": "topicData",
+   	"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
+   }'
+```
+
+
+## 4.Test
+```
+docker exec -it mongo1 bash
+
+mongosh mongodb://127.0.0.1:27017/?replicaSet=rs0
+
+use quickstart
+
+db.sampleData.insertOne({"hello":"world"})
+
+db.topicData.find()
+
+```
+
+
+More about the connector:
+
+https://www.mongodb.com/docs/kafka-connector/current/
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-neo4j-connector.md b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-neo4j-connector.md
new file mode 100644
index 00000000..b4762dc3
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-neo4j-connector.md
@@ -0,0 +1,58 @@
+Assume rocketmq has 2 nodes: one with brokerName broker-0 and the other with brokerName broker-1
+
+
+## 1.Start a test neo4j server
+```
+docker run \
+    --publish=7474:7474 --publish=7687:7687 \
+    --volume=$HOME/neo4j/data:/data \
+	--env=NEO4J_AUTH=neo4j/connect \
+    neo4j
+```
+
+The username is neo4j and the password is connect
+
+## 2.Run the neo4j sink connector
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/neo4jSinkConnector -d '{
+    "connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector",
+    "connect.topicnames":"my-topic",
+	"kafka.connector.configs":{
+		"rocketmq.recordPartition.kafkaTopicPartition.mapper": "assignEncodedPartition",
+		"assignEncodedPartition.assignments": "broker-0:0,broker-1:1",
+		"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
+		"key.converter": "org.apache.kafka.connect.json.JsonConverter",
+		"key.converter.schemas.enable": false,
+		"value.converter": "org.apache.kafka.connect.json.JsonConverter",
+		"value.converter.schemas.enable": false,
+		"errors.retry.timeout": "-1",
+		"errors.retry.delay.max.ms": "1000",
+		"errors.tolerance": "all",
+		"errors.log.enable": true,
+		"errors.log.include.messages": true,
+		"neo4j.server.uri": "bolt://localhost:7687",
+		"neo4j.authentication.basic.username": "neo4j",
+		"neo4j.authentication.basic.password": "connect",
+		"neo4j.encryption.enabled": false,
+		"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
+		"plugin.path": "/kafka-connecor-plugins/neo4j-connector"
+	}
+}'
+```
+Here assignEncodedPartition.assignments specifies the code for each brokerName; in this example it is broker-0:0,broker-1:1
+
+## 3.Test
+1.Send a message to the topic:
+sh mqadmin sendMessage -n localhost:9876 -t my-topic -p '{"name": "Name", "surname": "Surname"}'
+
+2.Query neo4j
+Log in from a browser: http://localhost:7474/browser/
+
+query:
+
+MATCH  (ee:Person) WHERE ee.name ='Name' RETURN  ee;
+
+
+More about the connector:
+
+https://neo4j.com/labs/kafka/4.0/kafka-connect/
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
index 9cbba085..da877363 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
@@ -8,9 +8,13 @@ import java.util.HashSet;
 import java.util.Set;
 
 public class ConfigDefine {
-    public static String ROCKETMQ_CONNECTOR_CLASS = "connector-class";
-    public static String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG;
-    public static String PLUGIN_PATH = "plugin.path";
+    public static final String ROCKETMQ_CONNECTOR_CLASS = "connector.class";
+    public static final String ROCKETMQ_CONNECT_TOPIC_NAME = "connect.topicname";
+    public static final String ROCKETMQ_CONNECT_TOPIC_NAMES = "connect.topicnames";
+
+    public static final String KAFKA_CONNECTOR_CONFIGS = "kafka.connector.configs";
+    public static final String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+    public static final String PLUGIN_PATH = "plugin.path";
 
     public static final String TASK_CLASS = TaskConfig.TASK_CLASS_CONFIG;
 
@@ -18,11 +22,12 @@ public class ConfigDefine {
     public static final String VALUE_CONVERTER = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
     public static final String HEADER_CONVERTER = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
 
+    // encodedTopic/assignEncodedPartition/regexEncodedPartition
+    public static final String ROCKETMQ_RECORDPARTITION_KAFKATOPICPARTITION_MAPPER = "rocketmq.recordPartition.kafkaTopicPartition.mapper";
 
     public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
         {
-            add(CONNECTOR_CLASS);
-            add(PLUGIN_PATH);
+            add(KAFKA_CONNECTOR_CONFIGS);
         }
     };
 }
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
index 89bee2c7..285bf39b 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
@@ -4,8 +4,10 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.connector.Connector;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
@@ -13,11 +15,7 @@ import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.DriverManager;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 public class KafkaRocketmqConnector extends Connector {
@@ -41,30 +39,31 @@ public class KafkaRocketmqConnector extends Connector {
             taskKeyValueConfigs.addAll(
                     taskConfigs
                             .stream()
-                            .map(ConfigUtil::mapConfigToKeyValue)
-                            .collect(Collectors.toList())
-            );
-
-            taskKeyValueConfigs.forEach(kv -> {
-                kv.put(ConfigDefine.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH));
-                kv.put(ConfigDefine.CONNECTOR_CLASS, this.kafkaConnectorConfigs.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-                kv.put(ConfigDefine.TASK_CLASS, this.kafkaConnector.taskClass().getName());
+                            .map(kv -> {
 
-                kv.put(ConfigDefine.ROCKETMQ_CONNECTOR_CLASS, childConnector.getClass().getName());
+                                kv.put(ConfigDefine.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH));
+                                kv.put(ConfigDefine.CONNECTOR_CLASS, this.kafkaConnectorConfigs.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+                                kv.put(ConfigDefine.TASK_CLASS, this.kafkaConnector.taskClass().getName());
 
-                if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.KEY_CONVERTER)){
-                    kv.put(ConfigDefine.KEY_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.KEY_CONVERTER));
-                }
+                                if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.KEY_CONVERTER)){
+                                    kv.put(ConfigDefine.KEY_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.KEY_CONVERTER));
+                                }
 
-                if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.VALUE_CONVERTER)){
-                    kv.put(ConfigDefine.VALUE_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.VALUE_CONVERTER));
-                }
+                                if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.VALUE_CONVERTER)){
+                                    kv.put(ConfigDefine.VALUE_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.VALUE_CONVERTER));
+                                }
 
-                if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.HEADER_CONVERTER)){
-                    kv.put(ConfigDefine.HEADER_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.HEADER_CONVERTER));
-                }
-            });
+                                if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.HEADER_CONVERTER)){
+                                    kv.put(ConfigDefine.HEADER_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.HEADER_CONVERTER));
+                                }
 
+                                KeyValue keyValue = new DefaultKeyValue();
+                                keyValue.put(ConfigDefine.ROCKETMQ_CONNECTOR_CLASS, childConnector.getClass().getName());
+                                keyValue.put(ConfigDefine.KAFKA_CONNECTOR_CONFIGS, ConfigUtil.toJson(kv));
+                                return keyValue;
+                            })
+                            .collect(Collectors.toList())
+            );
         });
         return taskKeyValueConfigs;
     }
@@ -92,14 +91,26 @@ public class KafkaRocketmqConnector extends Connector {
 
     @Override
     public void validate(KeyValue config) {
-
         for(String requestConfig: ConfigDefine.REQUEST_CONFIG){
             if(!config.containsKey(requestConfig)){
                 throw new ConnectException("miss config:"+requestConfig);
             }
         }
+        this.kafkaConnectorConfigs = ConfigUtil.getKafkaConnectorConfigs(config);
+        for(String requestConfig: Arrays.asList(ConfigDefine.CONNECTOR_CLASS, ConfigDefine.PLUGIN_PATH)){
+            if(!kafkaConnectorConfigs.containsKey(requestConfig)){
+                throw new ConnectException("miss config:"+requestConfig);
+            }
+        }
+        if(!kafkaConnectorConfigs.containsKey(SinkConnectorConfig.TOPICS_CONFIG)){
+            if(config.containsKey(ConfigDefine.ROCKETMQ_CONNECT_TOPIC_NAMES)){
+                kafkaConnectorConfigs.put(
+                        SinkConnectorConfig.TOPICS_CONFIG,
+                        config.getString(ConfigDefine.ROCKETMQ_CONNECT_TOPIC_NAMES)
+                );
+            }
+        }
 
-        this.kafkaConnectorConfigs = ConfigUtil.keyValueConfigToMap(config);
         log.info("kafka connector config is {}", this.kafkaConnectorConfigs);
         this.kafkaPlugins =  KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH)));
         String connectorClassName = this.kafkaConnectorConfigs.get(ConfigDefine.CONNECTOR_CLASS);
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
index 9cec38ef..d5f882fd 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
@@ -13,31 +13,31 @@ public class KafkaRocketmqSinkConnector extends SinkConnector {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkConnector.class);
 
-    private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this);
+    private KafkaRocketmqConnector parentConnector = new KafkaRocketmqConnector(this);
 
     @Override
     public List<KeyValue> taskConfigs(int maxTasks) {
-        return kafkaRocketmqConnector.taskConfigs(maxTasks);
+        return parentConnector.taskConfigs(maxTasks);
     }
 
     @Override
     public Class<? extends Task> taskClass() {
-        return kafkaRocketmqConnector.taskClass();
+        return parentConnector.taskClass();
     }
 
     @Override
     public void start(KeyValue config) {
-        kafkaRocketmqConnector.start(config);
+        parentConnector.start(config);
     }
 
     @Override
     public void stop() {
-        kafkaRocketmqConnector.stop();
+        parentConnector.stop();
     }
 
     @Override
     public void validate(KeyValue config) {
-        kafkaRocketmqConnector.validate(config);
+        parentConnector.validate(config);
     }
 
 }
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
index c2b6cfaa..431d6b3a 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
@@ -9,23 +9,18 @@ import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.connector.api.errors.RetriableException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.HeaderConverter;
-
-import org.apache.kafka.connect.transforms.util.SimpleConfig;
 import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
 import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
 import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.apache.rocketmq.connect.kafka.util.RocketmqRecordPartitionKafkaTopicPartitionMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,27 +32,23 @@ public class KafkaRocketmqSinkTask extends SinkTask {
     private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkTask.class);
 
     private org.apache.kafka.connect.sink.SinkTask kafkaSinkTask;
-    private ClassLoader classLoader;
-
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private HeaderConverter headerConverter;
+    private final KafkaRocketmqTask parentTask = new KafkaRocketmqTask();
+    private RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper;
 
     @Override
     public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
         Collection<SinkRecord> records = new ArrayList<>(sinkRecords.size());
         for(ConnectRecord sinkRecord: sinkRecords){
             String topic = (String)sinkRecord.getPosition().getPartition().getPartition().get(RecordUtil.TOPIC);
-            SchemaAndValue valueSchemaAndValue = valueConverter.toConnectData(topic, ((String)sinkRecord.getData()).getBytes(StandardCharsets.UTF_8));
+            SchemaAndValue valueSchemaAndValue = this.parentTask.getValueConverter().toConnectData(topic, ((String)sinkRecord.getData()).getBytes(StandardCharsets.UTF_8));
             String key = sinkRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
             SchemaAndValue keySchemaAndValue = null;
             if(key != null) {
-                keySchemaAndValue = keyConverter.toConnectData(topic, key.getBytes(StandardCharsets.UTF_8));
+                keySchemaAndValue = this.parentTask.getKeyConverter().toConnectData(topic, key.getBytes(StandardCharsets.UTF_8));
             }
-
+            TopicPartition topicPartition = this.kafkaTopicPartitionMapper.toTopicPartition(sinkRecord.getPosition().getPartition());
             SinkRecord record = new SinkRecord(
-                    RecordUtil.getTopicAndBrokerName(sinkRecord.getPosition().getPartition()),
-                    RecordUtil.getPartition(sinkRecord.getPosition().getPartition()),
+                    topicPartition.topic(), topicPartition.partition(),
                     keySchemaAndValue==null?null:keySchemaAndValue.schema(),
                     keySchemaAndValue==null?null:keySchemaAndValue.value(),
                     valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
@@ -80,7 +71,7 @@ public class KafkaRocketmqSinkTask extends SinkTask {
             if(RecordUtil.KAFKA_MSG_KEY.equals(headerKey)){
                 continue;
             }
-            SchemaAndValue headerSchemaAndValue = headerConverter
+            SchemaAndValue headerSchemaAndValue = parentTask.getHeaderConverter()
                     .toConnectHeader(topic, headerKey, extensions.getString(headerKey).getBytes());
             headers.add(headerKey, headerSchemaAndValue);
         }
@@ -91,21 +82,22 @@ public class KafkaRocketmqSinkTask extends SinkTask {
 
     @Override
     public void start(KeyValue config) {
-        Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config);
+        Map<String, String> kafkaTaskProps = ConfigUtil.getKafkaConnectorConfigs(config);
         log.info("kafka connector task config is {}", kafkaTaskProps);
         Plugins kafkaPlugins =  KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
         String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
         ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
-        this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        this.parentTask.setClassLoader(Plugins.compareAndSwapLoaders(connectorLoader));
         try {
             TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
             Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
             this.kafkaSinkTask = (org.apache.kafka.connect.sink.SinkTask)kafkaPlugins.newTask(taskClass);
-            initConverter(kafkaPlugins, kafkaTaskProps);
-            this.kafkaSinkTask.initialize(new RocketmqKafkaSinkTaskContext(sinkTaskContext));
+            this.parentTask.initConverter(kafkaPlugins, kafkaTaskProps, this.sinkTaskContext.getConnectorName(), this.sinkTaskContext.getTaskName());
+            this.kafkaTopicPartitionMapper = RocketmqRecordPartitionKafkaTopicPartitionMapper.newKafkaTopicPartitionMapper(kafkaTaskProps);
+            this.kafkaSinkTask.initialize(new RocketmqKafkaSinkTaskContext(sinkTaskContext, this.kafkaTopicPartitionMapper));
             this.kafkaSinkTask.start(kafkaTaskProps);
         } catch (Throwable e){
-            recoverClassLoader();
+            this.parentTask.recoverClassLoader();
             throw e;
         }
     }
@@ -115,69 +107,11 @@ public class KafkaRocketmqSinkTask extends SinkTask {
         try {
             this.kafkaSinkTask.stop();
         } finally {
-            recoverClassLoader();
-        }
-    }
-
-
-    private void recoverClassLoader(){
-        if(this.classLoader != null){
-            Plugins.compareAndSwapLoaders(this.classLoader);
-            this.classLoader = null;
+            this.parentTask.recoverClassLoader();
         }
     }
 
-    private void initConverter(Plugins plugins, Map<String, String> taskProps){
-
-        ConfigDef converterConfigDef = new ConfigDef()
-                .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
-                .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
-                .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
 
-        Map<String, String> connProps = new HashMap<>();
-        if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){
-            connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER));
-        }
-        if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){
-            connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER));
-        }
-        if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){
-            connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER));
-        }
-        SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps);
-
-        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
-        SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
-
-        keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
-                .CURRENT_CLASSLOADER);
-        valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
-        headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
-                Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
-
-        if (keyConverter == null) {
-            keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
-            log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sinkTaskContext.getTaskName());
-        } else {
-            log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sinkTaskContext.getTaskName());
-        }
-        if (valueConverter == null) {
-            valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
-            log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sinkTaskContext.getTaskName());
-        } else {
-            log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sinkTaskContext.getTaskName());
-        }
-        if (headerConverter == null) {
-            headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
-                    .PLUGINS);
-            log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sinkTaskContext.getTaskName());
-        } else {
-            log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sinkTaskContext.getTaskName());
-        }
-    }
 
     @Override
     public void flush(Map<RecordPartition, RecordOffset> currentOffsets) throws ConnectException {
@@ -190,7 +124,7 @@ public class KafkaRocketmqSinkTask extends SinkTask {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets.size());
 
         for(Map.Entry<RecordPartition, RecordOffset> po: currentOffsets.entrySet()){
-            TopicPartition tp = RecordUtil.recordPartitionToTopicPartition(po.getKey());
+            TopicPartition tp = this.kafkaTopicPartitionMapper.toTopicPartition(po.getKey());
             OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(RecordUtil.getOffset(po.getValue()));
             offsets.put(tp, offsetAndMetadata);
         }
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
index 6f6e006b..4409033b 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
@@ -13,31 +13,31 @@ public class KafkaRocketmqSourceConnector extends SourceConnector {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceConnector.class);
 
-    private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this);
+    private KafkaRocketmqConnector parentConnector = new KafkaRocketmqConnector(this);
 
     @Override
     public List<KeyValue> taskConfigs(int maxTasks) {
-        return kafkaRocketmqConnector.taskConfigs(maxTasks);
+        return parentConnector.taskConfigs(maxTasks);
     }
     
     @Override
     public Class<? extends Task> taskClass() {
-        return kafkaRocketmqConnector.taskClass();
+        return parentConnector.taskClass();
     }
 
     @Override
     public void start(KeyValue config) {
-        kafkaRocketmqConnector.start(config);
+        parentConnector.start(config);
     }
 
     @Override
     public void stop() {
-        kafkaRocketmqConnector.stop();
+        parentConnector.stop();
     }
 
     @Override
     public void validate(KeyValue config) {
-        kafkaRocketmqConnector.validate(config);
+        parentConnector.validate(config);
     }
 
 }
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
index 3db8251e..332da72b 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
@@ -3,13 +3,8 @@ package org.apache.rocketmq.connect.kafka.connector;
 import io.openmessaging.connector.api.data.*;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.transforms.util.SimpleConfig;
 import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
 import org.apache.rocketmq.connect.kafka.util.RecordUtil;
 import org.slf4j.Logger;
@@ -27,17 +22,11 @@ public class KafkaRocketmqSourceTask extends SourceTask {
     private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceTask.class);
 
     private org.apache.kafka.connect.source.SourceTask kafkaSourceTask;
-
-    private ClassLoader classLoader;
-
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private HeaderConverter headerConverter;
+    private final KafkaRocketmqTask parentTask = new KafkaRocketmqTask();
 
 
     @Override
     public List<ConnectRecord> poll() throws InterruptedException {
-
         List<SourceRecord>  sourceRecords =  this.kafkaSourceTask.poll();
 
         if(sourceRecords == null){
@@ -47,107 +36,49 @@ public class KafkaRocketmqSourceTask extends SourceTask {
         List<ConnectRecord> connectRecords = new ArrayList<>(sourceRecords.size());
         for(SourceRecord sourceRecord: sourceRecords){
             connectRecords.add(RecordUtil.toConnectRecord(sourceRecord,
-                    this.keyConverter, this.valueConverter, this.headerConverter));
+                    this.parentTask.getKeyConverter(), this.parentTask.getValueConverter(),
+                    this.parentTask.getHeaderConverter()));
         }
         return connectRecords;
     }
 
     @Override
     public void start(KeyValue config) {
-        Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config);
+        Map<String, String> kafkaTaskProps = ConfigUtil.getKafkaConnectorConfigs(config);
         log.info("kafka connector task config is {}", kafkaTaskProps);
         Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
         String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
         ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
-        this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+        this.parentTask.setClassLoader(Plugins.compareAndSwapLoaders(connectorLoader));
         try {
 
             TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
             Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
             this.kafkaSourceTask = (org.apache.kafka.connect.source.SourceTask)kafkaPlugins.newTask(taskClass);
-
-            initConverter(kafkaPlugins, kafkaTaskProps);
-
+            this.parentTask.initConverter(kafkaPlugins, kafkaTaskProps, this.sourceTaskContext.getConnectorName(), this.sourceTaskContext.getTaskName());
             this.kafkaSourceTask.initialize(new RocketmqKafkaSourceTaskContext(sourceTaskContext));
             this.kafkaSourceTask.start(kafkaTaskProps);
         } catch (Throwable e){
-            recoverClassLoader();
+            this.parentTask.recoverClassLoader();
             throw e;
         }
     }
 
-    private void initConverter(Plugins plugins, Map<String, String> taskProps){
-
-        ConfigDef converterConfigDef = new ConfigDef()
-                .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
-                .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
-                .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
-
-        Map<String, String> connProps = new HashMap<>();
-        if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){
-            connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER));
-        }
-        if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){
-            connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER));
-        }
-        if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){
-            connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER));
-        }
-        SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps);
-
-        Map<String, String> workerProps = new HashMap<>();
-        workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
-        workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
-        SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
-
-        keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
-                .CURRENT_CLASSLOADER);
-        valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
-        headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
-                Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
-
-        if (keyConverter == null) {
-            keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
-            log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sourceTaskContext.getTaskName());
-        } else {
-            log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sourceTaskContext.getTaskName());
-        }
-        if (valueConverter == null) {
-            valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
-            log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sourceTaskContext.getTaskName());
-        } else {
-            log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sourceTaskContext.getTaskName());
-        }
-        if (headerConverter == null) {
-            headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
-                    .PLUGINS);
-            log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sourceTaskContext.getTaskName());
-        } else {
-            log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sourceTaskContext.getTaskName());
-        }
-    }
 
     @Override
     public void stop() {
         try {
             this.kafkaSourceTask.stop();
         } finally {
-            recoverClassLoader();
+            this.parentTask.recoverClassLoader();
         }
     }
 
-    private void recoverClassLoader(){
-        if(this.classLoader != null){
-            Plugins.compareAndSwapLoaders(this.classLoader);
-            this.classLoader = null;
-        }
-    }
+
 
 
     @Override
     public void commit(ConnectRecord record, Map<String, String> metadata) {
-
         if(this.kafkaSourceTask == null){
             log.warn("the task is not start, metadata:{}", metadata);
             return;
@@ -161,7 +92,7 @@ public class KafkaRocketmqSourceTask extends SourceTask {
                     System.currentTimeMillis(), 0,0
             );
             this.kafkaSourceTask.commitRecord(
-                    RecordUtil.toSourceRecord(record, this.keyConverter, this.valueConverter, this.headerConverter),
+                    RecordUtil.toSourceRecord(record, this.parentTask.getKeyConverter(), this.parentTask.getValueConverter(), this.parentTask.getHeaderConverter()),
                     recordMetadata
             );
         } catch (InterruptedException e){
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqTask.java
new file mode 100644
index 00000000..3cf94aed
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqTask.java
@@ -0,0 +1,93 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaRocketmqTask {
+    private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqTask.class);
+
+    private ClassLoader classLoader;
+    private Converter keyConverter;
+    private Converter valueConverter;
+    private HeaderConverter headerConverter;
+
+    public ClassLoader getClassLoader() {
+        return classLoader;
+    }
+
+    public void setClassLoader(ClassLoader classLoader) {
+        this.classLoader = classLoader;
+    }
+
+    public Converter getKeyConverter() {
+        return keyConverter;
+    }
+
+    public Converter getValueConverter() {
+        return valueConverter;
+    }
+
+    public HeaderConverter getHeaderConverter() {
+        return headerConverter;
+    }
+
+    public void recoverClassLoader(){
+        if(this.classLoader != null){
+            Plugins.compareAndSwapLoaders(this.classLoader);
+            this.classLoader = null;
+        }
+    }
+
+    public void initConverter(Plugins plugins, Map<String, String> taskProps, String connectorName, String taskName){
+        Map<String, String> connProps = new HashMap<>(taskProps);
+        connProps.put(ConnectorConfig.NAME_CONFIG, connectorName);
+        final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+        workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
+        ConfigDef converterConfigDef = new ConfigDef()
+                .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+                .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
+        SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
+
+        keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
+                .CURRENT_CLASSLOADER);
+        valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+        headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
+                Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+
+        if (keyConverter == null) {
+            keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), taskName);
+        } else {
+            log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), taskName);
+        }
+        if (valueConverter == null) {
+            valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+            log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), taskName);
+        } else {
+            log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), taskName);
+        }
+        if (headerConverter == null) {
+            headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
+                    .PLUGINS);
+            log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), taskName);
+        } else {
+            log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), taskName);
+        }
+    }
+
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
index 848032be..52d4c9dc 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
@@ -8,6 +8,7 @@ import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
 import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.apache.rocketmq.connect.kafka.util.RocketmqRecordPartitionKafkaTopicPartitionMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,9 +24,11 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
     private static final ExecutorService EXECUTOR_SERVICE  = Executors.newFixedThreadPool(1);
 
     private SinkTaskContext sinkTaskContext;
+    private RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper;
 
-    public RocketmqKafkaSinkTaskContext(SinkTaskContext sinkTaskContext) {
+    public RocketmqKafkaSinkTaskContext(SinkTaskContext sinkTaskContext, RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper) {
         this.sinkTaskContext = sinkTaskContext;
+        this.kafkaTopicPartitionMapper = kafkaTopicPartitionMapper;
     }
 
     @Override
@@ -38,10 +41,8 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
 
         Map<RecordPartition, RecordOffset> offsets2 = new HashMap<>(offsets.size());
         offsets.forEach((tp,offset) -> {
-            Map<String, String> map = RecordUtil.getPartitionMap(tp.topic());
-            map.put(RecordUtil.QUEUE_ID, tp.partition() + "");
-            RecordPartition recordPartition = new RecordPartition(map);
 
+            RecordPartition recordPartition = kafkaTopicPartitionMapper.toRecordPartition(tp);
             Map<String, String> offsetMap = new HashMap<>();
             offsetMap.put(RecordUtil.QUEUE_OFFSET, offset + "");
             RecordOffset recordOffset = new RecordOffset(offsetMap);
@@ -65,7 +66,7 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
     public Set<TopicPartition> assignment() {
         return sinkTaskContext.assignment()
                 .stream()
-                .map(RecordUtil::recordPartitionToTopicPartition)
+                .map(kafkaTopicPartitionMapper::toTopicPartition)
                 .collect(Collectors.toSet());
     }
 
@@ -85,7 +86,7 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
 
     private List<RecordPartition> toRecordPartitions(TopicPartition... partitions){
         return Arrays.stream(partitions)
-                .map(RecordUtil::topicPartitionToRecordPartition)
+                .map(kafkaTopicPartitionMapper::toRecordPartition)
                 .collect(Collectors.toList());
     }
 
@@ -104,19 +105,7 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
                  return EXECUTOR_SERVICE.submit(new Callable<Void>() {
                      @Override
                      public Void call() throws Exception {
-
-                         Map<String, String> partitionMap = RecordUtil.getPartitionMap(record.topic());
-                         partitionMap.put(RecordUtil.QUEUE_ID, record.kafkaPartition() + "");
-                         RecordPartition recordPartition = new RecordPartition(partitionMap);
-
-                         Map<String, String> offsetMap = new HashMap<>();
-                         offsetMap.put(RecordUtil.QUEUE_OFFSET, record.kafkaOffset() + "");
-                         RecordOffset recordOffset = new RecordOffset(offsetMap);
-
-                         ConnectRecord connectRecord = new ConnectRecord(
-                                 recordPartition, recordOffset, record.timestamp(),
-                                 SchemaBuilder.string().build(), record.value()
-                                 );
+                         ConnectRecord connectRecord = RecordUtil.sinkRecordToConnectRecord(record, RocketmqKafkaSinkTaskContext.this.kafkaTopicPartitionMapper);
                          sinkTaskContext.errorRecordReporter().report(connectRecord, error);
                          return null;
                      }
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..4881f9f7
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,49 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper
+        extends EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper{
+
+
+    private String assignments_config = "assignments";
+
+    private Map<String, Integer> brokerName2Ids = new HashMap<>();
+    private Map<Integer, String> id2BrokerName = new HashMap<>();
+
+
+    @Override
+    public void configure(Map<String, String> configs) {
+        super.configure(configs);
+        String assignments = configs.get(assignments_config);
+        if(assignments == null || assignments.trim().isEmpty()){
+            throw new ConnectException("missing assignments config");
+        }
+        for(String brokerNameAndId:assignments.split(",")){
+            String[] brokerNameAndIds = brokerNameAndId.trim().split(":");
+            String brokerName = brokerNameAndIds[0].trim();
+            Integer id = Integer.valueOf(brokerNameAndIds[1].trim());
+            checkBrokerNameId(id);
+            if(brokerName2Ids.put(brokerName, id) != null){
+                throw new ConnectException("error config,repeat brokerName:"+brokerName);
+            }
+            if(id2BrokerName.put(id, brokerName) != null){
+                throw new ConnectException("error config,repeat brokerNameId:"+id);
+            }
+        }
+    }
+
+    @Override
+    protected Integer getBrokerNameId(String brokerName) {
+        return brokerName2Ids.get(brokerName);
+    }
+
+    @Override
+    protected String getBrokerNameById(int id) {
+        return id2BrokerName.get(id);
+    }
+
+}
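
The `assignments` option parsed above appears to be a comma-separated list of `brokerName:id` pairs that is kept in two maps so the lookup works in both directions. A minimal standalone sketch of that parsing follows; the class name `AssignmentsSketch`, the sample broker names, and the extra length check are illustrative assumptions, not part of the commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the adapter: parses "brokerA:0,brokerB:1".
final class AssignmentsSketch {
    final Map<String, Integer> nameToId = new HashMap<>();
    final Map<Integer, String> idToName = new HashMap<>();

    AssignmentsSketch(String assignments) {
        for (String entry : assignments.split(",")) {
            String[] parts = entry.trim().split(":");
            // Assumption: reject malformed entries instead of failing on parts[1].
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected brokerName:id, got: " + entry);
            }
            String name = parts[0].trim();
            int id = Integer.parseInt(parts[1].trim());
            // Map.put returns the previous value, so non-null means a duplicate.
            if (nameToId.put(name, id) != null) {
                throw new IllegalArgumentException("duplicate brokerName: " + name);
            }
            if (idToName.put(id, name) != null) {
                throw new IllegalArgumentException("duplicate brokerNameId: " + id);
            }
        }
    }

    public static void main(String[] args) {
        AssignmentsSketch sketch = new AssignmentsSketch("broker-a:0, broker-b:1");
        System.out.println(sketch.nameToId.get("broker-b")); // prints 1
        System.out.println(sketch.idToName.get(0));          // prints broker-a
    }
}
```

Keeping both maps makes each direction a single hash lookup and lets duplicates on either side be detected while parsing.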
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
index 2113fcbf..c5593627 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
@@ -1,7 +1,10 @@
 package org.apache.rocketmq.connect.kafka.util;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.openmessaging.KeyValue;
-import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -9,6 +12,8 @@ import java.util.Set;
 
 public class ConfigUtil {
 
+    private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     public static Map<String, String> keyValueConfigToMap(KeyValue keyValueConfig){
         if(keyValueConfig == null){
             return null;
@@ -20,15 +25,21 @@ public class ConfigUtil {
         return mapConfig;
     }
 
-
-    public static KeyValue mapConfigToKeyValue(Map<String, String> mapConfig){
-        if(mapConfig == null){
-            return null;
+    public static Map<String, String> getKafkaConnectorConfigs(KeyValue keyValueConfig){
+        String kafkaConnectorConfigsStr = keyValueConfig.getString(ConfigDefine.KAFKA_CONNECTOR_CONFIGS);
+        try {
+            return OBJECT_MAPPER.readValue(kafkaConnectorConfigsStr,
+                    new TypeReference<Map<String, String>>(){});
+        } catch (Exception e){
+            throw new ConnectException("error kafka.connector.configs config" , e);
         }
+    }
 
-        KeyValue keyValue = new DefaultKeyValue();
-        mapConfig.forEach((k, v)-> keyValue.put(k, v));
-
-        return keyValue;
+    public static String toJson(Map<String, String> mapConfig) {
+        try {
+            return OBJECT_MAPPER.writeValueAsString(mapConfig);
+        } catch (Exception e){
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..7548f8e5
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,67 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper
+        extends RocketmqRecordPartitionKafkaTopicPartitionMapper {
+
+    private String partitionBits_config = "partitionBits";
+
+    private int partitionBits;
+
+    private int maxPartition;
+    private int maxBrokerNameId;
+
+
+    protected abstract Integer getBrokerNameId(String brokerName);
+    protected abstract String getBrokerNameById(int id);
+
+
+    @Override
+    public void configure(Map<String, String> configs) {
+        this.partitionBits = Integer.valueOf(configs.getOrDefault(partitionBits_config, "16"));
+        this.maxPartition = (1 << this.partitionBits) - 1;
+        this.maxBrokerNameId = (1 << (32 - this.partitionBits)) - 1;
+    }
+
+    protected void checkBrokerNameId(int brokerNameId){
+        if(brokerNameId < 0 || brokerNameId>this.maxBrokerNameId){
+            throw new ConnectException("brokerNameId:"+brokerNameId+" must >=0 and <="+this.maxBrokerNameId);
+        }
+    }
+
+
+
+    @Override
+    public TopicPartition toTopicPartition(RecordPartition recordPartition) {
+        int queueId = getQueueId(recordPartition);
+        Integer brokerNameId = getBrokerNameId(getBrokerName(recordPartition));
+        int encodedPartition = queueId | (brokerNameId << partitionBits);
+        return new TopicPartition(getMessageQueueTopic(recordPartition), encodedPartition);
+    }
+
+    @Override
+    public RecordPartition toRecordPartition(TopicPartition topicPartition) {
+
+        int encodedPartition = topicPartition.partition();
+        int queueId = encodedPartition & (0xffffffff>>>(32-partitionBits));
+        int brokerNameId = encodedPartition>>>partitionBits;
+
+        String topic = topicPartition.topic();
+        String brokerName = this.getBrokerNameById(brokerNameId);
+        if(brokerName == null || brokerName.trim().isEmpty()){
+            throw new ConnectException("can not get brokerName from  id:"+brokerNameId);
+        }
+
+        Map<String, String> map = new HashMap<>();
+        map.put(RecordUtil.TOPIC, topic);
+        map.put(RecordUtil.BROKER_NAME, brokerName);
+        map.put(RecordUtil.QUEUE_ID, queueId + "");
+        return new RecordPartition(map);
+    }
+}
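
The mapper above packs a broker-name id and a queue id into one 32-bit Kafka partition number: the low `partitionBits` bits hold the queue id and the remaining high bits hold the broker-name id. The following is a small standalone sketch of that round trip, assuming `partitionBits` is between 1 and 31; the class name `PartitionCodecSketch` and its method names are hypothetical. In Java `^` is bitwise XOR, so the limits are computed with shifts.

```java
// Hypothetical standalone codec, mirroring the encode/decode arithmetic of the mapper.
final class PartitionCodecSketch {
    private final int partitionBits;
    private final int maxQueueId;
    private final int maxBrokerNameId;

    PartitionCodecSketch(int partitionBits) {
        // Assumption: at least one bit is reserved for each of the two fields.
        if (partitionBits < 1 || partitionBits > 31) {
            throw new IllegalArgumentException("partitionBits must be in [1, 31]");
        }
        this.partitionBits = partitionBits;
        this.maxQueueId = (1 << partitionBits) - 1;
        this.maxBrokerNameId = (1 << (32 - partitionBits)) - 1;
    }

    int encode(int brokerNameId, int queueId) {
        if (queueId < 0 || queueId > maxQueueId) {
            throw new IllegalArgumentException("queueId out of range: " + queueId);
        }
        if (brokerNameId < 0 || brokerNameId > maxBrokerNameId) {
            throw new IllegalArgumentException("brokerNameId out of range: " + brokerNameId);
        }
        // Low bits carry the queue id, high bits carry the broker-name id.
        return queueId | (brokerNameId << partitionBits);
    }

    int queueId(int encoded) {
        return encoded & maxQueueId;
    }

    int brokerNameId(int encoded) {
        // Unsigned shift so an encoded value with the sign bit set still decodes.
        return encoded >>> partitionBits;
    }

    public static void main(String[] args) {
        PartitionCodecSketch codec = new PartitionCodecSketch(16);
        int encoded = codec.encode(3, 5);
        System.out.println(encoded);                     // prints 196613
        System.out.println(codec.brokerNameId(encoded)); // prints 3
        System.out.println(codec.queueId(encoded));      // prints 5
    }
}
```

With the largest ids the encoded partition becomes negative, because the sign bit is used for data; the unsigned shift in `brokerNameId` still recovers the original value.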
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..d94faed7
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper.java
@@ -0,0 +1,54 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Encodes the broker name into the topic.
+ */
+public class EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper
+        extends RocketmqRecordPartitionKafkaTopicPartitionMapper {
+
+    private static final String SEPARATOR_CONFIG = "separator";
+
+    private static final String DEFAULT_SEP = "@#@";
+
+    private String separator = DEFAULT_SEP;
+
+    @Override
+    public void configure(Map<String, String> configs) {
+        this.separator = configs.getOrDefault(SEPARATOR_CONFIG, DEFAULT_SEP);
+    }
+
+    @Override
+    public TopicPartition toTopicPartition(RecordPartition recordPartition) {
+        String topicAndBrokerName = getTopicAndBrokerName(recordPartition);
+        int partition = getQueueId(recordPartition);
+        return new TopicPartition(topicAndBrokerName, partition);
+    }
+
+    private String getTopicAndBrokerName(RecordPartition recordPartition) {
+        return getMessageQueueTopic(recordPartition) +
+                this.separator +
+                getBrokerName(recordPartition);
+    }
+
+    @Override
+    public RecordPartition toRecordPartition(TopicPartition topicPartition) {
+        Map<String, String> map = getPartitionMap(topicPartition.topic());
+        map.put(RecordUtil.QUEUE_ID, topicPartition.partition() + "");
+        return new RecordPartition(map);
+    }
+
+    private Map<String, String> getPartitionMap(String topicAndBrokerName) {
+        String[] split = topicAndBrokerName.split(java.util.regex.Pattern.quote(this.separator), 2);
+        Map<String, String> map = new HashMap<>();
+        map.put(RecordUtil.TOPIC, split[0]);
+        map.put(RecordUtil.BROKER_NAME, split[1]);
+        return map;
+    }
+
+}
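
Note on the encoded-topic mapping above: the Kafka topic name carries both the RocketMQ topic and the broker name, joined by a configurable separator, and the queue id is used directly as the partition. String.split interprets its argument as a regular expression, so a separator such as "|" or "." only splits as intended when it is quoted. The sketch below illustrates the round trip under that assumption. The class and method names are hypothetical, and the explicit length check is an addition of the sketch rather than something the adapter does.

```java
import java.util.regex.Pattern;

public class EncodedTopicSketch {

    // Join topic and broker name into one Kafka-side topic string.
    static String encode(String topic, String brokerName, String separator) {
        return topic + separator + brokerName;
    }

    // Split on the literal separator, at most once, and reject input without it.
    static String[] decode(String topicAndBrokerName, String separator) {
        String[] parts = topicAndBrokerName.split(Pattern.quote(separator), 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("separator not found in: " + topicAndBrokerName);
        }
        return parts;
    }

    public static void main(String[] args) {
        String encoded = encode("orders", "broker-a", "|");
        String[] parts = decode(encoded, "|");
        System.out.println(encoded);  // orders|broker-a
        System.out.println(parts[0]); // orders
        System.out.println(parts[1]); // broker-a
    }
}
```
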
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
index 815ae561..f190b0f5 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
@@ -9,6 +9,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
@@ -24,60 +25,22 @@ public class RecordUtil {
     public static final String TOPIC = "topic";
     public static final String QUEUE_OFFSET = "queueOffset";
 
-
-    private static final String TOPIC_SEP = "@#@";
-
-
     public static final String KAFKA_MSG_KEY = "kafka_key";
     public static final String KAFKA_CONNECT_RECORD_TOPIC_KEY = "kafka_connect_record_topic";
     public static final String KAFKA_CONNECT_RECORD_PARTITION_KEY = "kafka_connect_record_partition";
     public static final String KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX = "kafka_connect_record_header_";
 
-    public static String getTopicAndBrokerName(RecordPartition recordPartition) {
-        return new StringBuilder()
-                .append(recordPartition.getPartition().get(TOPIC))
-                .append(TOPIC_SEP)
-                .append(recordPartition.getPartition().get(BROKER_NAME))
-                .toString();
-    }
-
-    public static Map<String, String>  getPartitionMap(String topicAndBrokerName) {
-        String[] split = topicAndBrokerName.split(TOPIC_SEP);
-        Map<String, String> map = new HashMap<>();
-        map.put(TOPIC, split[0]);
-        map.put(BROKER_NAME, split[1]);
-
-        return map;
-    }
-
     public static  long getOffset(RecordOffset recordOffset){
         return Long.valueOf(
                 (String) recordOffset.getOffset().get(QUEUE_OFFSET)
         );
     }
-
-    public static  int getPartition(RecordPartition recordPartition){
-        return Integer.valueOf(
-                (String) recordPartition.getPartition().get(QUEUE_ID)
-        );
-    }
-
-    public static TopicPartition recordPartitionToTopicPartition(RecordPartition recordPartition){
-        String topicAndBrokerName = getTopicAndBrokerName(recordPartition);
-        int partition = getPartition(recordPartition);
-        return new TopicPartition(topicAndBrokerName, partition);
-    }
-
-    public static RecordPartition topicPartitionToRecordPartition(TopicPartition topicPartition){
-        Map<String, String> map = RecordUtil.getPartitionMap(topicPartition.topic());
-        map.put(RecordUtil.QUEUE_ID, topicPartition.partition() + "");
-        return new RecordPartition(map);
-    }
-
-
     public static ConnectRecord toConnectRecord(SourceRecord sourceRecord, Converter keyConverter, Converter valueConverter,
                                          HeaderConverter headerConverter){
-        RecordPartition recordPartition = new RecordPartition(new HashMap<>(sourceRecord.sourcePartition()));
+        Map<String, Object> partition = new HashMap<>();
+        partition.put("topic", sourceRecord.topic());
+        partition.putAll(sourceRecord.sourcePartition());
+        RecordPartition recordPartition = new RecordPartition(partition);
         RecordOffset recordOffset = new RecordOffset(new HashMap<>(sourceRecord.sourceOffset()));
         Long timestamp = sourceRecord.timestamp();
 
@@ -154,4 +117,19 @@ public class RecordUtil {
         return sourceRecord;
     }
 
+    public static ConnectRecord sinkRecordToConnectRecord(SinkRecord sinkRecord, RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper){
+        TopicPartition topicPartition = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
+        RecordPartition recordPartition = kafkaTopicPartitionMapper.toRecordPartition(topicPartition);
+
+        Map<String, String> offsetMap = new HashMap<>();
+        offsetMap.put(RecordUtil.QUEUE_OFFSET, sinkRecord.kafkaOffset() + "");
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+
+        ConnectRecord connectRecord = new ConnectRecord(
+                recordPartition, recordOffset, sinkRecord.timestamp(),
+                SchemaBuilder.string().build(), sinkRecord.value()
+        );
+        return connectRecord;
+    }
+
 }
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..952b9ab7
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,53 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper
+        extends EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper{
+
+
+    private static final String PATTERN_CONFIG = "pattern";
+    private Pattern pattern;
+
+    private Map<Integer, String> id2BrokerName = new HashMap<>();
+
+    @Override
+    public void configure(Map<String, String> configs) {
+        super.configure(configs);
+        String pattern = configs.get(PATTERN_CONFIG);
+        if(pattern == null || pattern.trim().isEmpty()){
+            throw new ConnectException("missing pattern config");
+        }
+
+        this.pattern = Pattern.compile(pattern);
+
+    }
+
+    @Override
+    protected Integer getBrokerNameId(String brokerName) {
+        Matcher matcher = pattern.matcher(brokerName);
+        if(matcher.find()){
+            try {
+                Integer id =  Integer.valueOf(matcher.group());
+                if(!brokerName.equals(id2BrokerName.computeIfAbsent(id, k -> brokerName))){
+                    throw new ConnectException("invalid config, duplicate brokerNameId:"+id);
+                }
+                return id;
+            } catch (NumberFormatException e){
+                throw new ConnectException("can not get brokerNameId from brokerName for regex:"+brokerName, e);
+            }
+        } else {
+            throw new ConnectException("can not get brokerNameId from brokerName for find:"+brokerName);
+        }
+    }
+
+    @Override
+    protected String getBrokerNameById(int id) {
+        return this.id2BrokerName.get(id);
+    }
+}
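
Note on the regex mapper above: the broker-name id is the first match of the configured pattern inside the broker name, parsed as an integer, and the id-to-name table is filled in as broker names are seen. The sketch below shows the same idea in isolation. The class name, the method names and the pattern \d+ are assumptions for illustration. As a design choice of the sketch, looking up the same broker name twice is accepted, and only two different broker names that yield the same id are rejected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexBrokerIdSketch {

    private final Pattern pattern;
    private final Map<Integer, String> id2BrokerName = new HashMap<>();

    RegexBrokerIdSketch(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // Extract the numeric id from the broker name and remember which name owns it.
    int brokerNameId(String brokerName) {
        Matcher matcher = pattern.matcher(brokerName);
        if (!matcher.find()) {
            throw new IllegalArgumentException("no id found in broker name: " + brokerName);
        }
        // parseInt throws NumberFormatException if the match is not a valid int.
        int id = Integer.parseInt(matcher.group());
        String known = id2BrokerName.putIfAbsent(id, brokerName);
        if (known != null && !known.equals(brokerName)) {
            throw new IllegalStateException("id " + id + " is shared by " + known + " and " + brokerName);
        }
        return id;
    }

    // Reverse lookup; returns null for an id that has not been registered yet.
    String brokerNameById(int id) {
        return id2BrokerName.get(id);
    }

    public static void main(String[] args) {
        RegexBrokerIdSketch mapper = new RegexBrokerIdSketch("\\d+");
        System.out.println(mapper.brokerNameId("broker-2")); // 2
        System.out.println(mapper.brokerNameId("broker-2")); // 2
        System.out.println(mapper.brokerNameById(2));        // broker-2
    }
}
```

The reverse lookup only knows broker names that have already passed through brokerNameId, so in this sketch a TopicPartition for a broker that has not been seen yet cannot be mapped back.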
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..5692ac97
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,78 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A RocketMQ partition is a triple: MessageQueue(topic, brokerName, queueId).
+ * A Kafka partition is a pair: TopicPartition(topic, partition).
+ * Several methods of org.apache.kafka.connect.sink.SinkTaskContext take a TopicPartition(topic, partition)
+ * that has to be mapped to a MessageQueue(topic, brokerName, queueId), so a conversion is needed.
+ */
+public abstract class RocketmqRecordPartitionKafkaTopicPartitionMapper {
+
+
+    public static RocketmqRecordPartitionKafkaTopicPartitionMapper newKafkaTopicPartitionMapper(Map<String, String> kafkaTaskProps){
+        String mapper = kafkaTaskProps.getOrDefault(ConfigDefine.ROCKETMQ_RECORDPARTITION_KAFKATOPICPARTITION_MAPPER, "encodedTopic");
+        RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper;
+        if(mapper.equalsIgnoreCase("encodedTopic")){
+            kafkaTopicPartitionMapper = new EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper();
+        } else if(mapper.equalsIgnoreCase("assignEncodedPartition")){
+            kafkaTopicPartitionMapper = new AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper();
+        } else if(mapper.equalsIgnoreCase("regexEncodedPartition")){
+            kafkaTopicPartitionMapper = new RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper();
+        } else {
+            throw new ConnectException("unknown rocketmq.recordPartition.kafkaTopicPartition.mapper config:"+mapper);
+        }
+        Map<String, String> mapperConfig = new HashMap<>();
+        String prefix = mapper+".";
+        for(Map.Entry<String, String> kv: kafkaTaskProps.entrySet()){
+            if(kv.getKey().startsWith(prefix)){
+                mapperConfig.put(kv.getKey().substring(prefix.length()), kv.getValue());
+            }
+        }
+        kafkaTopicPartitionMapper.configure(mapperConfig);
+        return kafkaTopicPartitionMapper;
+    }
+
+    protected String getBrokerName(RecordPartition recordPartition){
+        return (String)recordPartition.getPartition().get(RecordUtil.BROKER_NAME);
+    }
+
+    protected String getMessageQueueTopic(RecordPartition recordPartition){
+        return (String)recordPartition.getPartition().get(RecordUtil.TOPIC);
+    }
+
+    protected int getQueueId(RecordPartition recordPartition){
+        return Integer.valueOf(
+                (String) recordPartition.getPartition().get(RecordUtil.QUEUE_ID)
+        );
+    }
+
+    /**
+     * Configures the mapper.
+     * @param configs mapper settings, with the mapper-name prefix already stripped
+     */
+    public abstract void configure(Map<String, String> configs);
+
+    /**
+     * Converts to a Kafka TopicPartition.
+     * @param recordPartition the OpenMessaging RecordPartition to convert
+     * @return the corresponding Kafka TopicPartition
+     */
+    public abstract TopicPartition toTopicPartition(RecordPartition recordPartition);
+
+    /**
+     * Converts to an OpenMessaging RecordPartition, i.e. a RocketMQ MessageQueue.
+     * @param topicPartition the Kafka TopicPartition to convert
+     * @return the corresponding RecordPartition
+     */
+    public abstract RecordPartition toRecordPartition(TopicPartition topicPartition);
+
+
+}
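
Note on the factory method above: the mapper is selected by name, and every task property whose key starts with "<mapper>." is passed to the mapper with that prefix removed. The mapper name is compared case-insensitively, but the prefix is built from the value exactly as configured, so the property keys have to use the same spelling as the mapper value. The sketch below reproduces only the prefix stripping. The class and method names are hypothetical, and the selector key "rocketmq.recordPartition.kafkaTopicPartition.mapper" is assumed from the error message rather than taken from ConfigDefine.

```java
import java.util.HashMap;
import java.util.Map;

public class MapperConfigSketch {

    // Collect the properties addressed to one mapper and strip the "<mapper>." prefix.
    static Map<String, String> mapperConfig(Map<String, String> taskProps, String mapper) {
        String prefix = mapper + ".";
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> kv : taskProps.entrySet()) {
            if (kv.getKey().startsWith(prefix)) {
                result.put(kv.getKey().substring(prefix.length()), kv.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> taskProps = new HashMap<>();
        // Assumed selector key, see the note above.
        taskProps.put("rocketmq.recordPartition.kafkaTopicPartition.mapper", "regexEncodedPartition");
        taskProps.put("regexEncodedPartition.pattern", "\\d+");
        taskProps.put("tasks.max", "1");
        System.out.println(mapperConfig(taskProps, "regexEncodedPartition")); // {pattern=\d+}
    }
}
```
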