You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/26 08:40:05 UTC
[rocketmq-connect] branch master updated: [ISSUE #399]Debezium sopport heartbeat record (#401)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 6b6a6be5 [ISSUE #399]Debezium sopport heartbeat record (#401)
6b6a6be5 is described below
commit 6b6a6be5097fbf559098dc6b47182c2e26ecc0d3
Author: xiaoyi <su...@163.com>
AuthorDate: Mon Dec 26 16:40:00 2022 +0800
[ISSUE #399]Debezium sopport heartbeat record (#401)
* fixed null pointer exception #294
* Fix invalid offset submitted by sinktask #310
* Support debezium heartbeat record #399
* Support debezium heartbeat record #399
* Support debezium heartbeat record #399
* fixed
* fixed
---
.../kafka/connect/adaptor/schema/Converters.java | 10 ++++-
.../resources/debezium-mysql-source-config.yaml | 19 ++++++++--
.../runtime/connectorwrapper/WorkerSourceTask.java | 44 +++++++++++++---------
.../connect/runtime/utils/ConnectUtil.java | 2 +-
4 files changed, 51 insertions(+), 24 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index 13115545..c4caf592 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
@@ -37,6 +38,7 @@ import java.util.Map;
*/
public class Converters {
+ public static final String TOPIC = "topic";
public static ConnectRecord fromSourceRecord(SourceRecord record) {
// sourceRecord convert connect Record
@@ -51,8 +53,14 @@ public class Converters {
RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+ String sourceTopic = record.topic();
+ Map<String, Object> partition = (Map<String, Object>) record.sourcePartition();
+ if (StringUtils.isNotBlank(sourceTopic) && partition != null) {
+ // set topic
+ partition.put(TOPIC, sourceTopic);
+ }
ConnectRecord connectRecord = new ConnectRecord(
- new RecordPartition(record.sourcePartition()),
+ new RecordPartition(partition),
new RecordOffset(record.sourceOffset()),
record.timestamp(),
keySchema,
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
index 5dac1dcc..0f0e3b6c 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -21,9 +21,18 @@
"connector-class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
"max-task": "1",
"connect-topicname": "debezium-mysql-source",
- "debezium.transforms": "Unwrap",
- "debezium.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
- "debezium.transforms.Unwrap.delete.handling.mode": "drop",
+
+ ## TopicRoute topic重定向
+ "kafka.transforms": "Unwrap,TopicRoute",
+ "kafka.transforms.Unwrap.delete.handling.mode": "none",
+ "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+ "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
+ "kafka.transforms.TopicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
+ "kafka.transforms.TopicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
+ "kafka.transforms.TopicRoute.replacement": "debezium-incremental-snapshot-readonly-0009",
+
+ ## ddl 禁用
+ "include.schema.changes": false,
"database.history.skip.unparseable.ddl": true,
"database.history.name.srv.addr": "localhost:9876",
@@ -31,7 +40,6 @@
"database.history.store.only.monitored.tables.ddl": true,
"database.user": "*******",
- "include.schema.changes": false,
"database.server.name": "test-server-02",
"database.port": 3306,
"database.hostname": "**********",
@@ -41,6 +49,9 @@
"max.batch.size": 50,
"database.include.list": "db-01",
"snapshot.mode": "when_needed",
+ ## heartbeat
+ "heartbeat.interval.ms": 60000,
+ "heartbeat.topics.prefix": "heartbeat-topic-prefix",
"source-record-converter": "org.apache.rocketmq.connect.runtime.converter.JsonConverter"
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index cfec5924..45e4f0d2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -76,6 +76,7 @@ import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.utils.Utils;
+import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -408,25 +409,10 @@ public class WorkerSourceTask extends WorkerTask {
* @return
*/
private String maybeCreateAndGetTopic(ConnectRecord record) {
- String topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
+ String topic = overwriteTopicFromRecord(record);
if (StringUtils.isBlank(topic)) {
- RecordPosition recordPosition = record.getPosition();
- if (null == recordPosition) {
- log.error("connect-topicname config is null and recordPosition is null , lack of topic config");
- }
- RecordPartition partition = recordPosition.getPartition();
- if (null == partition) {
- log.error("connect-topicname config is null and partition is null , lack of topic config");
- }
- Map<String, ?> partitionMap = partition.getPartition();
- if (null == partitionMap) {
- log.error("connect-topicname config is null and partitionMap is null , lack of topic config");
- }
- Object o = partitionMap.get(TOPIC);
- if (null == o) {
- log.error("connect-topicname config is null and partitionMap.get is null , lack of topic config");
- }
- topic = (String) o;
+ // topic from config
+ topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
}
if (StringUtils.isBlank(topic)) {
throw new ConnectException("source connect lack of topic config");
@@ -437,6 +423,28 @@ public class WorkerSourceTask extends WorkerTask {
return topic;
}
+ @Nullable
+ private static String overwriteTopicFromRecord(ConnectRecord record) {
+ RecordPosition recordPosition = record.getPosition();
+ if (null == recordPosition) {
+ log.error("Record position is null , lack of topic config");
+ }
+ RecordPartition partition = recordPosition.getPartition();
+ if (null == partition) {
+ log.error("Partition is null , lack of topic config");
+ }
+ Map<String, ?> partitionMap = partition.getPartition();
+ if (null == partitionMap) {
+ log.error("Partition map is null , lack of topic config");
+ }
+ Object o = partitionMap.get(TOPIC);
+ if (null == o) {
+ log.error("Partition map element topic is null , lack of topic config");
+ return null;
+ }
+ return (String) o;
+ }
+
private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
KeyValue extensionKeyValues = sourceDataEntry.getExtensions();
if (null == extensionKeyValues) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 446a6dfa..47a8ff52 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -198,7 +198,7 @@ public class ConnectUtil {
} catch (MQClientException e) {
foundTopicRouteInfo = false;
} catch (Exception e) {
- throw new RuntimeException("get topic route info failed", e);
+ throw new RuntimeException("Get topic route info failed", e);
} finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();