You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/26 08:40:05 UTC

[rocketmq-connect] branch master updated: [ISSUE #399]Debezium support heartbeat record (#401)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b6a6be5 [ISSUE #399]Debezium support heartbeat record (#401)
6b6a6be5 is described below

commit 6b6a6be5097fbf559098dc6b47182c2e26ecc0d3
Author: xiaoyi <su...@163.com>
AuthorDate: Mon Dec 26 16:40:00 2022 +0800

    [ISSUE #399]Debezium support heartbeat record (#401)
    
    * fixed null pointer exception #294
    
    * Fix invalid offset submitted by sinktask #310
    
    * Support debezium heartbeat record #399
    
    * Support debezium heartbeat record #399
    
    * Support debezium heartbeat record #399
    
    * fixed
    
    * fixed
---
 .../kafka/connect/adaptor/schema/Converters.java   | 10 ++++-
 .../resources/debezium-mysql-source-config.yaml    | 19 ++++++++--
 .../runtime/connectorwrapper/WorkerSourceTask.java | 44 +++++++++++++---------
 .../connect/runtime/utils/ConnectUtil.java         |  2 +-
 4 files changed, 51 insertions(+), 24 deletions(-)

diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index 13115545..c4caf592 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.ConnectHeaders;
@@ -37,6 +38,7 @@ import java.util.Map;
  */
 public class Converters {
 
+    public static final String TOPIC = "topic";
 
     public static ConnectRecord fromSourceRecord(SourceRecord record) {
         // sourceRecord convert connect Record
@@ -51,8 +53,14 @@ public class Converters {
 
 
         RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+        String sourceTopic = record.topic();
+        Map<String, Object> partition = (Map<String, Object>) record.sourcePartition();
+        if (StringUtils.isNotBlank(sourceTopic) && partition != null) {
+            // set topic
+            partition.put(TOPIC, sourceTopic);
+        }
         ConnectRecord connectRecord = new ConnectRecord(
-                new RecordPartition(record.sourcePartition()),
+                new RecordPartition(partition),
                 new RecordOffset(record.sourceOffset()),
                 record.timestamp(),
                 keySchema,
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
index 5dac1dcc..0f0e3b6c 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -21,9 +21,18 @@
   "connector-class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
   "max-task": "1",
   "connect-topicname": "debezium-mysql-source",
-  "debezium.transforms": "Unwrap",
-  "debezium.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
-  "debezium.transforms.Unwrap.delete.handling.mode": "drop",
+
+  ## TopicRoute topic重定向
+  "kafka.transforms": "Unwrap,TopicRoute",
+  "kafka.transforms.Unwrap.delete.handling.mode": "none",
+  "kafka.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+  "kafka.transforms.Unwrap.add.headers": "op,source.db,source.table",
+  "kafka.transforms.TopicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
+  "kafka.transforms.TopicRoute.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
+  "kafka.transforms.TopicRoute.replacement": "debezium-incremental-snapshot-readonly-0009",
+
+  ## ddl 禁用
+  "include.schema.changes": false,
 
   "database.history.skip.unparseable.ddl": true,
   "database.history.name.srv.addr": "localhost:9876",
@@ -31,7 +40,6 @@
   "database.history.store.only.monitored.tables.ddl": true,
 
   "database.user": "*******",
-  "include.schema.changes": false,
   "database.server.name": "test-server-02",
   "database.port": 3306,
   "database.hostname": "**********",
@@ -41,6 +49,9 @@
   "max.batch.size": 50,
   "database.include.list": "db-01",
   "snapshot.mode": "when_needed",
+  ## heartbeat
+  "heartbeat.interval.ms": 60000,
+  "heartbeat.topics.prefix": "heartbeat-topic-prefix",
 
   "source-record-converter": "org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index cfec5924..45e4f0d2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -76,6 +76,7 @@ import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.utils.Utils;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -408,25 +409,10 @@ public class WorkerSourceTask extends WorkerTask {
      * @return
      */
     private String maybeCreateAndGetTopic(ConnectRecord record) {
-        String topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
+        String topic = overwriteTopicFromRecord(record);
         if (StringUtils.isBlank(topic)) {
-            RecordPosition recordPosition = record.getPosition();
-            if (null == recordPosition) {
-                log.error("connect-topicname config is null and recordPosition is null , lack of topic config");
-            }
-            RecordPartition partition = recordPosition.getPartition();
-            if (null == partition) {
-                log.error("connect-topicname config is null and partition is null , lack of topic config");
-            }
-            Map<String, ?> partitionMap = partition.getPartition();
-            if (null == partitionMap) {
-                log.error("connect-topicname config is null and partitionMap is null , lack of topic config");
-            }
-            Object o = partitionMap.get(TOPIC);
-            if (null == o) {
-                log.error("connect-topicname config is null and partitionMap.get is null , lack of topic config");
-            }
-            topic = (String) o;
+            // topic from config
+            topic = taskConfig.getString(SourceConnectorConfig.CONNECT_TOPICNAME);
         }
         if (StringUtils.isBlank(topic)) {
             throw new ConnectException("source connect lack of topic config");
@@ -437,6 +423,28 @@ public class WorkerSourceTask extends WorkerTask {
         return topic;
     }
 
+    @Nullable
+    private static String overwriteTopicFromRecord(ConnectRecord record) {
+        RecordPosition recordPosition = record.getPosition();
+        if (null == recordPosition) {
+            log.error("Record position is null , lack of topic config");
+        }
+        RecordPartition partition = recordPosition.getPartition();
+        if (null == partition) {
+            log.error("Partition is null , lack of topic config");
+        }
+        Map<String, ?> partitionMap = partition.getPartition();
+        if (null == partitionMap) {
+            log.error("Partition map is null , lack of topic config");
+        }
+        Object o = partitionMap.get(TOPIC);
+        if (null == o) {
+            log.error("Partition map element topic is null , lack of topic config");
+            return null;
+        }
+        return (String) o;
+    }
+
     private void putExtendMsgProperty(ConnectRecord sourceDataEntry, Message sourceMessage, String topic) {
         KeyValue extensionKeyValues = sourceDataEntry.getExtensions();
         if (null == extensionKeyValues) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 446a6dfa..47a8ff52 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -198,7 +198,7 @@ public class ConnectUtil {
         } catch (MQClientException e) {
             foundTopicRouteInfo = false;
         } catch (Exception e) {
-            throw new RuntimeException("get topic route info  failed", e);
+            throw new RuntimeException("Get topic route info  failed", e);
         } finally {
             if (defaultMQAdminExt != null) {
                 defaultMQAdminExt.shutdown();