Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/20 01:15:55 UTC
[rocketmq-connect] branch master updated: [ISSUE #286]kafka connector adapter:how map Rocketmq MessageQueue to Kafka TopicPartition (#301)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 68e24c49 [ISSUE #286]kafka connector adapter:how map Rocketmq MessageQueue to Kafka TopicPartition (#301)
68e24c49 is described below
commit 68e24c4925099be32c8ed680d2caf35267d9337b
Author: 欧夺标 <ou...@gmail.com>
AuthorDate: Tue Sep 20 09:15:50 2022 +0800
[ISSUE #286]kafka connector adapter:how map Rocketmq MessageQueue to Kafka TopicPartition (#301)
* rocketmq-connect-kafka-connector-adapter 0.0.1-SNAPSHOT
* rocketmq-connect-kafka-connector-adapter: kafka mongodb connector support
* kafka-connector-adapter: why getTopicAndBrokerName is needed
* kafka-connector-adapter: why getTopicAndBrokerName is needed
* kafka-connector-adapter: add the RecordPartitionTopicPartitionConvertor interface
* kafka-connector-adapter: add the KafkaTopicPartitionMapper interface
* kafka-connector-adapter: add the KafkaTopicPartitionMapper interface
* kafka-connector-adapter: add the RocketmqRecordPartitionKafkaTopicPartitionMapper interface
* kafka-connector-adapter: implement RocketmqRecordPartitionKafkaTopicPartitionMapper
* kafka-connector-adapter: extract common logic into KafkaRocketmqTask
* kafka-connector-adapter: extract the brokerName id with a regex
* kafka-connector-adapter: mongo docs
* kafka-connector-adapter: mongo docs
* kafka-connector-adapter: neo4j docs
* kafka-connector-adapter: mongodb docs
* kafka-connector-adapter: mongodb docs
* kafka-connector-adapter: mongodb docs
* kafka-connector-adapter: mongodb docs
* support the new version
Co-authored-by: ouduobiao <du...@alibaba-inc.com>
---
.../README.md | 44 +++++++---
.../how-to/kafka-mongo-connector.md | 85 +++++++++++++++++++
.../how-to/kafka-neo4j-connector.md | 58 +++++++++++++
.../connect/kafka/config/ConfigDefine.java | 15 ++--
.../kafka/connector/KafkaRocketmqConnector.java | 63 ++++++++------
.../connector/KafkaRocketmqSinkConnector.java | 12 +--
.../kafka/connector/KafkaRocketmqSinkTask.java | 98 ++++------------------
.../connector/KafkaRocketmqSourceConnector.java | 12 +--
.../kafka/connector/KafkaRocketmqSourceTask.java | 89 +++-----------------
.../connect/kafka/connector/KafkaRocketmqTask.java | 93 ++++++++++++++++++++
.../connector/RocketmqKafkaSinkTaskContext.java | 27 ++----
...mqRecordPartitionKafkaTopicPartitionMapper.java | 49 +++++++++++
.../rocketmq/connect/kafka/util/ConfigUtil.java | 29 +++++--
...mqRecordPartitionKafkaTopicPartitionMapper.java | 67 +++++++++++++++
...ocketmqBrokerNameKafkaTopicPartitionMapper.java | 54 ++++++++++++
.../rocketmq/connect/kafka/util/RecordUtil.java | 62 +++++---------
...mqRecordPartitionKafkaTopicPartitionMapper.java | 53 ++++++++++++
...mqRecordPartitionKafkaTopicPartitionMapper.java | 78 +++++++++++++++++
18 files changed, 703 insertions(+), 285 deletions(-)
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/README.md b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
index 01b7a63b..98522f87 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/README.md
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/README.md
@@ -4,7 +4,7 @@
**Parameter description**
-Parameters fall into 3 categories: rocketmq connect runtime parameters, kafka-connector-adapter parameters, and the parameters of the specific kafka connector
+Parameters fall into 2 categories: rocketmq connect runtime parameters and kafka-connector parameters
rocketmq connect runtime parameters:
- **connector-class**: the class name of the kafka-connector-adapter
@@ -13,17 +13,21 @@ rocketmq connect runtime parameters:
If it is a SinkConnector, it is org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector.
-- **connect-topicname**: the rocketmq topic to import/export data
-- **tasks.num**: the number of tasks to start
-
+- **connect.topicname** / **connect.topicnames**: the rocketmq topic(s) to import or export data
+- **max.task**: the number of tasks to start
+
+kafka-connector parameters are placed under kafka.connector.configs and are further divided into 2 categories: kafka-connector-adapter parameters and the parameters of the specific kafka connector
+
kafka-connector-adapter parameters:
- **connector.class**: the class name of the kafka connector
- **plugin.path**: the kafka connector plugin path
Specific kafka connector parameters:
-Refer to the documentation of the specific kafka connector
+- Refer to the documentation of the specific kafka connector
+When connector-class is org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector, there is one extra parameter:
+- **rocketmq.recordPartition.kafkaTopicPartition.mapper**: the implementation that maps a recordPartition to a kafkaTopicPartition; valid values are encodedTopic, assignEncodedPartition and regexEncodedPartition; the default is encodedTopic
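
For intuition, here is a minimal sketch of what the two encoded strategies produce, assuming the default "@#@" separator and 16-bit partition split used by the mapper classes later in this commit (the class name below is illustrative, not part of the adapter):

```java
import org.apache.kafka.common.TopicPartition;

public class MapperStrategiesSketch {
    public static void main(String[] args) {
        // encodedTopic: the brokerName is appended to the Kafka topic name with a
        // separator, so the RocketMQ queueId can be used directly as the partition.
        TopicPartition encodedTopic = new TopicPartition("fileTopic" + "@#@" + "broker-0", 2);

        // assignEncodedPartition: with assignments "broker-0:0,broker-1:1" and the
        // default 16 partition bits, the brokerName id goes into the high bits.
        TopicPartition assignEncoded = new TopicPartition("fileTopic", (1 << 16) | 2);

        System.out.println(encodedTopic);  // fileTopic@#@broker-0-2
        System.out.println(assignEncoded); // fileTopic-65538
    }
}
```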
# Quick Start
@@ -67,13 +71,29 @@ touch /tmp/test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> /tmp/test-source-file.txt
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","plugin.path":"/tmp/kafka-plugins","topic":"fileTopic","file":"/tmp/test-source-file.txt"}'
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
+ "connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector",
+ "kafka.connector.configs":{
+ "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
+ "plugin.path": "/tmp/kafka-plugins",
+ "topic": "fileTopic",
+ "file": "/tmp/test-source-file.txt"
+ }
+}'
```
## 5.启动sink connector
```
-curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{"connector-class":"org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector","connect-topicname":"fileTopic","connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","plugin.path":"/tmp/kafka-plugins","file":"/tmp/test-sink-file.txt"}'
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
+ "connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector",
+ "connect.topicnames": "fileTopic",
+ "kafka.connector.configs":{
+ "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
+ "plugin.path": "/tmp/kafka-plugins",
+ "file": "/tmp/test-sink-file.txt"
+ }
+}'
cat /tmp/test-sink-file.txt
```
@@ -82,10 +102,12 @@ cat /tmp/test-sink-file.txt
todo
-# How to run the kafka-mongo-connector
+# How to run other kafka connectors
-todo
+## 1. Get the uber jar of the kafka connector
+The [kafka-connector-plugins](https://github.com/oudb/kafka-connector-plugins) repository collects uber jars for some common connectors
-# How to run the kafka-jdbc-connector
+## 2. Run guides
+1.[mongodb](how-to/kafka-mongo-connector.md)
-todo
\ No newline at end of file
+2.[neo4j](how-to/kafka-neo4j-connector.md)
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-mongo-connector.md b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-mongo-connector.md
new file mode 100644
index 00000000..f17271f6
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-mongo-connector.md
@@ -0,0 +1,85 @@
+Assume rocketmq has 2 nodes, one with brokerName broker-0 and the other with brokerName broker-1
+
+## 1. Start mongodb
+```
+docker run \
+ --publish=27017:27017 --name mongo1\
+ mongo \
+ --replSet rs0
+
+docker exec -it mongo1 bash
+
+cat << EOF > config-data.js
+db = db.getSiblingDB("quickstart");
+db.createCollection("source");
+db.createCollection("sink");
+EOF
+
+
+cat << EOF > config-replica.js
+rsconf = {
+ _id: "rs0",
+ members: [{ _id: 0, host: "127.0.0.1:27017", priority: 1.0 }],
+};
+rs.initiate(rsconf);
+rs.status();
+EOF
+
+
+mongosh mongodb://127.0.0.1:27017 config-replica.js && sleep 10 && mongosh mongodb://127.0.0.1:27017 config-data.js
+
+```
+
+
+## 2.mongo source connector
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/mongo-source -d '{
+ "connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSourceConnector",
+ "connect.topicname":"mongoTopic",
+ "kafka.connector.configs":{
+ "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
+ "plugin.path": "/kafka-connecor-plugins",
+ "connection.uri": "mongodb://127.0.0.1:27017/?replicaSet=rs0",
+ "database": "quickstart",
+ "collection": "sampleData",
+ "pipeline": "[{\"$match\": {\"operationType\": \"insert\"}}, {$addFields : {\"fullDocument.travel\":\"MongoDB Kafka Connector\"}}]"
+ }
+}'
+```
+
+## 3.mongo sink connector
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/mongo-sink -d '{
+  "connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector",
+  "connect.topicnames": "mongoTopic",
+  "kafka.connector.configs":{
+    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
+    "plugin.path": "/kafka-connector-plugins/mongodb-connector",
+    "topics": "mongoTopic",
+    "rocketmq.recordPartition.kafkaTopicPartition.mapper": "assignEncodedPartition",
+    "assignEncodedPartition.assignments": "broker-0:0,broker-1:1",
+    "connection.uri": "mongodb://127.0.0.1:27017/?replicaSet=rs0",
+    "database": "quickstart",
+    "collection": "topicData",
+    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler"
+  }
+}'
+```
+
+
+## 4. Test
+```
+docker exec -it mongo1 bash
+
+mongosh mongodb://127.0.0.1:27017/?replicaSet=rs0
+
+use quickstart
+
+db.sampleData.insertOne({"hello":"world"})
+
+db.topicData.find()
+
+```
+
+
+More about this connector:
+
+https://www.mongodb.com/docs/kafka-connector/current/
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-neo4j-connector.md b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-neo4j-connector.md
new file mode 100644
index 00000000..b4762dc3
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/how-to/kafka-neo4j-connector.md
@@ -0,0 +1,58 @@
+Assume rocketmq has 2 nodes, one with brokerName broker-0 and the other with brokerName broker-1
+
+
+## 1. Start a test neo4j server
+```
+docker run \
+ --publish=7474:7474 --publish=7687:7687 \
+ --volume=$HOME/neo4j/data:/data \
+ --env=NEO4J_AUTH=neo4j/connect \
+ neo4j
+```
+
+The username is neo4j and the password is connect
+
+## 2. Run the neo4j sink connector
+```
+curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/neo4jSinkConnector -d '{
+ "connector.class": "org.apache.rocketmq.connect.kafka.connector.KafkaRocketmqSinkConnector",
+ "connect.topicnames":"my-topic",
+ "kafka.connector.configs":{
+ "rocketmq.recordPartition.kafkaTopicPartition.mapper": "assignEncodedPartition",
+ "assignEncodedPartition.assignments": "broker-0:0,broker-1:1",
+ "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
+ "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+ "key.converter.schemas.enable": false,
+ "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+ "value.converter.schemas.enable": false,
+ "errors.retry.timeout": "-1",
+ "errors.retry.delay.max.ms": "1000",
+ "errors.tolerance": "all",
+ "errors.log.enable": true,
+ "errors.log.include.messages": true,
+ "neo4j.server.uri": "bolt://localhost:7687",
+ "neo4j.authentication.basic.username": "neo4j",
+ "neo4j.authentication.basic.password": "connect",
+ "neo4j.encryption.enabled": false,
+ "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
+ "plugin.path": "/kafka-connecor-plugins/neo4j-connector"
+ }
+}'
+```
+Here assignEncodedPartition.assignments specifies the numeric id encoded for each brokerName; in this example it is broker-0:0,broker-1:1
+
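As a worked example of that encoding, with the default partitionBits of 16 a record from broker-1, queue 3 maps to Kafka partition 65539. A minimal sketch of the arithmetic, mirroring the bit operations in EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper further down in this commit (the class name here is illustrative):

```java
public class AssignEncodingExample {
    public static void main(String[] args) {
        int partitionBits = 16;
        int brokerNameId = 1;  // broker-1 under "broker-0:0,broker-1:1"
        int queueId = 3;

        // Encode: queueId in the low 16 bits, brokerName id in the high bits.
        int encodedPartition = queueId | (brokerNameId << partitionBits);
        System.out.println(encodedPartition);  // 65539

        // Decode back to the RocketMQ MessageQueue coordinates.
        int decodedQueueId = encodedPartition & (0xffffffff >>> (32 - partitionBits));
        int decodedBrokerNameId = encodedPartition >>> partitionBits;
        System.out.println(decodedQueueId + "/" + decodedBrokerNameId);  // 3/1
    }
}
```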
+## 3. Test
+1. Send a message to the topic:
+sh mqadmin sendMessage -n localhost:9876 -t my-topic -p '{"name": "Name", "surname": "Surname"}'
+
+2. Query in neo4j
+Log in via the browser: http://localhost:7474/browser/
+
+query:
+
+MATCH (ee:Person) WHERE ee.name ='Name' RETURN ee;
+
+
+More about this connector:
+
+https://neo4j.com/labs/kafka/4.0/kafka-connect/
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
index 9cbba085..da877363 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/config/ConfigDefine.java
@@ -8,9 +8,13 @@ import java.util.HashSet;
import java.util.Set;
public class ConfigDefine {
- public static String ROCKETMQ_CONNECTOR_CLASS = "connector-class";
- public static String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG;
- public static String PLUGIN_PATH = "plugin.path";
+ public static final String ROCKETMQ_CONNECTOR_CLASS = "connector.class";
+ public static final String ROCKETMQ_CONNECT_TOPIC_NAME = "connect.topicname";
+ public static final String ROCKETMQ_CONNECT_TOPIC_NAMES = "connect.topicnames";
+
+ public static final String KAFKA_CONNECTOR_CONFIGS = "kafka.connector.configs";
+ public static final String CONNECTOR_CLASS = ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+ public static final String PLUGIN_PATH = "plugin.path";
public static final String TASK_CLASS = TaskConfig.TASK_CLASS_CONFIG;
@@ -18,11 +22,12 @@ public class ConfigDefine {
public static final String VALUE_CONVERTER = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
public static final String HEADER_CONVERTER = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
+ // encodedTopic/assignEncodedPartition/regexEncodedPartition
+ public static final String ROCKETMQ_RECORDPARTITION_KAFKATOPICPARTITION_MAPPER = "rocketmq.recordPartition.kafkaTopicPartition.mapper";
public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
{
- add(CONNECTOR_CLASS);
- add(PLUGIN_PATH);
+ add(KAFKA_CONNECTOR_CONFIGS);
}
};
}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
index 89bee2c7..285bf39b 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqConnector.java
@@ -4,8 +4,10 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
@@ -13,11 +15,7 @@ import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.DriverManager;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
public class KafkaRocketmqConnector extends Connector {
@@ -41,30 +39,31 @@ public class KafkaRocketmqConnector extends Connector {
taskKeyValueConfigs.addAll(
taskConfigs
.stream()
- .map(ConfigUtil::mapConfigToKeyValue)
- .collect(Collectors.toList())
- );
-
- taskKeyValueConfigs.forEach(kv -> {
- kv.put(ConfigDefine.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH));
- kv.put(ConfigDefine.CONNECTOR_CLASS, this.kafkaConnectorConfigs.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
- kv.put(ConfigDefine.TASK_CLASS, this.kafkaConnector.taskClass().getName());
+ .map(kv -> {
- kv.put(ConfigDefine.ROCKETMQ_CONNECTOR_CLASS, childConnector.getClass().getName());
+ kv.put(ConfigDefine.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH));
+ kv.put(ConfigDefine.CONNECTOR_CLASS, this.kafkaConnectorConfigs.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+ kv.put(ConfigDefine.TASK_CLASS, this.kafkaConnector.taskClass().getName());
- if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.KEY_CONVERTER)){
- kv.put(ConfigDefine.KEY_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.KEY_CONVERTER));
- }
+ if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.KEY_CONVERTER)){
+ kv.put(ConfigDefine.KEY_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.KEY_CONVERTER));
+ }
- if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.VALUE_CONVERTER)){
- kv.put(ConfigDefine.VALUE_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.VALUE_CONVERTER));
- }
+ if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.VALUE_CONVERTER)){
+ kv.put(ConfigDefine.VALUE_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.VALUE_CONVERTER));
+ }
- if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.HEADER_CONVERTER)){
- kv.put(ConfigDefine.HEADER_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.HEADER_CONVERTER));
- }
- });
+ if( this.kafkaConnectorConfigs.containsKey(ConfigDefine.HEADER_CONVERTER)){
+ kv.put(ConfigDefine.HEADER_CONVERTER, this.kafkaConnectorConfigs.get(ConfigDefine.HEADER_CONVERTER));
+ }
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(ConfigDefine.ROCKETMQ_CONNECTOR_CLASS, childConnector.getClass().getName());
+ keyValue.put(ConfigDefine.KAFKA_CONNECTOR_CONFIGS, ConfigUtil.toJson(kv));
+ return keyValue;
+ })
+ .collect(Collectors.toList())
+ );
});
return taskKeyValueConfigs;
}
@@ -92,14 +91,26 @@ public class KafkaRocketmqConnector extends Connector {
@Override
public void validate(KeyValue config) {
-
for(String requestConfig: ConfigDefine.REQUEST_CONFIG){
if(!config.containsKey(requestConfig)){
throw new ConnectException("miss config:"+requestConfig);
}
}
+ this.kafkaConnectorConfigs = ConfigUtil.getKafkaConnectorConfigs(config);
+ for(String requestConfig: Arrays.asList(ConfigDefine.CONNECTOR_CLASS, ConfigDefine.PLUGIN_PATH)){
+ if(!kafkaConnectorConfigs.containsKey(requestConfig)){
+ throw new ConnectException("miss config:"+requestConfig);
+ }
+ }
+ if(!kafkaConnectorConfigs.containsKey(SinkConnectorConfig.TOPICS_CONFIG)){
+ if(config.containsKey(ConfigDefine.ROCKETMQ_CONNECT_TOPIC_NAMES)){
+ kafkaConnectorConfigs.put(
+ SinkConnectorConfig.TOPICS_CONFIG,
+ config.getString(ConfigDefine.ROCKETMQ_CONNECT_TOPIC_NAMES)
+ );
+ }
+ }
- this.kafkaConnectorConfigs = ConfigUtil.keyValueConfigToMap(config);
log.info("kafka connector config is {}", this.kafkaConnectorConfigs);
this.kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, this.kafkaConnectorConfigs.get(ConfigDefine.PLUGIN_PATH)));
String connectorClassName = this.kafkaConnectorConfigs.get(ConfigDefine.CONNECTOR_CLASS);
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
index 9cec38ef..d5f882fd 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkConnector.java
@@ -13,31 +13,31 @@ public class KafkaRocketmqSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkConnector.class);
- private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this);
+ private KafkaRocketmqConnector parentConnector = new KafkaRocketmqConnector(this);
@Override
public List<KeyValue> taskConfigs(int maxTasks) {
- return kafkaRocketmqConnector.taskConfigs(maxTasks);
+ return parentConnector.taskConfigs(maxTasks);
}
@Override
public Class<? extends Task> taskClass() {
- return kafkaRocketmqConnector.taskClass();
+ return parentConnector.taskClass();
}
@Override
public void start(KeyValue config) {
- kafkaRocketmqConnector.start(config);
+ parentConnector.start(config);
}
@Override
public void stop() {
- kafkaRocketmqConnector.stop();
+ parentConnector.stop();
}
@Override
public void validate(KeyValue config) {
- kafkaRocketmqConnector.validate(config);
+ parentConnector.validate(config);
}
}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
index c2b6cfaa..431d6b3a 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSinkTask.java
@@ -9,23 +9,18 @@ import io.openmessaging.connector.api.errors.ConnectException;
import io.openmessaging.connector.api.errors.RetriableException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.HeaderConverter;
-
-import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.apache.rocketmq.connect.kafka.util.RocketmqRecordPartitionKafkaTopicPartitionMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,27 +32,23 @@ public class KafkaRocketmqSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSinkTask.class);
private org.apache.kafka.connect.sink.SinkTask kafkaSinkTask;
- private ClassLoader classLoader;
-
- private Converter keyConverter;
- private Converter valueConverter;
- private HeaderConverter headerConverter;
+ private final KafkaRocketmqTask parentTask = new KafkaRocketmqTask();
+ private RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper;
@Override
public void put(List<ConnectRecord> sinkRecords) throws ConnectException {
Collection<SinkRecord> records = new ArrayList<>(sinkRecords.size());
for(ConnectRecord sinkRecord: sinkRecords){
String topic = (String)sinkRecord.getPosition().getPartition().getPartition().get(RecordUtil.TOPIC);
- SchemaAndValue valueSchemaAndValue = valueConverter.toConnectData(topic, ((String)sinkRecord.getData()).getBytes(StandardCharsets.UTF_8));
+ SchemaAndValue valueSchemaAndValue = this.parentTask.getValueConverter().toConnectData(topic, ((String)sinkRecord.getData()).getBytes(StandardCharsets.UTF_8));
String key = sinkRecord.getExtension(RecordUtil.KAFKA_MSG_KEY);
SchemaAndValue keySchemaAndValue = null;
if(key != null) {
- keySchemaAndValue = keyConverter.toConnectData(topic, key.getBytes(StandardCharsets.UTF_8));
+ keySchemaAndValue = this.parentTask.getKeyConverter().toConnectData(topic, key.getBytes(StandardCharsets.UTF_8));
}
-
+ TopicPartition topicPartition = this.kafkaTopicPartitionMapper.toTopicPartition(sinkRecord.getPosition().getPartition());
SinkRecord record = new SinkRecord(
- RecordUtil.getTopicAndBrokerName(sinkRecord.getPosition().getPartition()),
- RecordUtil.getPartition(sinkRecord.getPosition().getPartition()),
+ topicPartition.topic(), topicPartition.partition(),
keySchemaAndValue==null?null:keySchemaAndValue.schema(),
keySchemaAndValue==null?null:keySchemaAndValue.value(),
valueSchemaAndValue.schema(), valueSchemaAndValue.value(),
@@ -80,7 +71,7 @@ public class KafkaRocketmqSinkTask extends SinkTask {
if(RecordUtil.KAFKA_MSG_KEY.equals(headerKey)){
continue;
}
- SchemaAndValue headerSchemaAndValue = headerConverter
+ SchemaAndValue headerSchemaAndValue = parentTask.getHeaderConverter()
.toConnectHeader(topic, headerKey, extensions.getString(headerKey).getBytes());
headers.add(headerKey, headerSchemaAndValue);
}
@@ -91,21 +82,22 @@ public class KafkaRocketmqSinkTask extends SinkTask {
@Override
public void start(KeyValue config) {
- Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config);
+ Map<String, String> kafkaTaskProps = ConfigUtil.getKafkaConnectorConfigs(config);
log.info("kafka connector task config is {}", kafkaTaskProps);
Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
- this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+ this.parentTask.setClassLoader(Plugins.compareAndSwapLoaders(connectorLoader));
try {
TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
this.kafkaSinkTask = (org.apache.kafka.connect.sink.SinkTask)kafkaPlugins.newTask(taskClass);
- initConverter(kafkaPlugins, kafkaTaskProps);
- this.kafkaSinkTask.initialize(new RocketmqKafkaSinkTaskContext(sinkTaskContext));
+ this.parentTask.initConverter(kafkaPlugins, kafkaTaskProps, this.sinkTaskContext.getConnectorName(), this.sinkTaskContext.getTaskName());
+ this.kafkaTopicPartitionMapper = RocketmqRecordPartitionKafkaTopicPartitionMapper.newKafkaTopicPartitionMapper(kafkaTaskProps);
+ this.kafkaSinkTask.initialize(new RocketmqKafkaSinkTaskContext(sinkTaskContext, this.kafkaTopicPartitionMapper));
this.kafkaSinkTask.start(kafkaTaskProps);
} catch (Throwable e){
- recoverClassLoader();
+ this.parentTask.recoverClassLoader();
throw e;
}
}
@@ -115,69 +107,11 @@ public class KafkaRocketmqSinkTask extends SinkTask {
try {
this.kafkaSinkTask.stop();
} finally {
- recoverClassLoader();
- }
- }
-
-
- private void recoverClassLoader(){
- if(this.classLoader != null){
- Plugins.compareAndSwapLoaders(this.classLoader);
- this.classLoader = null;
+ this.parentTask.recoverClassLoader();
}
}
- private void initConverter(Plugins plugins, Map<String, String> taskProps){
-
- ConfigDef converterConfigDef = new ConfigDef()
- .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
- .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
- .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
- Map<String, String> connProps = new HashMap<>();
- if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){
- connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER));
- }
- if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){
- connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER));
- }
- if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){
- connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER));
- }
- SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps);
-
- Map<String, String> workerProps = new HashMap<>();
- workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
- workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
- workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
- SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
-
- keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
- .CURRENT_CLASSLOADER);
- valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
- headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
- Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
-
- if (keyConverter == null) {
- keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
- log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sinkTaskContext.getTaskName());
- } else {
- log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sinkTaskContext.getTaskName());
- }
- if (valueConverter == null) {
- valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
- log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sinkTaskContext.getTaskName());
- } else {
- log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sinkTaskContext.getTaskName());
- }
- if (headerConverter == null) {
- headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
- .PLUGINS);
- log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sinkTaskContext.getTaskName());
- } else {
- log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sinkTaskContext.getTaskName());
- }
- }
@Override
public void flush(Map<RecordPartition, RecordOffset> currentOffsets) throws ConnectException {
@@ -190,7 +124,7 @@ public class KafkaRocketmqSinkTask extends SinkTask {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets.size());
for(Map.Entry<RecordPartition, RecordOffset> po: currentOffsets.entrySet()){
- TopicPartition tp = RecordUtil.recordPartitionToTopicPartition(po.getKey());
+ TopicPartition tp = this.kafkaTopicPartitionMapper.toTopicPartition(po.getKey());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(RecordUtil.getOffset(po.getValue()));
offsets.put(tp, offsetAndMetadata);
}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
index 6f6e006b..4409033b 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceConnector.java
@@ -13,31 +13,31 @@ public class KafkaRocketmqSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceConnector.class);
- private KafkaRocketmqConnector kafkaRocketmqConnector = new KafkaRocketmqConnector(this);
+ private KafkaRocketmqConnector parentConnector = new KafkaRocketmqConnector(this);
@Override
public List<KeyValue> taskConfigs(int maxTasks) {
- return kafkaRocketmqConnector.taskConfigs(maxTasks);
+ return parentConnector.taskConfigs(maxTasks);
}
@Override
public Class<? extends Task> taskClass() {
- return kafkaRocketmqConnector.taskClass();
+ return parentConnector.taskClass();
}
@Override
public void start(KeyValue config) {
- kafkaRocketmqConnector.start(config);
+ parentConnector.start(config);
}
@Override
public void stop() {
- kafkaRocketmqConnector.stop();
+ parentConnector.stop();
}
@Override
public void validate(KeyValue config) {
- kafkaRocketmqConnector.validate(config);
+ parentConnector.validate(config);
}
}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
index 3db8251e..332da72b 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqSourceTask.java
@@ -3,13 +3,8 @@ package org.apache.rocketmq.connect.kafka.connector;
import io.openmessaging.connector.api.data.*;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.TaskConfig;
-import org.apache.kafka.connect.runtime.WorkerConfig;
-import org.apache.kafka.connect.storage.Converter;
-import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.transforms.util.SimpleConfig;
import org.apache.rocketmq.connect.kafka.util.KafkaPluginsUtil;
import org.apache.rocketmq.connect.kafka.util.RecordUtil;
import org.slf4j.Logger;
@@ -27,17 +22,11 @@ public class KafkaRocketmqSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqSourceTask.class);
private org.apache.kafka.connect.source.SourceTask kafkaSourceTask;
-
- private ClassLoader classLoader;
-
- private Converter keyConverter;
- private Converter valueConverter;
- private HeaderConverter headerConverter;
+ private final KafkaRocketmqTask parentTask = new KafkaRocketmqTask();
@Override
public List<ConnectRecord> poll() throws InterruptedException {
-
List<SourceRecord> sourceRecords = this.kafkaSourceTask.poll();
if(sourceRecords == null){
@@ -47,107 +36,49 @@ public class KafkaRocketmqSourceTask extends SourceTask {
List<ConnectRecord> connectRecords = new ArrayList<>(sourceRecords.size());
for(SourceRecord sourceRecord: sourceRecords){
connectRecords.add(RecordUtil.toConnectRecord(sourceRecord,
- this.keyConverter, this.valueConverter, this.headerConverter));
+ this.parentTask.getKeyConverter(), this.parentTask.getValueConverter(),
+ this.parentTask.getHeaderConverter()));
}
return connectRecords;
}
@Override
public void start(KeyValue config) {
- Map<String, String> kafkaTaskProps = ConfigUtil.keyValueConfigToMap(config);
+ Map<String, String> kafkaTaskProps = ConfigUtil.getKafkaConnectorConfigs(config);
log.info("kafka connector task config is {}", kafkaTaskProps);
Plugins kafkaPlugins = KafkaPluginsUtil.getPlugins(Collections.singletonMap(KafkaPluginsUtil.PLUGIN_PATH, kafkaTaskProps.get(ConfigDefine.PLUGIN_PATH)));
String connectorClass = kafkaTaskProps.get(ConfigDefine.CONNECTOR_CLASS);
ClassLoader connectorLoader = kafkaPlugins.delegatingLoader().connectorLoader(connectorClass);
- this.classLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+ this.parentTask.setClassLoader(Plugins.compareAndSwapLoaders(connectorLoader));
try {
TaskConfig taskConfig = new TaskConfig(kafkaTaskProps);
Class<? extends Task> taskClass = taskConfig.getClass(ConfigDefine.TASK_CLASS).asSubclass(Task.class);
this.kafkaSourceTask = (org.apache.kafka.connect.source.SourceTask)kafkaPlugins.newTask(taskClass);
-
- initConverter(kafkaPlugins, kafkaTaskProps);
-
+ this.parentTask.initConverter(kafkaPlugins, kafkaTaskProps, this.sourceTaskContext.getConnectorName(), this.sourceTaskContext.getTaskName());
this.kafkaSourceTask.initialize(new RocketmqKafkaSourceTaskContext(sourceTaskContext));
this.kafkaSourceTask.start(kafkaTaskProps);
} catch (Throwable e){
- recoverClassLoader();
+ this.parentTask.recoverClassLoader();
throw e;
}
}
- private void initConverter(Plugins plugins, Map<String, String> taskProps){
-
- ConfigDef converterConfigDef = new ConfigDef()
- .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
- .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
- .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
-
- Map<String, String> connProps = new HashMap<>();
- if(taskProps.containsKey(ConfigDefine.KEY_CONVERTER)){
- connProps.put(ConfigDefine.KEY_CONVERTER, taskProps.get(ConfigDefine.KEY_CONVERTER));
- }
- if(taskProps.containsKey(ConfigDefine.VALUE_CONVERTER)){
- connProps.put(ConfigDefine.VALUE_CONVERTER, taskProps.get(ConfigDefine.VALUE_CONVERTER));
- }
- if(taskProps.containsKey(ConfigDefine.HEADER_CONVERTER)){
- connProps.put(ConfigDefine.HEADER_CONVERTER, taskProps.get(ConfigDefine.HEADER_CONVERTER));
- }
- SimpleConfig connConfig = new SimpleConfig(converterConfigDef, connProps);
-
- Map<String, String> workerProps = new HashMap<>();
- workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
- workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
- workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
- SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
-
- keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
- .CURRENT_CLASSLOADER);
- valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
- headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
- Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
-
- if (keyConverter == null) {
- keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
- log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), sourceTaskContext.getTaskName());
- } else {
- log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), sourceTaskContext.getTaskName());
- }
- if (valueConverter == null) {
- valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
- log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), sourceTaskContext.getTaskName());
- } else {
- log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), sourceTaskContext.getTaskName());
- }
- if (headerConverter == null) {
- headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
- .PLUGINS);
- log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), sourceTaskContext.getTaskName());
- } else {
- log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), sourceTaskContext.getTaskName());
- }
- }
@Override
public void stop() {
try {
this.kafkaSourceTask.stop();
} finally {
- recoverClassLoader();
+ this.parentTask.recoverClassLoader();
}
}
- private void recoverClassLoader(){
- if(this.classLoader != null){
- Plugins.compareAndSwapLoaders(this.classLoader);
- this.classLoader = null;
- }
- }
+
@Override
public void commit(ConnectRecord record, Map<String, String> metadata) {
-
if(this.kafkaSourceTask == null){
log.warn("the task is not start, metadata:{}", metadata);
return;
@@ -161,7 +92,7 @@ public class KafkaRocketmqSourceTask extends SourceTask {
System.currentTimeMillis(), 0,0
);
this.kafkaSourceTask.commitRecord(
- RecordUtil.toSourceRecord(record, this.keyConverter, this.valueConverter, this.headerConverter),
+ RecordUtil.toSourceRecord(record, this.parentTask.getKeyConverter(), this.parentTask.getValueConverter(), this.parentTask.getHeaderConverter()),
recordMetadata
);
} catch (InterruptedException e){
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqTask.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqTask.java
new file mode 100644
index 00000000..3cf94aed
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/KafkaRocketmqTask.java
@@ -0,0 +1,93 @@
+package org.apache.rocketmq.connect.kafka.connector;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaRocketmqTask {
+ private static final Logger log = LoggerFactory.getLogger(KafkaRocketmqTask.class);
+
+ private ClassLoader classLoader;
+ private Converter keyConverter;
+ private Converter valueConverter;
+ private HeaderConverter headerConverter;
+
+ public ClassLoader getClassLoader() {
+ return classLoader;
+ }
+
+ public void setClassLoader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ public Converter getKeyConverter() {
+ return keyConverter;
+ }
+
+ public Converter getValueConverter() {
+ return valueConverter;
+ }
+
+ public HeaderConverter getHeaderConverter() {
+ return headerConverter;
+ }
+
+ public void recoverClassLoader(){
+ if(this.classLoader != null){
+ Plugins.compareAndSwapLoaders(this.classLoader);
+ this.classLoader = null;
+ }
+ }
+
+ public void initConverter(Plugins plugins, Map<String, String> taskProps, String connectorName, String taskName){
+ Map<String, String> connProps = new HashMap<>(taskProps);
+ connProps.put(ConnectorConfig.NAME_CONFIG, connectorName);
+ final ConnectorConfig connConfig = new ConnectorConfig(plugins, connProps);
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put(ConfigDefine.KEY_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put(ConfigDefine.VALUE_CONVERTER, "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put(ConfigDefine.HEADER_CONVERTER, "org.apache.kafka.connect.storage.SimpleHeaderConverter");
+ ConfigDef converterConfigDef = new ConfigDef()
+ .define(ConfigDefine.KEY_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+ .define(ConfigDefine.VALUE_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "")
+ .define(ConfigDefine.HEADER_CONVERTER, ConfigDef.Type.CLASS, null, ConfigDef.Importance.LOW, "");
+ SimpleConfig workerConfig = new SimpleConfig(converterConfigDef, workerProps);
+
+ keyConverter = plugins.newConverter(connConfig, ConfigDefine.KEY_CONVERTER, Plugins.ClassLoaderUsage
+ .CURRENT_CLASSLOADER);
+ valueConverter = plugins.newConverter(connConfig, ConfigDefine.VALUE_CONVERTER, Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+ headerConverter = plugins.newHeaderConverter(connConfig, ConfigDefine.HEADER_CONVERTER,
+ Plugins.ClassLoaderUsage.CURRENT_CLASSLOADER);
+
+ if (keyConverter == null) {
+ keyConverter = plugins.newConverter(workerConfig, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+ log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), taskName);
+ } else {
+ log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), taskName);
+ }
+ if (valueConverter == null) {
+ valueConverter = plugins.newConverter(workerConfig, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage.PLUGINS);
+ log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), taskName);
+ } else {
+ log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), taskName);
+ }
+ if (headerConverter == null) {
+ headerConverter = plugins.newHeaderConverter(workerConfig, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, Plugins.ClassLoaderUsage
+ .PLUGINS);
+ log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), taskName);
+ } else {
+ log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), taskName);
+ }
+ }
+
+}
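Unless the connector config overrides the converters, the worker defaults above fall back to Kafka's JsonConverter and SimpleHeaderConverter. A minimal standalone sketch of what the default value converter does to a payload, assuming connect-json is on the classpath (the class name is illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class JsonConverterSketch {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // isKey=false -> value converter; schemas.enable=false matches the neo4j example above.
        converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

        byte[] payload = "{\"hello\":\"world\"}".getBytes(StandardCharsets.UTF_8);
        SchemaAndValue sv = converter.toConnectData("mongoTopic", payload);
        System.out.println(sv.value()); // {hello=world}
    }
}
```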
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
index 848032be..52d4c9dc 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/connector/RocketmqKafkaSinkTaskContext.java
@@ -8,6 +8,7 @@ import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.rocketmq.connect.kafka.util.ConfigUtil;
import org.apache.rocketmq.connect.kafka.util.RecordUtil;
+import org.apache.rocketmq.connect.kafka.util.RocketmqRecordPartitionKafkaTopicPartitionMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,9 +24,11 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(1);
private SinkTaskContext sinkTaskContext;
+ private RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper;
- public RocketmqKafkaSinkTaskContext(SinkTaskContext sinkTaskContext) {
+ public RocketmqKafkaSinkTaskContext(SinkTaskContext sinkTaskContext, RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper) {
this.sinkTaskContext = sinkTaskContext;
+ this.kafkaTopicPartitionMapper = kafkaTopicPartitionMapper;
}
@Override
@@ -38,10 +41,8 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
Map<RecordPartition, RecordOffset> offsets2 = new HashMap<>(offsets.size());
offsets.forEach((tp,offset) -> {
- Map<String, String> map = RecordUtil.getPartitionMap(tp.topic());
- map.put(RecordUtil.QUEUE_ID, tp.partition() + "");
- RecordPartition recordPartition = new RecordPartition(map);
+ RecordPartition recordPartition = kafkaTopicPartitionMapper.toRecordPartition(tp);
Map<String, String> offsetMap = new HashMap<>();
offsetMap.put(RecordUtil.QUEUE_OFFSET, offset + "");
RecordOffset recordOffset = new RecordOffset(offsetMap);
@@ -65,7 +66,7 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
public Set<TopicPartition> assignment() {
return sinkTaskContext.assignment()
.stream()
- .map(RecordUtil::recordPartitionToTopicPartition)
+ .map(kafkaTopicPartitionMapper::toTopicPartition)
.collect(Collectors.toSet());
}
@@ -85,7 +86,7 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
private List<RecordPartition> toRecordPartitions(TopicPartition... partitions){
return Arrays.stream(partitions)
- .map(RecordUtil::topicPartitionToRecordPartition)
+ .map(kafkaTopicPartitionMapper::toRecordPartition)
.collect(Collectors.toList());
}
@@ -104,19 +105,7 @@ public class RocketmqKafkaSinkTaskContext implements org.apache.kafka.connect.si
return EXECUTOR_SERVICE.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
-
- Map<String, String> partitionMap = RecordUtil.getPartitionMap(record.topic());
- partitionMap.put(RecordUtil.QUEUE_ID, record.kafkaPartition() + "");
- RecordPartition recordPartition = new RecordPartition(partitionMap);
-
- Map<String, String> offsetMap = new HashMap<>();
- offsetMap.put(RecordUtil.QUEUE_OFFSET, record.kafkaOffset() + "");
- RecordOffset recordOffset = new RecordOffset(offsetMap);
-
- ConnectRecord connectRecord = new ConnectRecord(
- recordPartition, recordOffset, record.timestamp(),
- SchemaBuilder.string().build(), record.value()
- );
+ ConnectRecord connectRecord = RecordUtil.sinkRecordToConnectRecord(record, RocketmqKafkaSinkTaskContext.this.kafkaTopicPartitionMapper);
sinkTaskContext.errorRecordReporter().report(connectRecord, error);
return null;
}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..4881f9f7
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,49 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper
+ extends EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper{
+
+
+ private String assignments_config = "assignments";
+
+ private Map<String, Integer> brokerName2Ids = new HashMap<>();
+ private Map<Integer, String> id2BrokerName = new HashMap<>();
+
+
+ @Override
+ public void configure(Map<String, String> configs) {
+ super.configure(configs);
+ String assignments = configs.get(assignments_config);
+ if(assignments == null || assignments.trim().isEmpty()){
+ throw new ConnectException("miss assignments config");
+ }
+ for(String brokerNameAndId:assignments.split(",")){
+ String[] brokerNameAndIds = brokerNameAndId.trim().split(":");
+ String brokerName = brokerNameAndIds[0].trim();
+ Integer id = Integer.valueOf(brokerNameAndIds[1].trim());
+ checkBrokerNameId(id);
+ if(brokerName2Ids.put(brokerName, id) != null){
+ throw new ConnectException("error config,repeat brokerName:"+brokerName);
+ }
+ if(id2BrokerName.put(id, brokerName) != null){
+ throw new ConnectException("error config,repeat brokerNameId:"+id);
+ }
+ }
+ }
+
+ @Override
+ protected Integer getBrokerNameId(String brokerName) {
+ return brokerName2Ids.get(brokerName);
+ }
+
+ @Override
+ protected String getBrokerNameById(int id) {
+ return id2BrokerName.get(id);
+ }
+
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
index 2113fcbf..c5593627 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/ConfigUtil.java
@@ -1,7 +1,10 @@
package org.apache.rocketmq.connect.kafka.util;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.openmessaging.KeyValue;
-import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
import java.util.HashMap;
import java.util.Map;
@@ -9,6 +12,8 @@ import java.util.Set;
public class ConfigUtil {
+ private final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
public static Map<String, String> keyValueConfigToMap(KeyValue keyValueConfig){
if(keyValueConfig == null){
return null;
@@ -20,15 +25,21 @@ public class ConfigUtil {
return mapConfig;
}
-
- public static KeyValue mapConfigToKeyValue(Map<String, String> mapConfig){
- if(mapConfig == null){
- return null;
+ public static Map<String, String> getKafkaConnectorConfigs(KeyValue keyValueConfig){
+ String kafkaConnectorConfigsStr = keyValueConfig.getString(ConfigDefine.KAFKA_CONNECTOR_CONFIGS);
+ try {
+ return OBJECT_MAPPER.readValue(kafkaConnectorConfigsStr,
+ new TypeReference<Map<String, String>>(){});
+ } catch (Exception e){
+ throw new ConnectException("error kafka.connector.configs config" , e);
}
+ }
- KeyValue keyValue = new DefaultKeyValue();
- mapConfig.forEach((k, v)-> keyValue.put(k, v));
-
- return keyValue;
+ public static String toJson(Map<String, String> mapConfig) {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(mapConfig);
+ } catch (Exception e){
+ throw new RuntimeException(e);
+ }
}
}
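For reference, a minimal sketch of the JSON round trip that getKafkaConnectorConfigs and toJson perform on the kafka.connector.configs blob, using the same Jackson calls (the class name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConfigRoundTripSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        Map<String, String> kafkaConfigs = new HashMap<>();
        kafkaConfigs.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        kafkaConfigs.put("plugin.path", "/tmp/kafka-plugins");

        // What toJson does when the connector fans configs out to tasks...
        String json = mapper.writeValueAsString(kafkaConfigs);
        // ...and what getKafkaConnectorConfigs does when a task starts.
        Map<String, String> parsed =
                mapper.readValue(json, new TypeReference<Map<String, String>>() {});

        System.out.println(parsed.equals(kafkaConfigs)); // true
    }
}
```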
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..7548f8e5
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,67 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper
+ extends RocketmqRecordPartitionKafkaTopicPartitionMapper {
+
+ private String partitionBits_config = "partitionBits";
+
+ private int partitionBits;
+
+ private int maxPartition;
+ private int maxBrokerNameId;
+
+
+ protected abstract Integer getBrokerNameId(String brokerName);
+ protected abstract String getBrokerNameById(int id);
+
+
+ @Override
+ public void configure(Map<String, String> configs) {
+        this.partitionBits = Integer.valueOf(configs.getOrDefault(partitionBits_config, "16"));
+        // "^" is XOR in Java, not exponentiation, so derive the ranges with bit shifts.
+        this.maxPartition = (1 << partitionBits) - 1;
+        this.maxBrokerNameId = (1 << (32 - partitionBits)) - 1;
+ }
+
+ protected void checkBrokerNameId(int brokerNameId){
+ if(brokerNameId < 0 || brokerNameId>this.maxBrokerNameId){
+ throw new ConnectException("brokerNameId:"+brokerNameId+" must >=0 and <="+this.maxBrokerNameId);
+ }
+ }
+
+
+
+ @Override
+ public TopicPartition toTopicPartition(RecordPartition recordPartition) {
+ int queueId = getQueueId(recordPartition);
+ Integer brokerNameId = getBrokerNameId(getBrokerName(recordPartition));
+ int encodedPartition = queueId | (brokerNameId << partitionBits);
+ return new TopicPartition(getMessageQueueTopic(recordPartition), encodedPartition);
+ }
+
+ @Override
+ public RecordPartition toRecordPartition(TopicPartition topicPartition) {
+
+ int encodedPartition = topicPartition.partition();
+ int queueId = encodedPartition & (0xffffffff>>>(32-partitionBits));
+ int brokerNameId = encodedPartition>>>partitionBits;
+
+ String topic = topicPartition.topic();
+ String brokerName = this.getBrokerNameById(brokerNameId);
+ if(brokerName == null || brokerName.trim().isEmpty()){
+ throw new ConnectException("can not get brokerName from id:"+brokerNameId);
+ }
+
+ Map<String, String> map = new HashMap<>();
+ map.put(RecordUtil.TOPIC, topic);
+ map.put(RecordUtil.BROKER_NAME, brokerName);
+ map.put(RecordUtil.QUEUE_ID, queueId + "");
+ return new RecordPartition(map);
+ }
+}
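A hypothetical usage sketch of the assign-based subclass above. It assumes the base class getters (getQueueId, getBrokerName, getMessageQueueTopic, which are not shown in this diff) read the same RecordUtil keys that toRecordPartition writes, and that the sketch sits in the org.apache.rocketmq.connect.kafka.util package alongside the mappers:

```java
import java.util.HashMap;
import java.util.Map;

import io.openmessaging.connector.api.data.RecordPartition;
import org.apache.kafka.common.TopicPartition;

public class AssignMapperUsageSketch {
    public static void main(String[] args) {
        Map<String, String> configs = new HashMap<>();
        configs.put("assignments", "broker-0:0,broker-1:1");

        AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper mapper =
                new AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper();
        mapper.configure(configs);

        // Build a RecordPartition the way the RocketMQ runtime would.
        Map<String, String> partition = new HashMap<>();
        partition.put(RecordUtil.TOPIC, "mongoTopic");
        partition.put(RecordUtil.BROKER_NAME, "broker-1");
        partition.put(RecordUtil.QUEUE_ID, "3");

        // RocketMQ (broker-1, queue 3) -> Kafka partition (1 << 16) | 3 = 65539 ...
        TopicPartition tp = mapper.toTopicPartition(new RecordPartition(partition));
        System.out.println(tp); // mongoTopic-65539

        // ... and back again.
        RecordPartition back = mapper.toRecordPartition(tp);
        System.out.println(back.getPartition());
    }
}
```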
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..d94faed7
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper.java
@@ -0,0 +1,54 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Encodes the brokerName into the Kafka topic name.
+ */
+public class EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper
+ extends RocketmqRecordPartitionKafkaTopicPartitionMapper {
+
+ private static final String SEPARATOR_CONFIG = "separator";
+
+ private static final String DEFAULT_SEP = "@#@";
+
+ private String separator = DEFAULT_SEP;
+
+ @Override
+ public void configure(Map<String, String> configs) {
+ this.separator = configs.getOrDefault(SEPARATOR_CONFIG, DEFAULT_SEP);
+ }
+
+ @Override
+ public TopicPartition toTopicPartition(RecordPartition recordPartition) {
+ String topicAndBrokerName = getTopicAndBrokerName(recordPartition);
+ int partition = getQueueId(recordPartition);
+ return new TopicPartition(topicAndBrokerName, partition);
+ }
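+
+ // Example (a sketch, assuming the default separator "@#@"):
+ // MessageQueue(topic="TestTopic", brokerName="broker-a", queueId=2)
+ // -> TopicPartition(topic="TestTopic@#@broker-a", partition=2)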
+
+ private String getTopicAndBrokerName(RecordPartition recordPartition) {
+ return getMessageQueueTopic(recordPartition) +
+ this.separator +
+ getBrokerName(recordPartition);
+ }
+
+ @Override
+ public RecordPartition toRecordPartition(TopicPartition topicPartition) {
+ Map<String, String> map = getPartitionMap(topicPartition.topic());
+ map.put(RecordUtil.QUEUE_ID, topicPartition.partition() + "");
+ return new RecordPartition(map);
+ }
+
+ private Map<String, String> getPartitionMap(String topicAndBrokerName) {
+ // String.split expects a regex, so quote the separator to treat it literally
+ String[] split = topicAndBrokerName.split(Pattern.quote(this.separator), 2);
+ Map<String, String> map = new HashMap<>();
+ map.put(RecordUtil.TOPIC, split[0]);
+ map.put(RecordUtil.BROKER_NAME, split[1]);
+ return map;
+ }
+
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
index 815ae561..f190b0f5 100644
--- a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RecordUtil.java
@@ -9,6 +9,7 @@ import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
@@ -24,60 +25,22 @@ public class RecordUtil {
public static final String TOPIC = "topic";
public static final String QUEUE_OFFSET = "queueOffset";
-
- private static final String TOPIC_SEP = "@#@";
-
-
public static final String KAFKA_MSG_KEY = "kafka_key";
public static final String KAFKA_CONNECT_RECORD_TOPIC_KEY = "kafka_connect_record_topic";
public static final String KAFKA_CONNECT_RECORD_PARTITION_KEY = "kafka_connect_record_partition";
public static final String KAFKA_CONNECT_RECORD_HEADER_KEY_PREFIX = "kafka_connect_record_header_";
- public static String getTopicAndBrokerName(RecordPartition recordPartition) {
- return new StringBuilder()
- .append(recordPartition.getPartition().get(TOPIC))
- .append(TOPIC_SEP)
- .append(recordPartition.getPartition().get(BROKER_NAME))
- .toString();
- }
-
- public static Map<String, String> getPartitionMap(String topicAndBrokerName) {
- String[] split = topicAndBrokerName.split(TOPIC_SEP);
- Map<String, String> map = new HashMap<>();
- map.put(TOPIC, split[0]);
- map.put(BROKER_NAME, split[1]);
-
- return map;
- }
-
public static long getOffset(RecordOffset recordOffset){
return Long.valueOf(
(String) recordOffset.getOffset().get(QUEUE_OFFSET)
);
}
-
- public static int getPartition(RecordPartition recordPartition){
- return Integer.valueOf(
- (String) recordPartition.getPartition().get(QUEUE_ID)
- );
- }
-
- public static TopicPartition recordPartitionToTopicPartition(RecordPartition recordPartition){
- String topicAndBrokerName = getTopicAndBrokerName(recordPartition);
- int partition = getPartition(recordPartition);
- return new TopicPartition(topicAndBrokerName, partition);
- }
-
- public static RecordPartition topicPartitionToRecordPartition(TopicPartition topicPartition){
- Map<String, String> map = RecordUtil.getPartitionMap(topicPartition.topic());
- map.put(RecordUtil.QUEUE_ID, topicPartition.partition() + "");
- return new RecordPartition(map);
- }
-
-
public static ConnectRecord toConnectRecord(SourceRecord sourceRecord, Converter keyConverter, Converter valueConverter,
HeaderConverter headerConverter){
- RecordPartition recordPartition = new RecordPartition(new HashMap<>(sourceRecord.sourcePartition()));
+ // carry the Kafka record topic alongside the connector-supplied source partition keys
+ Map<String, Object> partition = new HashMap<>();
+ partition.put(TOPIC, sourceRecord.topic());
+ partition.putAll(sourceRecord.sourcePartition());
+ RecordPartition recordPartition = new RecordPartition(partition);
RecordOffset recordOffset = new RecordOffset(new HashMap<>(sourceRecord.sourceOffset()));
Long timestamp = sourceRecord.timestamp();
@@ -154,4 +117,19 @@ public class RecordUtil {
return sourceRecord;
}
+ public static ConnectRecord sinkRecordToConnectRecord(SinkRecord sinkRecord, RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper){
+ TopicPartition topicPartition = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
+ RecordPartition recordPartition = kafkaTopicPartitionMapper.toRecordPartition(topicPartition);
+
+ Map<String, String> offsetMap = new HashMap<>();
+ // store kafkaOffset under QUEUE_OFFSET so RecordUtil.getOffset(...) can read it back
+ offsetMap.put(RecordUtil.QUEUE_OFFSET, sinkRecord.kafkaOffset() + "");
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+
+ ConnectRecord connectRecord = new ConnectRecord(
+ recordPartition, recordOffset, sinkRecord.timestamp(),
+ SchemaBuilder.string().build(), sinkRecord.value()
+ );
+ return connectRecord;
+ }
+
}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..952b9ab7
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,53 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper
+ extends EncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper{
+
+
+ private static final String PATTERN_CONFIG = "pattern";
+ private Pattern pattern;
+
+ private final Map<Integer, String> id2BrokerName = new HashMap<>();
+
+ @Override
+ public void configure(Map<String, String> configs) {
+ super.configure(configs);
+ String pattern = configs.get(PATTERN_CONFIG);
+ if(pattern == null || pattern.trim().isEmpty()){
+ throw new ConnectException("missing pattern config");
+ }
+
+ this.pattern = Pattern.compile(pattern);
+ }
+
+ @Override
+ protected Integer getBrokerNameId(String brokerName) {
+ Matcher matcher = pattern.matcher(brokerName);
+ if(matcher.find()){
+ try {
+ Integer id = Integer.valueOf(matcher.group());
+ // a repeated brokerName must keep its id; only a different brokerName
+ // mapping to the same id is a configuration error
+ String previous = id2BrokerName.put(id, brokerName);
+ if(previous != null && !previous.equals(brokerName)){
+ throw new ConnectException("error config, duplicate brokerNameId:" + id);
+ }
+ return id;
+ } catch (NumberFormatException e){
+ throw new ConnectException("cannot parse brokerNameId from brokerName:" + brokerName, e);
+ }
+ } else {
+ throw new ConnectException("can not get brokerNameId from brokerName for find:"+brokerName);
+ }
+ }
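+
+ // Example (a sketch, assuming pattern "[0-9]+" and broker names like "broker-0", "broker-1"):
+ // getBrokerNameId("broker-1") extracts "1" -> id 1; getBrokerNameById(1) returns "broker-1"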
+
+ @Override
+ protected String getBrokerNameById(int id) {
+ return this.id2BrokerName.get(id);
+ }
+}
diff --git a/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RocketmqRecordPartitionKafkaTopicPartitionMapper.java b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RocketmqRecordPartitionKafkaTopicPartitionMapper.java
new file mode 100644
index 00000000..5692ac97
--- /dev/null
+++ b/connectors/rocketmq-connect-kafka-connector-adapter/src/main/java/org/apache/rocketmq/connect/kafka/util/RocketmqRecordPartitionKafkaTopicPartitionMapper.java
@@ -0,0 +1,78 @@
+package org.apache.rocketmq.connect.kafka.util;
+
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rocketmq.connect.kafka.config.ConfigDefine;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A RocketMQ partition is a triple, MessageQueue(topic, brokerName, queueId), while a
+ * Kafka partition is a pair, TopicPartition(topic, partition). Several methods of
+ * org.apache.kafka.connect.sink.SinkTaskContext take a TopicPartition(topic, partition),
+ * which therefore has to be mapped to and from MessageQueue(topic, brokerName, queueId).
+ */
+public abstract class RocketmqRecordPartitionKafkaTopicPartitionMapper {
+
+
+ public static RocketmqRecordPartitionKafkaTopicPartitionMapper newKafkaTopicPartitionMapper(Map<String, String> kafkaTaskProps){
+ String mapper = kafkaTaskProps.getOrDefault(ConfigDefine.ROCKETMQ_RECORDPARTITION_KAFKATOPICPARTITION_MAPPER, "encodedTopic");
+ RocketmqRecordPartitionKafkaTopicPartitionMapper kafkaTopicPartitionMapper;
+ if(mapper.equalsIgnoreCase("encodedTopic")){
+ kafkaTopicPartitionMapper = new EncodedTopicRocketmqBrokerNameKafkaTopicPartitionMapper();
+ } else if(mapper.equalsIgnoreCase("assignEncodedPartition")){
+ kafkaTopicPartitionMapper = new AssignEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper();
+ } else if(mapper.equalsIgnoreCase("regexEncodedPartition")){
+ kafkaTopicPartitionMapper = new RegexEncodedPartitionRocketmqRecordPartitionKafkaTopicPartitionMapper();
+ } else {
+ throw new ConnectException("unknown rocketmq.recordPartition.kafkaTopicPartition.mapper config:"+mapper);
+ }
+ Map<String, String> mapperConfig = new HashMap<>();
+ String prefix = mapper+".";
+ for(Map.Entry<String, String> kv: kafkaTaskProps.entrySet()){
+ if(kv.getKey().startsWith(prefix)){
+ mapperConfig.put(kv.getKey().substring(prefix.length()), kv.getValue());
+ }
+ }
+ kafkaTopicPartitionMapper.configure(mapperConfig);
+ return kafkaTopicPartitionMapper;
+ }
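+
+ // Example task config (a sketch; mapper-specific keys are prefixed with the mapper name):
+ // rocketmq.recordPartition.kafkaTopicPartition.mapper=regexEncodedPartition
+ // regexEncodedPartition.pattern=[0-9]+
+ // regexEncodedPartition.partitionBits=16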
+
+ protected String getBrokerName(RecordPartition recordPartition){
+ return (String)recordPartition.getPartition().get(RecordUtil.BROKER_NAME);
+ }
+
+ protected String getMessageQueueTopic(RecordPartition recordPartition){
+ return (String)recordPartition.getPartition().get(RecordUtil.TOPIC);
+ }
+
+ protected int getQueueId(RecordPartition recordPartition){
+ return Integer.valueOf(
+ (String) recordPartition.getPartition().get(RecordUtil.QUEUE_ID)
+ );
+ }
+
+ /**
+ * Configure this mapper (the factory passes mapper-specific configs with the prefix stripped)
+ * @param configs
+ */
+ public abstract void configure(Map<String, String> configs);
+
+ /**
+ * Converts to a Kafka TopicPartition
+ * @param recordPartition
+ * @return
+ */
+ public abstract TopicPartition toTopicPartition(RecordPartition recordPartition);
+
+ /**
+ * Converts to an openmessaging RecordPartition, i.e. a RocketMQ MessageQueue
+ * @param topicPartition
+ * @return
+ */
+ public abstract RecordPartition toRecordPartition(TopicPartition topicPartition);
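+
+ // Usage sketch (kafkaTaskProps is whatever task config the runtime hands to the adapter):
+ // RocketmqRecordPartitionKafkaTopicPartitionMapper mapper =
+ // RocketmqRecordPartitionKafkaTopicPartitionMapper.newKafkaTopicPartitionMapper(kafkaTaskProps);
+ // TopicPartition tp = mapper.toTopicPartition(recordPartition);
+ // RecordPartition rp = mapper.toRecordPartition(tp); // round-trips to the same MessageQueue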
+
+
+}