You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/22 06:26:50 UTC
[rocketmq-connect] branch master updated: [ISSUE #93] Connect plugins supports CDC mode, such as MySQL binlog (#152)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new d8ee3af [ISSUE #93] Connect plugins supports CDC mode, such as MySQL binlog (#152)
d8ee3af is described below
commit d8ee3af54c7cb00ad22c4180e313cb20fd5dc6c7
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jun 22 14:26:46 2022 +0800
[ISSUE #93] Connect plugins supports CDC mode, such as MySQL binlog (#152)
* rocketmq connect supports debezium CDC mode #93
* Abstract DebeziumConnector
* add debezium connector validate
* add connect validate, transform #93
* test debezium mysql #93
* modify kafka transform prefix
* add debezium config template
* upgrade rocketmq connect JDBC plug-in #153
* fixed api #93
* fixed converter
* perfect conversion
* upgrade rocketmq jdbc plugin and runtime api to 0.1.3 #153
* upgrade debezium api to 0.1.3
* upgrade debezium api to 0.1.3
* Enhance Kafka connect adaptability #93
* fixed
* add Decimal logical
* optimize transform api
* upgrade api to 0.1.3
* upgrade api to 0.1.3
* upgrade api to 0.1.3
* fixed
* fixed
* fixed
Co-authored-by: sunxiaojian <sunxiaojian926@163.com>
Co-authored-by: xiaojian.sxj <xi...@alibaba-inc.com>
---
connectors/rocketmq-connect-debezium/README.md | 0
.../kafka-connect-adaptor/pom.xml | 78 +++++
.../connect/adaptor/config/ConnectKeyValue.java | 120 ++++++++
.../connector/AbstractKafkaSinkConnector.java | 99 ++++++
.../connector/AbstractKafkaSourceConnector.java | 99 ++++++
.../adaptor/connector/ConnectorClassSetter.java | 22 ++
.../adaptor/connector/KafkaConnectorContext.java | 23 ++
.../connector/KafkaSinkAdaptorConnector.java | 32 ++
.../connector/KafkaSourceAdaptorConnector.java | 32 ++
.../adaptor/context/KafkaOffsetStorageReader.java | 69 +++++
.../context/RocketMQKafkaErrantRecordReporter.java | 28 ++
.../context/RocketMQKafkaSinkTaskContext.java | 151 +++++++++
.../context/RocketMQKafkaSourceTaskContext.java | 44 +++
.../kafka/connect/adaptor/schema/Converters.java | 161 ++++++++++
.../adaptor/schema/KafkaSinkSchemaConverter.java | 311 +++++++++++++++++++
.../adaptor/schema/KafkaSinkValueConverter.java | 132 ++++++++
.../schema/RocketMQSourceSchemaConverter.java | 302 ++++++++++++++++++
.../schema/RocketMQSourceValueConverter.java | 137 +++++++++
.../adaptor/task/AbstractKafkaConnectSink.java | 122 ++++++++
.../adaptor/task/AbstractKafkaConnectSource.java | 132 ++++++++
.../adaptor/task/KafkaConnectAdaptorSink.java | 67 ++++
.../adaptor/task/KafkaConnectAdaptorSource.java | 69 +++++
.../connect/adaptor/task/TaskClassSetter.java | 23 ++
.../adaptor/transforms/TransformationWrapper.java | 89 ++++++
.../connect/adaptor/SourceRecordConverterTest.java | 203 +++++++++++++
connectors/rocketmq-connect-debezium/pom.xml | 338 +++++++++++++++++++++
.../rocketmq-connect-debezium-core/pom.xml | 103 +++++++
.../rocketmq/connect/debezium/ConnectUtil.java | 232 ++++++++++++++
.../connect/debezium/DebeziumConnector.java | 36 +++
.../rocketmq/connect/debezium/DebeziumSource.java | 42 +++
.../connect/debezium/RocketMqConnectConfig.java | 194 ++++++++++++
.../connect/debezium/RocketMqDatabaseHistory.java | 255 ++++++++++++++++
.../debezium/SchemaRenameTransformation.java | 29 ++
.../rocketmq-connect-debezium-mongodb/pom.xml | 190 ++++++++++++
.../debezium/mongodb/DebeziumMongoDBConnector.java | 49 +++
.../debezium/mongodb/DebeziumMongoDBSource.java | 37 +++
.../resources/debezium-mongodb-source-config.yaml | 18 ++
.../rocketmq-connect-debezium-mysql/pom.xml | 197 ++++++++++++
.../debezium/mysql/DebeziumMysqlConnector.java | 47 +++
.../debezium/mysql/DebeziumMysqlSource.java | 37 +++
.../resources/debezium-mysql-source-config.yaml | 46 +++
.../rocketmq-connect-debezium-oracle/pom.xml | 190 ++++++++++++
.../debezium/oracle/DebeziumOracleConnector.java | 47 +++
.../debezium/oracle/DebeziumOracleSource.java | 37 +++
.../resources/debezium-oracle-source-config.yaml | 19 ++
.../rocketmq-connect-debezium-postgresql/pom.xml | 195 ++++++++++++
.../postgres/DebeziumPostgresConnector.java | 47 +++
.../debezium/postgres/DebeziumPostgresSource.java | 36 +++
.../resources/debezium-postgres-source-config.yaml | 18 ++
.../rocketmq-connect-debezium-sqlserver/pom.xml | 187 ++++++++++++
.../sqlserver/DebeziumSqlServerConnector.java | 47 +++
.../sqlserver/DebeziumSqlServerSource.java | 35 +++
.../debezium-sqlserver-source-config.yaml | 19 ++
.../connect/jdbc/common/DebeziumTimeTypes.java | 59 ++++
.../rocketmq/connect/jdbc/common/HeaderField.java | 29 ++
.../jdbc/dialect/impl/GenericDatabaseDialect.java | 14 +
.../apache/rocketmq/connect/jdbc/sink/Updater.java | 4 +
.../runtime/connectorwrapper/WorkerSinkTask.java | 29 +-
.../runtime/connectorwrapper/WorkerSourceTask.java | 4 +-
59 files changed, 5397 insertions(+), 14 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/README.md b/connectors/rocketmq-connect-debezium/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
new file mode 100644
index 0000000..64190bb
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
@@ -0,0 +1,78 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq-connect-debezium</artifactId>
        <groupId>org.apache.rocketmq</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka-connect-adaptor</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <dependencies>
        <!-- OpenMessaging connector API (its transitive openmessaging-api is
             excluded so the explicitly pinned version below wins) -->
        <dependency>
            <groupId>io.openmessaging</groupId>
            <artifactId>openmessaging-connector</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.openmessaging</groupId>
                    <artifactId>openmessaging-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.openmessaging</groupId>
            <artifactId>openmessaging-api</artifactId>
        </dependency>
        <!-- RocketMQ client stack -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-tools</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-remoting</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-openmessaging</artifactId>
        </dependency>

        <!-- Kafka Connect API/runtime that the adaptor wraps -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-runtime</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>
        <!-- logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
    </dependencies>

</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/config/ConnectKeyValue.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/config/ConnectKeyValue.java
new file mode 100644
index 0000000..6856a07
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/config/ConnectKeyValue.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.config;
+
+import io.openmessaging.KeyValue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * connect key value
+ */
+public class ConnectKeyValue implements KeyValue {
+ private Map<String, String> properties;
+
+ public ConnectKeyValue() {
+ properties = new ConcurrentHashMap<String, String>();
+ }
+
+ @Override
+ public KeyValue put(String key, int value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public KeyValue put(String key, long value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public KeyValue put(String key, double value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public KeyValue put(String key, String value) {
+ properties.put(key, String.valueOf(value));
+ return this;
+ }
+
+ @Override
+ public int getInt(String key) {
+ if (!properties.containsKey(key))
+ return 0;
+ return Integer.valueOf(properties.get(key));
+ }
+
+ @Override
+ public int getInt(final String key, final int defaultValue) {
+ return properties.containsKey(key) ? getInt(key) : defaultValue;
+ }
+
+ @Override
+ public long getLong(String key) {
+ if (!properties.containsKey(key))
+ return 0;
+ return Long.valueOf(properties.get(key));
+ }
+
+ @Override
+ public long getLong(final String key, final long defaultValue) {
+ return properties.containsKey(key) ? getLong(key) : defaultValue;
+ }
+
+ @Override
+ public double getDouble(String key) {
+ if (!properties.containsKey(key))
+ return 0;
+ return Double.valueOf(properties.get(key));
+ }
+
+ @Override
+ public double getDouble(final String key, final double defaultValue) {
+ return properties.containsKey(key) ? getDouble(key) : defaultValue;
+ }
+
+ @Override
+ public String getString(String key) {
+ return properties.get(key);
+ }
+
+ @Override
+ public String getString(final String key, final String defaultValue) {
+ return properties.containsKey(key) ? getString(key) : defaultValue;
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return properties.keySet();
+ }
+
+ @Override
+ public boolean containsKey(String key) {
+ return properties.containsKey(key);
+ }
+
+ public Map<String, String> config() {
+ return properties;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
new file mode 100644
index 0000000..8e5391e
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
@@ -0,0 +1,99 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * kafka source connector
+ */
+public abstract class AbstractKafkaSinkConnector extends SinkConnector implements ConnectorClassSetter {
+
+ /**
+ * kafka connect init
+ */
+ protected ConnectKeyValue configValue;
+
+ /**
+ * source connector
+ */
+ protected org.apache.kafka.connect.sink.SinkConnector sinkConnector;
+
+ /**
+ * task config
+ */
+ protected Map<String, String> taskConfig;
+
+ /**
+ * try override start and stop
+ * @return
+ */
+ protected org.apache.kafka.connect.sink.SinkConnector originalSinkConnector(){
+ return sinkConnector;
+ }
+
+ /**
+ * Returns a set of configurations for Tasks based on the current configuration,
+ * producing at most count configurations.
+ * @param maxTasks maximum number of configurations to generate
+ * @return configurations for Tasks
+ */
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ List<Map<String, String>> groupConnectors = sinkConnector.taskConfigs(maxTasks);
+ List<KeyValue> configs = new ArrayList<>();
+ for (Map<String, String> configMaps : groupConnectors) {
+ KeyValue keyValue = new DefaultKeyValue();
+ configMaps.forEach((k, v)->{
+ keyValue.put(k, v);
+ });
+ configs.add(keyValue);
+ }
+ return configs;
+ }
+
+ /**
+ * Start the component
+ * @param config component context
+ */
+ @Override
+ public void start(KeyValue config) {
+ this.configValue = new ConnectKeyValue();
+ config.keySet().forEach(key -> {
+ this.configValue.put(key, config.getString(key));
+ });
+ setConnectorClass(configValue);
+ taskConfig = new HashMap<>(configValue.config());
+ // get the source class name from config and create source task from reflection
+ try {
+ sinkConnector = Class.forName(taskConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
+ .asSubclass(org.apache.kafka.connect.sink.SinkConnector.class)
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (Exception e) {
+ throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG));
+ }
+ }
+
+ /**
+ * Stop the component.
+ */
+ @Override
+ public void stop() {
+ if (sinkConnector != null){
+ sinkConnector = null;
+ configValue = null;
+ taskConfig = null;
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
new file mode 100644
index 0000000..f9d5456
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
@@ -0,0 +1,99 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * kafka source connector
+ */
+public abstract class AbstractKafkaSourceConnector extends SourceConnector implements ConnectorClassSetter {
+
+
+ /**
+ * kafka connect init
+ */
+ protected ConnectKeyValue configValue;
+
+ /**
+ * task config
+ */
+ protected Map<String, String> taskConfig;
+
+ /**
+ * source connector
+ */
+ protected org.apache.kafka.connect.source.SourceConnector sourceConnector;
+
+ /**
+ * try override start and stop
+ * @return
+ */
+ protected org.apache.kafka.connect.source.SourceConnector originalSinkConnector(){
+ return sourceConnector;
+ }
+
+ /**
+ * Returns a set of configurations for Tasks based on the current configuration,
+ * producing at most count configurations.
+ * @param maxTasks maximum number of configurations to generate
+ * @return configurations for Tasks
+ */
+ @Override
+ public List<KeyValue> taskConfigs(int maxTasks) {
+ List<Map<String, String>> groupConnectors = sourceConnector.taskConfigs(maxTasks);
+ List<KeyValue> configs = new ArrayList<>();
+ for (Map<String, String> configMaps : groupConnectors) {
+ KeyValue keyValue = new DefaultKeyValue();
+ configMaps.forEach((k, v)->{
+ keyValue.put(k, v);
+ });
+ configs.add(keyValue);
+ }
+ return configs;
+ }
+
+ /**
+ * Start the component
+ * @param config component context
+ */
+ @Override
+ public void start(KeyValue config) {
+ this.configValue = new ConnectKeyValue();
+ config.keySet().forEach(key -> {
+ this.configValue.put(key, config.getString(key));
+ });
+ setConnectorClass(configValue);
+ taskConfig = new HashMap<>(configValue.config());
+ // get the source class name from config and create source task from reflection
+ try {
+ sourceConnector = Class.forName(taskConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
+ .asSubclass(org.apache.kafka.connect.source.SourceConnector.class)
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (Exception e) {
+ throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG));
+ }
+ }
+
+ /**
+ * Stop the component.
+ */
+ @Override
+ public void stop() {
+ if (sourceConnector != null){
+ sourceConnector = null;
+ configValue = null;
+ this.taskConfig = null;
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
new file mode 100644
index 0000000..b0c0007
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
@@ -0,0 +1,22 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
/**
 * connector class setter
 * <p>
 * Mixin that writes the Kafka Connect connector class name into a configuration,
 * so the adaptor can later instantiate the wrapped connector reflectively.
 */
public interface ConnectorClassSetter {
    /**
     * set connector class
     * <p>
     * Stores {@link #getConnectorClass()} under Kafka's
     * {@code connector.class} config key.
     *
     * @param config the configuration to mutate
     */
    default void setConnectorClass(KeyValue config) {
        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, getConnectorClass());
    }

    /**
     * get connector class
     *
     * @return fully qualified class name of the wrapped Kafka Connect connector
     */
    String getConnectorClass();
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
new file mode 100644
index 0000000..a42c507
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
@@ -0,0 +1,23 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import org.apache.kafka.connect.connector.ConnectorContext;
+
/**
 * kafka connect context
 * <p>
 * Adapts an OpenMessaging connector context to Kafka Connect's
 * {@link ConnectorContext} by delegating every call.
 */
public class KafkaConnectorContext implements ConnectorContext {
    // the wrapped OpenMessaging context; all calls are forwarded to it
    io.openmessaging.connector.api.component.connector.ConnectorContext context;

    public KafkaConnectorContext(io.openmessaging.connector.api.component.connector.ConnectorContext context){
        this.context = context;
    }

    /** Forward the task-reconfiguration request to the OpenMessaging runtime. */
    @Override
    public void requestTaskReconfiguration() {
        context.requestTaskReconfiguration();
    }

    /** Forward an unrecoverable error to the OpenMessaging runtime. */
    @Override
    public void raiseError(Exception e) {
        context.raiseError(e);
    }
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
new file mode 100644
index 0000000..90575bf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
@@ -0,0 +1,32 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+
/**
 * kafka sink adaptor connector
 * <p>
 * Extends the base adaptor by driving the wrapped Kafka Connect sink
 * connector's lifecycle: validate, initialize, start, stop.
 */
public abstract class KafkaSinkAdaptorConnector extends AbstractKafkaSinkConnector {

    /**
     * Start the component.
     * <p>
     * super.start() instantiates {@code sinkConnector} reflectively; the order
     * validate → initialize → start below mirrors the Kafka Connect lifecycle.
     *
     * @param config component configuration
     */
    @Override
    public void start(KeyValue config) {
        super.start(config);
        sinkConnector.validate(taskConfig);
        sinkConnector.initialize(new KafkaConnectorContext(connectorContext));
        sinkConnector.start(taskConfig);
    }

    /**
     * Stop the wrapped connector first, then let the base class release references.
     */
    @Override
    public void stop() {
        if (sinkConnector != null){
            sinkConnector.stop();
        }
        super.stop();
    }
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
new file mode 100644
index 0000000..cea0a22
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
@@ -0,0 +1,32 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+
/**
 * kafka source adaptor connector
 * <p>
 * Extends the base adaptor by driving the wrapped Kafka Connect source
 * connector's lifecycle: validate, initialize, start, stop.
 */
public abstract class KafkaSourceAdaptorConnector extends AbstractKafkaSourceConnector {

    /**
     * Start the component.
     * <p>
     * super.start() instantiates {@code sourceConnector} reflectively; the order
     * validate → initialize → start below mirrors the Kafka Connect lifecycle.
     *
     * @param config component configuration
     */
    @Override
    public void start(KeyValue config) {
        super.start(config);
        sourceConnector.validate(taskConfig);
        sourceConnector.initialize(new KafkaConnectorContext(connectorContext));
        sourceConnector.start(taskConfig);
    }

    /**
     * Stop the wrapped connector first, then let the base class release references.
     */
    @Override
    public void stop() {
        if (sourceConnector != null){
            sourceConnector.stop();
        }
        super.stop();
    }
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/KafkaOffsetStorageReader.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/KafkaOffsetStorageReader.java
new file mode 100644
index 0000000..ac8307b
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/KafkaOffsetStorageReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * kafka offset storage reader
+ */
+public class KafkaOffsetStorageReader implements CloseableOffsetStorageReader {
+ SourceTaskContext context;
+
+ public KafkaOffsetStorageReader(SourceTaskContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public void close() {
+ this.context = null;
+ }
+
+ @Override
+ public <T> Map<String, Object> offset(Map<String, T> map) {
+ RecordPartition partition = new RecordPartition(map);
+ RecordOffset offset = context.offsetStorageReader().readOffset(partition);
+ return (Map<String, Object>) offset.getOffset();
+ }
+
+ @Override
+ public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> collection) {
+ List<RecordPartition> partitions = new ArrayList<>();
+ collection.forEach(partitionMap -> {
+ RecordPartition partition = new RecordPartition(partitionMap);
+ partitions.add(partition);
+ });
+ Map<Map<String, T>, Map<String, Object>> offsetMap = new ConcurrentHashMap<>();
+ Map<RecordPartition, RecordOffset> offsets = context.offsetStorageReader().readOffsets(partitions);
+ offsets.forEach((partition, offset) -> {
+ Map<String, T> mapPartition = (Map<String, T>) partition.getPartition();
+ Map<String, Object> mapOffset = (Map<String, Object>) offset.getOffset();
+ offsetMap.put(mapPartition, mapOffset);
+ });
+ return offsetMap;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
new file mode 100644
index 0000000..3b951c6
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
@@ -0,0 +1,28 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+
+import java.util.concurrent.Future;
+
/**
 * rocketmq kafka error record reporter
 * <p>
 * Bridges Kafka Connect's {@link ErrantRecordReporter} to the OpenMessaging
 * {@link ErrorRecordReporter} obtained from the sink task context.
 */
public class RocketMQKafkaErrantRecordReporter implements ErrantRecordReporter {
    // the OpenMessaging reporter that actually records the failed record
    final ErrorRecordReporter errorRecordReporter;
    RocketMQKafkaErrantRecordReporter(SinkTaskContext context){
        errorRecordReporter = context.errorRecordReporter();
    }

    /**
     * Report a failed record by converting it to an OpenMessaging ConnectRecord
     * and handing it to the underlying reporter.
     *
     * @return null — NOTE(review): Kafka's ErrantRecordReporter contract returns
     *     a Future; a caller invoking get() on this result will NPE. Confirm no
     *     wrapped task relies on the returned Future.
     */
    @Override
    public Future<Void> report(SinkRecord sinkRecord, Throwable throwable) {
        // convert the Kafka SinkRecord into an OpenMessaging ConnectRecord
        ConnectRecord record = Converters.fromSinkRecord(sinkRecord);
        errorRecordReporter.report(record, throwable);
        return null;
    }
}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
new file mode 100644
index 0000000..0fd48a8
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * rocketmq kafka sink task context
+ */
+public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
+ public static final String BROKER_NAME = "brokerName";
+ public static final String QUEUE_ID = "queueId";
+ public static final String TOPIC = "topic";
+ public static final String QUEUE_OFFSET = "queueOffset";
+
+ private final io.openmessaging.connector.api.component.task.sink.SinkTaskContext sinkContext;
+ private final Map<String, String> taskConfig;
+ private final ErrantRecordReporter errantRecordReporter;
+
+ public RocketMQKafkaSinkTaskContext(io.openmessaging.connector.api.component.task.sink.SinkTaskContext context, Map<String, String> taskConfig) {
+ this.sinkContext = context;
+ this.taskConfig = taskConfig;
+ this.errantRecordReporter = new RocketMQKafkaErrantRecordReporter(context);
+ }
+
+ @Override
+ public Map<String, String> configs() {
+ return taskConfig;
+ }
+
+ @Override
+ public void offset(Map<TopicPartition, Long> map) {
+ Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
+ map.forEach((topicPartition, offset)->{
+ offsets.put(convertToRecordPartition(topicPartition), convertToRecordOffset(offset));
+ });
+ sinkContext.resetOffset(offsets);
+ }
+
+ @Override
+ public void offset(TopicPartition topicPartition, long l) {
+ sinkContext.resetOffset(convertToRecordPartition(topicPartition), convertToRecordOffset(l));
+ }
+
+ @Override
+ public void timeout(long l) {
+
+ }
+
+ @Override
+ public Set<TopicPartition> assignment() {
+ Set<TopicPartition> topicPartitions = new HashSet<>();
+ Set<RecordPartition> recordPartitions = sinkContext.assignment();
+ recordPartitions.forEach(partition -> {
+ topicPartitions.add(convertToTopicPartition(partition.getPartition()));
+ });
+ return topicPartitions;
+ }
+
+ @Override
+ public void pause(TopicPartition... topicPartitions) {
+ List<RecordPartition> partitions = new ArrayList<>();
+ for (TopicPartition topicPartition : topicPartitions){
+ RecordPartition recordPartition = convertToRecordPartition(topicPartition);
+ if (recordPartition != null){
+ partitions.add(recordPartition);
+ }
+ }
+ sinkContext.pause(partitions);
+ }
+
+ @Override
+ public void resume(TopicPartition... topicPartitions) {
+ List<RecordPartition> partitions = new ArrayList<>();
+ for (TopicPartition topicPartition : topicPartitions){
+ RecordPartition recordPartition = convertToRecordPartition(topicPartition);
+ if (recordPartition != null){
+ partitions.add(recordPartition);
+ }
+ }
+ sinkContext.resume(partitions);
+ }
+
+ @Override
+ public void requestCommit() {
+ }
+
+
+ @Override
+ public ErrantRecordReporter errantRecordReporter() {
+ return errantRecordReporter;
+ }
+
+
+ /**
+ * convert to kafka topic partition
+ * @param partitionMap
+ * @return
+ */
+ public TopicPartition convertToTopicPartition(Map<String, ?> partitionMap) {
+ if (partitionMap.containsKey(TOPIC) && partitionMap.containsKey(QUEUE_ID)){
+ return new TopicPartition(partitionMap.get(TOPIC).toString(), Integer.valueOf(partitionMap.get(QUEUE_ID).toString()));
+ }
+ return null;
+ }
+
+ /**
+ * convert to rocketmq record partition
+ * @param topicPartition
+ * @return
+ */
+ public RecordPartition convertToRecordPartition(TopicPartition topicPartition) {
+ if (topicPartition != null){
+ return new RecordPartition(Collections.singletonMap(topicPartition.topic(),topicPartition.partition()));
+ }
+ return null;
+ }
+
+ private RecordOffset convertToRecordOffset(long l) {
+ return new RecordOffset(Collections.singletonMap(QUEUE_OFFSET,l));
+ }
+
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSourceTaskContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSourceTaskContext.java
new file mode 100644
index 0000000..6716dcf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSourceTaskContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+import java.util.Map;
+
+public class RocketMQKafkaSourceTaskContext implements SourceTaskContext {
+
+ private final OffsetStorageReader reader;
+ private final Map<String, String> taskConfig;
+
+ public RocketMQKafkaSourceTaskContext(OffsetStorageReader reader, Map<String, String> taskConfig) {
+ this.reader = reader;
+ this.taskConfig = taskConfig;
+ }
+
+ @Override
+ public Map<String, String> configs() {
+ return taskConfig;
+ }
+
+ @Override
+ public OffsetStorageReader offsetStorageReader() {
+ return reader;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
new file mode 100644
index 0000000..abc2abf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.RocketMQKafkaSinkTaskContext;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * converter transforms record
+ */
+public class Converters {
+
+
+ public static ConnectRecord fromSourceRecord(SourceRecord record){
+ // sourceRecord convert connect Record
+ RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+ io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+ RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+ ConnectRecord connectRecord = new ConnectRecord(
+ new RecordPartition(record.sourcePartition()),
+ new RecordOffset(record.sourceOffset()),
+ record.timestamp(),
+ schema,
+ rocketMQSourceValueConverter.value(schema, record.value()));
+ Iterator<Header> headers = record.headers().iterator();
+ while (headers.hasNext()) {
+ Header header = headers.next();
+ connectRecord.addExtension(header.key(), String.valueOf(header.value()));
+ }
+ return connectRecord;
+ }
+
+
+ public static ConnectRecord fromSinkRecord(SinkRecord record){
+ // sourceRecord convert connect Record
+ RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+ io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+ RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+ ConnectRecord connectRecord = new ConnectRecord(
+ toRecordPartition(record),
+ toRecordOffset(record),
+ record.timestamp(),
+ schema,
+ rocketMQSourceValueConverter.value(schema, record.value()));
+ Iterator<Header> headers = record.headers().iterator();
+ while (headers.hasNext()) {
+ Header header = headers.next();
+ connectRecord.addExtension(header.key(), String.valueOf(header.value()));
+ }
+ return connectRecord;
+ }
+
+
+
+ /**
+ * convert rocketmq connect record to sink record
+ * @param record
+ * @return
+ */
+ public static SinkRecord fromConnectRecord(ConnectRecord record){
+ // connect record convert kafka sink record
+ KafkaSinkSchemaConverter kafkaSinkSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
+ Schema schema = kafkaSinkSchemaConverter.schema();
+ KafkaSinkValueConverter sinkValueConverter = new KafkaSinkValueConverter();
+ // add headers
+ Headers headers = new ConnectHeaders();
+ Iterator extensions = record.getExtensions().keySet().iterator();
+ while (extensions.hasNext()) {
+ String key = String.valueOf(extensions.next());
+ headers.add(key, record.getExtensions().getString(key) , null);
+ }
+
+ SinkRecord sinkRecord = new SinkRecord(
+ topic(record.getPosition().getPartition()),
+ partition(record.getPosition().getPartition()),
+ null,
+ null,
+ schema,
+ sinkValueConverter.value(schema, record.getData()),
+ offset(record.getPosition().getOffset()),
+ record.getTimestamp(),
+ TimestampType.NO_TIMESTAMP_TYPE,
+ headers
+ );
+ return sinkRecord;
+ }
+
+ public static RecordPartition toRecordPartition(SinkRecord record){
+
+ Map<String, String> recordPartitionMap = new HashMap<>();
+ recordPartitionMap.put(RocketMQKafkaSinkTaskContext.TOPIC, record.topic());
+ recordPartitionMap.put(RocketMQKafkaSinkTaskContext.QUEUE_ID, record.kafkaPartition() + "");
+ return new RecordPartition(recordPartitionMap);
+ }
+
+ public static RecordOffset toRecordOffset(SinkRecord record){
+ Map<String, String> recordOffsetMap = new HashMap<>();
+ recordOffsetMap.put(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET, record.kafkaOffset() + "");
+ return new RecordOffset(recordOffsetMap);
+ }
+
+
+ /**
+ * get topic
+ * @param partition
+ * @return
+ */
+ public static String topic(RecordPartition partition){
+ return partition.getPartition().get(RocketMQKafkaSinkTaskContext.TOPIC).toString();
+ }
+
+ /**
+ * get partition
+ * @param partition
+ * @return
+ */
+ public static int partition(RecordPartition partition){
+ if (partition.getPartition().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_ID)){
+ return Integer.valueOf(partition.getPartition().get(RocketMQKafkaSinkTaskContext.QUEUE_ID).toString());
+ }
+ return -1;
+ }
+ /**
+ * get offset
+ * @param offset
+ * @return
+ */
+ public static int offset(RecordOffset offset){
+ if (offset.getOffset().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET)){
+ return Integer.valueOf(offset.getOffset().get(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET).toString());
+ }
+ return -1;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
new file mode 100644
index 0000000..9e8a188
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * convert rocketmq connect record data to kafka sink record data
+ */
+public class KafkaSinkSchemaConverter {
+ private static Logger logger = LoggerFactory.getLogger(KafkaSinkSchemaConverter.class);
+ private static Map<String, String> logicalMapping = new HashMap<>();
+ static {
+ logicalMapping.put(io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME,Decimal.LOGICAL_NAME);
+ logicalMapping.put(io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME, Date.LOGICAL_NAME);
+ logicalMapping.put(io.openmessaging.connector.api.data.logical.Time.LOGICAL_NAME, Time.LOGICAL_NAME);
+ logicalMapping.put(io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME, Timestamp.LOGICAL_NAME);
+ }
+
+ private SchemaBuilder builder;
+ public KafkaSinkSchemaConverter(Schema schema) {
+ builder = convertKafkaSchema(schema);
+ }
+
+ public org.apache.kafka.connect.data.Schema schema() {
+ return builder.build();
+ }
+
+ /**
+ * convert kafka schema
+ * @param originalSchema
+ * @return
+ */
+ private SchemaBuilder convertKafkaSchema(io.openmessaging.connector.api.data.Schema originalSchema) {
+ String schemaName = convertSchemaName(originalSchema.getName());
+ switch (originalSchema.getFieldType()) {
+ case INT8:
+ return SchemaBuilder
+ .int8()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case INT16:
+ return SchemaBuilder
+ .int16()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case INT32:
+ return SchemaBuilder
+ .int32()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case INT64:
+ return SchemaBuilder
+ .int64()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case FLOAT32:
+ return SchemaBuilder
+ .float32()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case FLOAT64:
+ return SchemaBuilder
+ .float64()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case BOOLEAN:
+ return SchemaBuilder
+ .bool()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case STRING:
+ return SchemaBuilder.
+ string()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case BYTES:
+ return SchemaBuilder
+ .bytes()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case STRUCT:
+ SchemaBuilder schemaBuilder = SchemaBuilder
+ .struct()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ convertStructSchema(schemaBuilder, originalSchema);
+ return schemaBuilder;
+ case ARRAY:
+ return SchemaBuilder.array(convertKafkaSchema(originalSchema.getValueSchema()).build())
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ case MAP:
+ return SchemaBuilder.map(
+ convertKafkaSchema(originalSchema.getKeySchema()).build(),
+ convertKafkaSchema(originalSchema.getValueSchema()).build()
+ ).optional()
+ .name(schemaName)
+ .doc(originalSchema.getDoc())
+ .defaultValue(originalSchema.getDefaultValue());
+ default:
+ throw new RuntimeException(" Type not supported: {}" + originalSchema.getFieldType());
+
+ }
+
+ }
+
+ /**
+ * convert schema
+ *
+ * @param schemaBuilder
+ * @param originalSchema
+ */
+ private void convertStructSchema(org.apache.kafka.connect.data.SchemaBuilder schemaBuilder, io.openmessaging.connector.api.data.Schema originalSchema) {
+ for (Field field : originalSchema.getFields()) {
+ try {
+
+ // schema
+ Schema schema = field.getSchema();
+ String schemaName = convertSchemaName(field.getSchema().getName());
+
+ // field name
+ String fieldName = field.getName();
+ FieldType type = schema.getFieldType();
+
+
+ switch (type) {
+ case INT8:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .int8()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case INT16:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .int16()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case INT32:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .int32()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case INT64:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .int64()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case FLOAT32:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .float32()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case FLOAT64:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .float64()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case BOOLEAN:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .bool()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case STRING:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .string()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case BYTES:
+ schemaBuilder.field(
+ fieldName,
+ SchemaBuilder
+ .bytes()
+ .name(schemaName)
+ .doc(schema.getDoc())
+ .defaultValue(schema.getDefaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case STRUCT:
+ case ARRAY:
+ case MAP:
+ schemaBuilder.field(
+ fieldName,
+ convertKafkaSchema(field.getSchema()).build()
+ );
+ break;
+ default:
+ break;
+ }
+ } catch (Exception ex) {
+ logger.error("Convert schema failure! ex {}", ex);
+ throw new ConnectException(ex);
+ }
+ }
+ }
+
+
+ private String convertSchemaName(String schemaName){
+ if (logicalMapping.containsKey(schemaName)){
+ return logicalMapping.get(schemaName);
+ }
+ return schemaName;
+ }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
new file mode 100644
index 0000000..0cbc2f9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * convert rocketmq connect record to kafka sink record
+ */
+public class KafkaSinkValueConverter {
+
+ private static Logger logger = LoggerFactory.getLogger(KafkaSinkValueConverter.class);
+
+ public Object value(Schema schema,Object data) {
+ return convertKafkaValue(schema, data);
+ }
+
+ /**
+ * convert value
+ * @param targetSchema
+ * @param originalValue
+ * @return
+ */
+ private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+ switch (targetSchema.type()) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BOOLEAN:
+ case STRING:
+ case BYTES:
+ return originalValue;
+ case STRUCT:
+ Struct toStruct = new Struct(targetSchema);
+ if (originalValue != null) {
+ convertStructValue(toStruct, (org.apache.kafka.connect.data.Struct) originalValue);
+ }
+ return toStruct;
+ case ARRAY:
+ List<Object> array = (List<Object>) originalValue;
+ List<Object> newArray = new ArrayList<>();
+ array.forEach(item -> {
+ newArray.add(convertKafkaValue(targetSchema.valueSchema(), item));
+ });
+ return newArray;
+ case MAP:
+ Map mapData = (Map) originalValue;
+ Map newMapData = new ConcurrentHashMap();
+ mapData.forEach((k, v) -> {
+ newMapData.put(
+ convertKafkaValue(targetSchema.keySchema(), k),
+ convertKafkaValue(targetSchema.valueSchema(), v)
+ );
+ });
+ return newMapData;
+ default:
+ throw new RuntimeException(" Type not supported: {}" + targetSchema.type());
+
+ }
+
+ }
+
+ /**
+ * convert struct value
+ *
+ * @param toStruct
+ * @param originalStruct
+ */
+ private void convertStructValue(Struct toStruct, org.apache.kafka.connect.data.Struct originalStruct) {
+
+ for (Field field : toStruct.schema().fields()) {
+ try {
+ Schema.Type type = field.schema().type();
+ Object value = originalStruct.get(field.name());
+ switch (type) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BOOLEAN:
+ case STRING:
+ case BYTES:
+ toStruct.put(field.name(), value);
+ break;
+ case STRUCT:
+ case ARRAY:
+ case MAP:
+ toStruct.put(
+ field.name(),
+ convertKafkaValue(
+ toStruct.schema().field(field.name()).schema(),
+ value
+ )
+ );
+ break;
+ }
+ } catch (Exception ex) {
+ logger.error("Convert to kafka schema failure, {}", ex);
+ throw new ConnectException(ex);
+ }
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
new file mode 100644
index 0000000..b357fd7
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * schema converter
+ */
+public class RocketMQSourceSchemaConverter {
+ private static Logger logger = LoggerFactory.getLogger(RocketMQSourceSchemaConverter.class);
+ private static Map<String, String> logicalMapping = new HashMap<>();
+ static {
+ logicalMapping.put(Decimal.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME);
+ logicalMapping.put(Date.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME);
+ logicalMapping.put(Time.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Time.LOGICAL_NAME);
+ logicalMapping.put(Timestamp.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME);
+ }
+
+ private SchemaBuilder builder;
+
+ public RocketMQSourceSchemaConverter(Schema schema) {
+ builder = convertKafkaSchema(schema);
+ }
+
+ public io.openmessaging.connector.api.data.Schema schema() {
+ return builder.build();
+ }
+
+ private SchemaBuilder convertKafkaSchema(org.apache.kafka.connect.data.Schema originalSchema) {
+ String schemaName = convertSchemaName(originalSchema.name());
+ switch (originalSchema.type()) {
+ case INT8:
+ return SchemaBuilder
+ .int8()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case INT16:
+ return SchemaBuilder
+ .int16()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case INT32:
+ return SchemaBuilder
+ .int32()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case INT64:
+ return SchemaBuilder
+ .int64()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case FLOAT32:
+ return SchemaBuilder
+ .float32()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case FLOAT64:
+ return SchemaBuilder
+ .float64()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case BOOLEAN:
+ return SchemaBuilder
+ .bool()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case STRING:
+ return SchemaBuilder.
+ string()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case BYTES:
+ return SchemaBuilder
+ .bytes()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case STRUCT:
+ SchemaBuilder schemaBuilder = SchemaBuilder
+ .struct()
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ convertStructSchema(schemaBuilder, originalSchema);
+ return schemaBuilder;
+ case ARRAY:
+ return SchemaBuilder.array(convertKafkaSchema(originalSchema.valueSchema()).build())
+ .optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ case MAP:
+ return SchemaBuilder.map(
+ convertKafkaSchema(originalSchema.keySchema()).build(),
+ convertKafkaSchema(originalSchema.valueSchema()).build()
+ ).optional()
+ .name(schemaName)
+ .doc(originalSchema.doc())
+ .defaultValue(originalSchema.defaultValue());
+ default:
+ throw new RuntimeException(" Type not supported: {}" + originalSchema.type());
+
+ }
+
+ }
+
+ /**
+ * convert schema
+ *
+ * @param schemaBuilder
+ * @param originalSchema
+ */
+ private void convertStructSchema(io.openmessaging.connector.api.data.SchemaBuilder schemaBuilder, org.apache.kafka.connect.data.Schema originalSchema) {
+ for (Field field : originalSchema.fields()) {
+ try {
+ Schema schema = field.schema();
+ org.apache.kafka.connect.data.Schema.Type type = schema.type();
+ String schemaName = convertSchemaName(field.schema().name());
+ switch (type) {
+ case INT8:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .int8()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case INT16:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .int16()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case INT32:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .int32()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case INT64:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .int64()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case FLOAT32:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .float32()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case FLOAT64:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .float64()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case BOOLEAN:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .bool()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case STRING:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .string()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case BYTES:
+ schemaBuilder.field(
+ field.name(),
+ SchemaBuilder
+ .bytes()
+ .name(schemaName)
+ .doc(schema.doc())
+ .defaultValue(schema.defaultValue())
+ .optional()
+ .build()
+ );
+ break;
+ case STRUCT:
+ case ARRAY:
+ case MAP:
+ schemaBuilder.field(
+ field.name(),
+ convertKafkaSchema(field.schema()).build()
+ );
+ break;
+ default:
+ break;
+ }
+ } catch (Exception ex) {
+ logger.error("Convert schema failure! ex {}", ex);
+ throw new ConnectException(ex);
+ }
+ }
+ }
+
+
+ private String convertSchemaName(String schemaName){
+ if (logicalMapping.containsKey(schemaName)){
+ return logicalMapping.get(schemaName);
+ }
+ return schemaName;
+ }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
new file mode 100644
index 0000000..8801efb
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * value converter
+ */
+public class RocketMQSourceValueConverter {
+ private static Logger logger = LoggerFactory.getLogger(RocketMQSourceValueConverter.class);
+
+ public Object value(Schema schema, Object value) {
+ return convertKafkaValue(schema, value);
+ }
+
+ /**
+ * convert value
+ *
+ * @param targetSchema
+ * @param originalValue
+ * @return
+ */
+ private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+ switch (targetSchema.getFieldType()) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BOOLEAN:
+ case STRING:
+ case BYTES:
+ return originalValue;
+ case STRUCT:
+ Struct toStruct = new Struct(targetSchema);
+ if (originalValue != null) {
+ convertStructValue(toStruct, (org.apache.kafka.connect.data.Struct) originalValue);
+ }
+ return toStruct;
+ case ARRAY:
+ List<Object> array = (List<Object>) originalValue;
+ List<Object> newArray = new ArrayList<>();
+ array.forEach(item -> {
+ newArray.add(convertKafkaValue(targetSchema.getValueSchema(), item));
+ });
+ return newArray;
+ case MAP:
+ Map mapData = (Map) originalValue;
+ Map newMapData = new ConcurrentHashMap();
+ mapData.forEach((k, v) -> {
+ newMapData.put(
+ convertKafkaValue(targetSchema.getKeySchema(), k),
+ convertKafkaValue(targetSchema.getValueSchema(), v)
+ );
+ });
+ return newMapData;
+ default:
+ throw new RuntimeException(" Type not supported: {}" + targetSchema.getFieldType());
+
+ }
+
+ }
+
+ /**
+ * convert struct value
+ *
+ * @param toStruct
+ * @param originalStruct
+ */
+ private void convertStructValue(Struct toStruct, org.apache.kafka.connect.data.Struct originalStruct) {
+
+ for (Field field : toStruct.schema().getFields()) {
+ try {
+ FieldType type = field.getSchema().getFieldType();
+ Object value = originalStruct.get(field.getName());
+ switch (type) {
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT32:
+ case FLOAT64:
+ case BOOLEAN:
+ case STRING:
+ case BYTES:
+ toStruct.put(field.getName(), value);
+ break;
+ case STRUCT:
+ case ARRAY:
+ case MAP:
+ toStruct.put(
+ field.getName(),
+ convertKafkaValue(
+ toStruct.schema().getField(field.getName()).getSchema(),
+ value
+ )
+ );
+ break;
+ }
+ } catch (Exception ex) {
+ logger.error("Convert schema failure! ex {}", ex);
+ throw new ConnectException(ex);
+ }
+ }
+ }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
new file mode 100644
index 0000000..8677db9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.RocketMQKafkaSinkTaskContext;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.transforms.TransformationWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract sink task that embeds a Kafka Connect sink task inside the
+ * RocketMQ Connect runtime. Subclasses supply the Kafka task class name
+ * (via TaskClassSetter) and the ConnectRecord-to-SinkRecord conversion.
+ */
+public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskClassSetter {
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSink.class);
+
+ private SinkTaskContext kafkaSinkTaskContext;
+ private org.apache.kafka.connect.sink.SinkTask sinkTask;
+
+ protected TransformationWrapper transformationWrapper;
+
+ /**
+ * task configuration handed over by the RocketMQ runtime in start()
+ */
+ protected ConnectKeyValue configValue;
+
+ /**
+ * convert by kafka sink transform
+ * @param record record to transform
+ * @return transformed record, or null when a transform drops it
+ */
+ protected abstract SinkRecord transforms(SinkRecord record);
+
+ /**
+ * convert ConnectRecord to SinkRecord
+ * @param record RocketMQ record
+ * @return Kafka sink record
+ */
+ public abstract SinkRecord processSinkRecord(ConnectRecord record);
+
+
+ /**
+ * Put the records to the wrapped Kafka sink task.
+ * @param records sink records
+ */
+ @Override
+ public void put(List<ConnectRecord> records) {
+ // convert sink data
+ List<SinkRecord> sinkRecords = new ArrayList<>(records.size());
+ records.forEach(connectRecord -> {
+ SinkRecord record = this.transforms(this.processSinkRecord(connectRecord));
+ // fix: a transform may drop the record by returning null; the original
+ // code added nulls to the batch handed to the Kafka task
+ if (record != null) {
+ sinkRecords.add(record);
+ }
+ });
+ sinkTask.put(sinkRecords);
+ }
+
+
+ @Override
+ public void validate(KeyValue keyValue) {
+ // no extra validation beyond what the wrapped Kafka task performs
+ }
+
+ @Override
+ public void start(KeyValue keyValue) {
+ this.configValue = new ConnectKeyValue();
+ keyValue.keySet().forEach(key -> {
+ this.configValue.put(key, keyValue.getString(key));
+ });
+
+ setTaskClass(configValue);
+
+ Map<String, String> taskConfig = new HashMap<>(configValue.config());
+ // get the sink class name from config and create the sink task via reflection
+ try {
+ sinkTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+ .asSubclass(org.apache.kafka.connect.sink.SinkTask.class)
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (Exception e) {
+ // fix: keep the original exception as the cause so the real failure is diagnosable
+ throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG), e);
+ }
+
+ kafkaSinkTaskContext = new RocketMQKafkaSinkTaskContext(sinkTaskContext, taskConfig);
+ sinkTask.initialize(kafkaSinkTaskContext);
+ sinkTask.start(taskConfig);
+ transformationWrapper = new TransformationWrapper(taskConfig);
+ }
+
+
+ @Override
+ public void stop() {
+ if (sinkTask != null) {
+ sinkTask.stop();
+ sinkTask = null;
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
new file mode 100644
index 0000000..f062b3d
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.KafkaOffsetStorageReader;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.RocketMQKafkaSourceTaskContext;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.transforms.TransformationWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Abstract source task that embeds a Kafka Connect source task inside the
+ * RocketMQ Connect runtime. Subclasses supply the Kafka task class name
+ * (via TaskClassSetter) and the transform chain applied to polled records.
+ */
+public abstract class AbstractKafkaConnectSource extends SourceTask implements TaskClassSetter {
+
+ private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
+
+ protected TransformationWrapper transformationWrapper;
+ private SourceTaskContext kafkaSourceTaskContext;
+ private org.apache.kafka.connect.source.SourceTask sourceTask;
+ private OffsetStorageReader offsetReader;
+
+ /**
+ * task configuration handed over by the RocketMQ runtime in start()
+ */
+ protected ConnectKeyValue configValue;
+
+ @Override
+ public List<ConnectRecord> poll() throws InterruptedException {
+ List<SourceRecord> recordList = sourceTask.poll();
+ if (recordList == null || recordList.isEmpty()) {
+ // fix: back off briefly and report an empty batch; the original code
+ // fell through and iterated a possibly-null list (NullPointerException)
+ Thread.sleep(1000);
+ return new ArrayList<>();
+ }
+ List<ConnectRecord> records = new ArrayList<>(recordList.size());
+ for (SourceRecord sourceRecord : recordList) {
+ // a transform may drop the record by returning null
+ SourceRecord transformRecord = transforms(sourceRecord);
+ if (transformRecord == null) {
+ continue;
+ }
+ ConnectRecord processRecord = Converters.fromSourceRecord(transformRecord);
+ if (processRecord != null) {
+ records.add(processRecord);
+ }
+ }
+ return records;
+ }
+
+ /**
+ * convert transform
+ * @param sourceRecord record to transform
+ * @return transformed record, or null when a transform drops it
+ */
+ protected abstract SourceRecord transforms(SourceRecord sourceRecord);
+
+ /**
+ * process source record
+ *
+ * @param next raw Kafka source record
+ * @return converted RocketMQ record
+ */
+ public abstract ConnectRecord processSourceRecord(SourceRecord next);
+
+
+ @Override
+ public void validate(KeyValue keyValue) {
+ // no extra validation beyond what the wrapped Kafka task performs
+ }
+
+ @Override
+ public void start(KeyValue keyValue) {
+ this.configValue = new ConnectKeyValue();
+ keyValue.keySet().forEach(key -> {
+ this.configValue.put(key, keyValue.getString(key));
+ });
+
+ setTaskClass(configValue);
+ Map<String, String> taskConfig = new HashMap<>(configValue.config());
+
+ // get the source class name from config and create source task via reflection
+ try {
+ sourceTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+ .asSubclass(org.apache.kafka.connect.source.SourceTask.class)
+ .getDeclaredConstructor()
+ .newInstance();
+ } catch (Exception e) {
+ // fix: keep the original exception as the cause so the real failure is diagnosable
+ throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG), e);
+ }
+
+ offsetReader = new KafkaOffsetStorageReader(
+ sourceTaskContext
+ );
+
+ kafkaSourceTaskContext = new RocketMQKafkaSourceTaskContext(offsetReader, taskConfig);
+ sourceTask.initialize(kafkaSourceTaskContext);
+ sourceTask.start(taskConfig);
+ transformationWrapper = new TransformationWrapper(taskConfig);
+ }
+
+
+ @Override
+ public void stop() {
+ if (sourceTask != null) {
+ sourceTask.stop();
+ sourceTask = null;
+ }
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
new file mode 100644
index 0000000..439f733
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Kafka connect adaptor sink: applies the configured Kafka transform chain
+ * and converts RocketMQ ConnectRecords into Kafka SinkRecords.
+ */
+public abstract class KafkaConnectAdaptorSink extends AbstractKafkaConnectSink {
+ private static final Logger log = LoggerFactory.getLogger(KafkaConnectAdaptorSink.class);
+
+ /**
+ * run every configured sink transform over the record, stopping early when
+ * a transform drops it
+ *
+ * @param record record to transform
+ * @return transformed record, or null when a transform drops it
+ */
+ @Override
+ protected SinkRecord transforms(SinkRecord record) {
+ List<Transformation> transformations = transformationWrapper.transformations();
+ for (Transformation transformation : transformations) {
+ log.trace("applying transformation {} to {}", transformation.getClass().getName(), record);
+ record = (SinkRecord) transformation.apply(record);
+ if (record == null) {
+ break;
+ }
+ }
+ return record;
+ }
+
+ /**
+ * convert ConnectRecord to SinkRecord
+ * @param record RocketMQ record
+ * @return converted Kafka sink record
+ */
+ @Override
+ public SinkRecord processSinkRecord(ConnectRecord record) {
+ // fix: conversion only — the caller (AbstractKafkaConnectSink.put) already
+ // runs transforms() on the result, so applying it here too ran every
+ // configured transform twice per record
+ return Converters.fromConnectRecord(record);
+ }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSource.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSource.java
new file mode 100644
index 0000000..06140cf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * Kafka connect adaptor source: runs the configured Kafka transform chain
+ * and converts Kafka SourceRecords into RocketMQ ConnectRecords.
+ */
+public abstract class KafkaConnectAdaptorSource extends AbstractKafkaConnectSource {
+ private static final Logger log = LoggerFactory.getLogger(KafkaConnectAdaptorSource.class);
+
+ /**
+ * run every configured transform over the record, stopping early when a
+ * transform drops it by returning null
+ *
+ * @param record record to transform
+ * @return transformed record, or null when a transform drops it
+ */
+ @Override
+ protected SourceRecord transforms(SourceRecord record) {
+ List<Transformation> transformations = transformationWrapper.transformations();
+ for (Transformation transformation : transformations) {
+ log.trace("applying transformation {} to {}", transformation.getClass().getName(), record);
+ record = (SourceRecord) transformation.apply(record);
+ if (record == null) {
+ break;
+ }
+ }
+ return record;
+ }
+
+ /**
+ * transform a raw Kafka source record, then convert it to a ConnectRecord
+ *
+ * @param record raw Kafka source record
+ * @return converted RocketMQ record
+ */
+ @Override
+ public ConnectRecord processSourceRecord(SourceRecord record) {
+ return Converters.fromSourceRecord(this.transforms(record));
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
new file mode 100644
index 0000000..64c58c4
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.KeyValue;
+// NOTE(review): ConnectorConfig appears unused here — confirm before removing
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+
+/**
+ * Mixin that injects the Kafka Connect task class name into the task
+ * configuration under TaskConfig.TASK_CLASS_CONFIG, so the adaptor can
+ * instantiate the wrapped Kafka task by reflection.
+ */
+public interface TaskClassSetter {
+ /**
+ * set the Kafka task class name into the given config
+ * @param config task configuration to mutate
+ */
+ default void setTaskClass(KeyValue config) {
+ config.put(TaskConfig.TASK_CLASS_CONFIG, getTaskClass());
+ }
+
+ /**
+ * get the fully-qualified Kafka task class name to instantiate
+ */
+ String getTaskClass();
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
new file mode 100644
index 0000000..c0700a6
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.transforms;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Builds the Kafka Connect Transformation chain declared under the
+ * "kafka.transforms" configuration prefix, mirroring Kafka Connect's own
+ * transform wiring.
+ */
+public class TransformationWrapper {
+
+ private static final String DEFAULT_TRANSFORM = "kafka.transforms";
+ private Map<String, String> props;
+
+ public TransformationWrapper(Map<String, String> props) {
+ this.props = props;
+ }
+
+ /**
+ * Instantiate and configure every transform listed (comma-separated) in
+ * the "kafka.transforms" property; each alias is configured from the
+ * "kafka.transforms.&lt;alias&gt;." sub-keys.
+ */
+ public List<Transformation> transformations() {
+ if (!props.containsKey(DEFAULT_TRANSFORM)) {
+ return Collections.emptyList();
+ }
+ List<String> transformAliases = Arrays.asList(props.get(DEFAULT_TRANSFORM).split(","));
+ List<Transformation> transformations = new ArrayList(transformAliases.size());
+ for (String alias : transformAliases) {
+ String prefix = DEFAULT_TRANSFORM + "." + alias + ".";
+ try {
+ Transformation<SourceRecord> transformation = (Transformation) Utils.newInstance(getClass(prefix + "type"), Transformation.class);
+ transformation.configure(originalsWithPrefix(prefix));
+ transformations.add(transformation);
+ } catch (Exception e) {
+ throw new ConnectException(e);
+ }
+ }
+ return transformations;
+ }
+
+ /**
+ * Return every property that starts with the given prefix, keyed by the
+ * remainder of the key after the prefix.
+ */
+ public Map<String, Object> originalsWithPrefix(String prefix) {
+ Map<String, Object> result = new ConcurrentHashMap<>();
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ String key = entry.getKey();
+ if (key.startsWith(prefix) && key.length() > prefix.length()) {
+ result.put(key.substring(prefix.length()), entry.getValue());
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Resolve the class named by the value of the given property key.
+ */
+ public Class<?> getClass(String key) {
+ try {
+ return Class.forName(props.get(key));
+ } catch (ClassNotFoundException e) {
+ throw new ConnectException(e);
+ }
+ }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
new file mode 100644
index 0000000..0264dcb
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
@@ -0,0 +1,203 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.RocketMQSourceSchemaConverter;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.RocketMQSourceValueConverter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * source record converter test
+ */
+public class SourceRecordConverterTest {
+ private SourceRecord originalRecord;
+
+ @Before
+ public void before() {
+ // init record
+ Schema schema = SchemaBuilder.struct()
+ .field("field-string-01", Schema.INT64_SCHEMA)
+ .field("field-boolean-02", Schema.BOOLEAN_SCHEMA)
+ .field(
+ "field-struct-03",
+ SchemaBuilder.struct()
+ .field("field-struct-int64-01", Schema.INT64_SCHEMA)
+ .field("field-struct-boolean-02", Schema.BOOLEAN_SCHEMA)
+ )
+ .field("field-array-04-01", SchemaBuilder.array(Schema.STRING_SCHEMA))
+ .field("field-array-04-02",
+ SchemaBuilder.array(
+ SchemaBuilder.struct()
+ .field("field-struct-string-01", Schema.STRING_SCHEMA)
+ .field("field-struct-boolean-02", Schema.BOOLEAN_SCHEMA)
+ )
+ )
+ .field("field-map-05",
+ SchemaBuilder.map(
+ Schema.STRING_SCHEMA,
+ SchemaBuilder.struct()
+ .field("field-struct-string-01", Schema.STRING_SCHEMA)
+ .field("field-struct-boolean-02", Schema.BOOLEAN_SCHEMA)
+ )
+ ).build();
+
+ Struct struct = new Struct(schema);
+ struct.put("field-string-01", 111111L);
+ struct.put("field-boolean-02", true);
+
+ Struct struct_01 = new Struct(schema.field("field-struct-03").schema());
+ struct_01.put("field-struct-int64-01", 64l);
+ struct_01.put("field-struct-boolean-02", true);
+ // add struct
+ struct.put("field-struct-03", struct_01);
+
+ // add primate array
+ List<String> strings = new ArrayList<>();
+ strings.add("xxxx");
+ struct.put("field-array-04-01", strings);
+
+ // add struct array
+ List<Struct> structs = new ArrayList<>();
+ structs.add(new Struct(schema.field("field-array-04-02").schema().valueSchema())
+ .put("field-struct-string-01", "scsdcsd")
+ .put("field-struct-boolean-02", true));
+
+ struct.put("field-array-04-02", structs);
+
+ // add map
+
+ Map<String, Struct> maps = new HashMap<>();
+ Struct mapStruct = new Struct(schema.field("field-map-05").schema().valueSchema());
+ mapStruct.put("field-struct-string-01", "scdsc");
+ mapStruct.put("field-struct-boolean-02", true);
+ maps.put("XXXX-test", mapStruct);
+ struct.put("field-map-05", maps);
+
+ originalRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "test-Topic", 1, schema, struct);
+ }
+
+ public Struct initStruct(Schema schema) {
+ Struct struct = new Struct(schema);
+ schema.fields().forEach(field -> {
+ Schema.Type type = field.schema().type();
+ switch (type) {
+ case STRUCT:
+ struct.put(field.name(), initStruct(field.schema()));
+ break;
+ case INT8:
+ case INT16:
+ case INT32:
+ struct.put(field.name(), 100);
+ break;
+ case INT64:
+ struct.put(field.name(), 100000L);
+ break;
+ case FLOAT32:
+ struct.put(field.name(), 100.00);
+ break;
+ case FLOAT64:
+ struct.put(field.name(), 100000.00);
+ break;
+ case BOOLEAN:
+ struct.put(field.name(), true);
+ break;
+ case STRING:
+ struct.put(field.name(), "test");
+ break;
+ case ARRAY:
+ struct.put(field.name(), initArray(schema.valueSchema()));
+ break;
+ case MAP:
+ struct.put(field.name(), initMap(schema.keySchema(), schema.valueSchema()));
+ }
+ });
+ return struct;
+ }
+
+ public Object initArray(Schema schema) {
+ Schema.Type type = schema.type();
+ switch (type) {
+ case STRUCT:
+ Struct[] structs = new Struct[1];
+ structs[0] = initStruct(schema);
+ return structs;
+ case INT8:
+ case INT16:
+ case INT32:
+ Integer[] integers = new Integer[1];
+ integers[0] = 100;
+ return integers;
+ case INT64:
+ Long[] longs = new Long[1];
+ longs[0] = 1000000L;
+ return longs;
+ case FLOAT32:
+ Float[] floats = new Float[1];
+ floats[0] = 1000.00f;
+ return floats;
+ case FLOAT64:
+ Double[] doubles = new Double[1];
+ doubles[0] = 1000.00;
+ return doubles;
+ case BOOLEAN:
+ Boolean[] booleans = new Boolean[1];
+ booleans[0] = true;
+ return booleans;
+ case STRING:
+ String[] strings = new String[1];
+ strings[0] = "********XXXX";
+ return strings;
+ case ARRAY:
+ Object[] obj = new Object[1];
+ obj[0] = initArray(schema.valueSchema());
+ return obj;
+ case MAP:
+ break;
+ }
+ return null;
+ }
+
+ private Object initMap(Schema keySchema, Schema valueSchema) {
+ return null;
+ }
+
+ @Test
+ public void testConverter() {
+ // sourceRecord convert connect Record
+ RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(originalRecord.valueSchema());
+
+ io.openmessaging.connector.api.data.Schema schema =rocketMQSourceSchemaConverter.schema();
+ RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+ ConnectRecord connectRecord = new ConnectRecord(
+ new RecordPartition(originalRecord.sourcePartition()),
+ new RecordOffset(originalRecord.sourceOffset()),
+ originalRecord.timestamp(),
+ rocketMQSourceSchemaConverter.schema(),
+ rocketMQSourceValueConverter.value(schema, originalRecord.value()));
+ Iterator<Header> headers = originalRecord.headers().iterator();
+ while (headers.hasNext()) {
+ Header header = headers.next();
+ if (header.schema().type().isPrimitive()) {
+ connectRecord.addExtension(header.key(), (String) header.value());
+ }
+ }
+ final byte[] messageBody = JSON.toJSONString(connectRecord).getBytes();
+ String bodyStr = new String(messageBody, StandardCharsets.UTF_8);
+ ConnectRecord newConnectRecord= JSON.parseObject(bodyStr, ConnectRecord.class);
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
new file mode 100644
index 0000000..8f2f286
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -0,0 +1,338 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-debezium</artifactId>
+ <packaging>pom</packaging>
+ <version>0.0.1-SNAPSHOT</version>
+ <modules>
+ <module>kafka-connect-adaptor</module>
+ <module>rocketmq-connect-debezium-core</module>
+ <module>rocketmq-connect-debezium-mysql</module>
+ <module>rocketmq-connect-debezium-oracle</module>
+ <module>rocketmq-connect-debezium-postgresql</module>
+ <module>rocketmq-connect-debezium-mongodb</module>
+ <module>rocketmq-connect-debezium-sqlserver</module>
+ </modules>
+
+    <name>rocketmq-connect-debezium</name>
+    <url>https://github.com/apache/rocketmq-connect/tree/master/connectors/rocketmq-connect-debezium</url>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <issueManagement>
+ <system>jira</system>
+ <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+ </issueManagement>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../../style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+ <!-- Compiler settings properties -->
+ <maven.compiler.source>1.8</maven.compiler.source>
+ <maven.compiler.target>1.8</maven.compiler.target>
+
+ <kafka-client.version>2.7.2</kafka-client.version>
+ <scala.binary.version>2.13</scala.binary.version>
+ <!--debezium version-->
+ <debezium.version>1.7.2.Final</debezium.version>
+ <debezium.postgresql.version>42.3.3</debezium.postgresql.version>
+ <!--rocketmq version-->
+ <rocketmq.version>4.5.2</rocketmq.version>
+ <rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
+
+ <!--rocketmq connect version-->
+ <openmessaging-connector.version>0.1.3-SNAPSHOT</openmessaging-connector.version>
+ <openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
+ </properties>
+
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>kafka-connect-adaptor</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-debezium-core</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+
+ <!-- rocketmq connect api -->
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <version>${openmessaging-connector.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ <version>${openmessaging-api.version}</version>
+ </dependency>
+
+ <!-- rocketmq client-->
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ <version>${rocketmq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ <version>${rocketmq-openmessaging.version}</version>
+ </dependency>
+
+ <!--kafka && kafka connect-->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-runtime</artifactId>
+ <version>${kafka-client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-log4j-appender</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-json</artifactId>
+ <version>${kafka-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-api</artifactId>
+ <version>${kafka-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-file</artifactId>
+ <version>${kafka-client.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <!-- testing -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.6.3</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.80</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>1.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.12</version>
+ </dependency>
+
+
+ <!-- log package-->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.32</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.2.9</version>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.2.9</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+</project>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
new file mode 100644
index 0000000..4862b5a
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect-debezium</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-debezium-core</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <scala.binary.version>2.13</scala.binary.version>
+ <scala-library.version>2.13.6</scala-library.version>
+ <debezium.version>1.7.2.Final</debezium.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>kafka-connect-adaptor</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-tools</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-remoting</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-openmessaging</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka-client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>connect-runtime</artifactId>
+ <version>${kafka-client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-log4j-appender</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
new file mode 100644
index 0000000..439bcf5
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import com.beust.jcommander.internal.Sets;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+
+/**
+ * Helper utilities for creating and administering the RocketMQ clients
+ * (producers, consumers, admin tools, topics, subscription groups) used by
+ * the Debezium database-history storage.
+ */
+public class ConnectUtil {
+
+    // Reserved prefix for connector-system consumer groups.
+    // NOTE(review): currently unreferenced in this class — confirm intended use.
+    private static final String SYS_TASK_CG_PREFIX = "connect-";
+
+    /**
+     * Creates a group name unique per host, process and instant:
+     * {@code prefix-<localAddress>-<pid>-<nanoTime>}, with '.' replaced by '-'
+     * because dots are not desirable in RocketMQ group names.
+     */
+    public static String createGroupName(String prefix) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(prefix).append("-");
+        sb.append(RemotingUtil.getLocalAddress()).append("-");
+        sb.append(UtilAll.getPid()).append("-");
+        sb.append(System.nanoTime());
+        return sb.toString().replace(".", "-");
+    }
+
+    /** Joins prefix and postfix with a single dash. */
+    public static String createGroupName(String prefix, String postfix) {
+        return prefix + "-" + postfix;
+    }
+
+    /**
+     * Derives a stable instance id from a ';'-separated server list:
+     * servers are de-duplicated and sorted so equivalent lists (in any order)
+     * map to the same id.
+     */
+    public static String createInstance(String servers) {
+        String[] serversArray = servers.split(";");
+        List<String> serversList = new ArrayList<String>();
+        for (String server : serversArray) {
+            if (!serversList.contains(server)) {
+                serversList.add(server);
+            }
+        }
+        Collections.sort(serversList);
+        return String.valueOf(serversList.toString().hashCode());
+    }
+
+    /** Creates a unique client instance name: {@code prefix-<randomUUID>}. */
+    public static String createUniqInstance(String prefix) {
+        // StringBuilder instead of StringBuffer: no shared mutation here,
+        // so synchronization is unnecessary.
+        return new StringBuilder(prefix).append("-").append(UUID.randomUUID().toString()).toString();
+    }
+
+    /**
+     * Builds a producer from the connect config; enables ACL credentials when
+     * configured. The producer is not started by this method.
+     */
+    public static DefaultMQProducer initDefaultMQProducer(RocketMqConnectConfig connectConfig) {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+        producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        producer.setProducerGroup(connectConfig.getRmqProducerGroup());
+        producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
+        producer.setLanguage(LanguageCode.JAVA);
+        return producer;
+    }
+
+    /**
+     * Builds a pull consumer from the connect config; enables ACL credentials
+     * when configured. The consumer is not started by this method.
+     */
+    public static DefaultMQPullConsumer initDefaultMQPullConsumer(RocketMqConnectConfig connectConfig) {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
+        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        consumer.setConsumerGroup(connectConfig.getRmqConsumerGroup());
+        consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
+        consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout());
+        consumer.setLanguage(LanguageCode.JAVA);
+        return consumer;
+    }
+
+    /**
+     * Builds a push consumer from the connect config. Unlike the pull consumer,
+     * the group name is made host/process unique via {@link #createGroupName(String)}.
+     * The consumer is not started by this method.
+     */
+    public static DefaultMQPushConsumer initDefaultMQPushConsumer(RocketMqConnectConfig connectConfig) {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
+        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup()));
+        consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
+        consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
+        consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        consumer.setLanguage(LanguageCode.JAVA);
+        return consumer;
+    }
+
+    /**
+     * Creates and STARTS an admin tool client. Callers are responsible for
+     * calling {@code shutdown()} when finished.
+     *
+     * @throws MQClientException if the admin client fails to start
+     */
+    public static DefaultMQAdminExt startMQAdminTool(RocketMqConnectConfig connectConfig) throws MQClientException {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        defaultMQAdminExt.setAdminExtGroup("connector-admin-group");
+        defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+        defaultMQAdminExt.start();
+        return defaultMQAdminExt;
+    }
+
+    /**
+     * Creates (or updates) the given topic on every master broker of every
+     * cluster reachable through the name server.
+     *
+     * @throws RuntimeException wrapping any failure, with the topic name in the message
+     */
+    public static void createTopic(RocketMqConnectConfig connectConfig, TopicConfig topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            Set<String> clusterNameSet = clusterAddrTable.keySet();
+            for (String clusterName : clusterNameSet) {
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("create topic: " + topicConfig.getTopicName() + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Returns true if route info exists for the topic. Deliberately best-effort:
+     * any lookup failure (including "topic not found") is treated as "absent".
+     */
+    public static boolean isTopicExist(RocketMqConnectConfig connectConfig, String topic) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        boolean foundTopicRouteInfo = false;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+            if (topicRouteData != null) {
+                foundTopicRouteInfo = true;
+            }
+        } catch (Exception e) {
+            foundTopicRouteInfo = false;
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return foundTopicRouteInfo;
+    }
+
+    /**
+     * Fetches the names of all subscription (consumer) groups known to every
+     * broker in the cluster.
+     *
+     * @throws RuntimeException wrapping any failure
+     */
+    public static Set<String> fetchAllConsumerGroupList(RocketMqConnectConfig connectConfig) {
+        Set<String> consumerGroupSet = new HashSet<>();
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+                SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+                consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+            }
+        } catch (Exception e) {
+            // Propagate with context; do not printStackTrace() — the cause travels
+            // with the exception and is logged by the caller.
+            throw new RuntimeException("fetch all consumer group failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return consumerGroupSet;
+    }
+
+    /**
+     * Creates (or updates) a subscription group with default settings on every
+     * master broker of every cluster, and returns the group name.
+     *
+     * @throws RuntimeException wrapping any failure, with the group name in the message
+     */
+    public static String createSubGroup(RocketMqConnectConfig connectConfig, String subGroup) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
+            initConfig.setGroupName(subGroup);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            Set<String> clusterNameSet = clusterAddrTable.keySet();
+            for (String clusterName : clusterNameSet) {
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("create subGroup: " + subGroup + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return subGroup;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
new file mode 100644
index 0000000..7a3f51a
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.connector.KafkaSourceAdaptorConnector;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Base class for Debezium source connectors (MySQL, Oracle, PostgreSQL,
+ * MongoDB, SQL Server). Concrete subclasses supply the connector-specific
+ * configuration; the Kafka-connect adaptor parent handles the bridging to
+ * the RocketMQ connect runtime.
+ */
+public abstract class DebeziumConnector extends KafkaSourceAdaptorConnector {
+
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumSource.java
new file mode 100644
index 0000000..388f722
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumSource.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.task.KafkaConnectAdaptorSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for Debezium source tasks. Before delegating to the Kafka
+ * connect adaptor, it forces Debezium's {@code database.history}
+ * implementation to the RocketMQ-backed one, so schema history is persisted
+ * in RocketMQ rather than Kafka.
+ */
+public abstract class DebeziumSource extends KafkaConnectAdaptorSource {
+    private static final Logger log = LoggerFactory.getLogger(DebeziumSource.class);
+
+    // Fully-qualified class name of the RocketMQ-backed DatabaseHistory implementation.
+    private static final String DEFAULT_HISTORY = "org.apache.rocketmq.connect.debezium.RocketMqDatabaseHistory";
+
+
+    @Override
+    public void start(KeyValue config) {
+        // database.history : implementation class for database history.
+        // Always overridden to the RocketMQ implementation, regardless of any
+        // user-supplied value in the connector config.
+        config.put(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
+        // history config detail
+        super.start(config);
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
new file mode 100644
index 0000000..18397a6
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+
+import io.debezium.config.Configuration;
+
+/**
+ * Connection settings for the RocketMQ clients used by
+ * {@code RocketMqDatabaseHistory}: name-server address, producer/consumer
+ * group names derived from the history topic name, timeouts, and optional
+ * ACL credentials.
+ */
+public class RocketMqConnectConfig {
+
+    // Logical name of the database-history storage; group names derive from it.
+    private String dbHistoryName;
+
+    private String namesrvAddr;
+
+    private String rmqProducerGroup;
+
+    // Send timeout in milliseconds.
+    private int operationTimeout = 3000;
+
+    private String rmqConsumerGroup;
+
+    private int rmqMaxRedeliveryTimes;
+
+    // Consume timeout in milliseconds.
+    private int rmqMessageConsumeTimeout = 3000;
+
+    private int rmqMaxConsumeThreadNums = 32;
+
+    private int rmqMinConsumeThreadNums = 1;
+
+    private String adminExtGroup;
+
+    // ACL credentials; only used when aclEnable is true.
+    private boolean aclEnable;
+    private String accessKey;
+    private String secretKey;
+
+
+    public RocketMqConnectConfig() {
+    }
+
+    /**
+     * Builds the config from the Debezium connector configuration.
+     * Producer/consumer/admin group names are derived from {@code dbHistoryName}.
+     */
+    public RocketMqConnectConfig(Configuration config, String dbHistoryName) {
+        this.dbHistoryName = dbHistoryName;
+        // init config
+        this.rmqProducerGroup = this.dbHistoryName.concat("-producer-group");
+        this.rmqConsumerGroup = this.dbHistoryName.concat("-consumer-group");
+        this.adminExtGroup = this.dbHistoryName.concat("-admin-group");
+
+        // init rocketmq connection
+        this.namesrvAddr = config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR);
+        this.aclEnable = config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE);
+        this.accessKey = config.getString(RocketMqDatabaseHistory.ROCKETMQ_ACCESS_KEY);
+        this.secretKey = config.getString(RocketMqDatabaseHistory.ROCKETMQ_SECRET_KEY);
+    }
+
+
+    public String getDbHistoryName() {
+        return dbHistoryName;
+    }
+
+    public void setDbHistoryName(String dbHistoryName) {
+        this.dbHistoryName = dbHistoryName;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public String getRmqProducerGroup() {
+        return rmqProducerGroup;
+    }
+
+    public void setRmqProducerGroup(String rmqProducerGroup) {
+        this.rmqProducerGroup = rmqProducerGroup;
+    }
+
+    public int getOperationTimeout() {
+        return operationTimeout;
+    }
+
+    public void setOperationTimeout(int operationTimeout) {
+        this.operationTimeout = operationTimeout;
+    }
+
+    public String getRmqConsumerGroup() {
+        return rmqConsumerGroup;
+    }
+
+    public void setRmqConsumerGroup(String rmqConsumerGroup) {
+        this.rmqConsumerGroup = rmqConsumerGroup;
+    }
+
+    public int getRmqMaxRedeliveryTimes() {
+        return rmqMaxRedeliveryTimes;
+    }
+
+    public void setRmqMaxRedeliveryTimes(int rmqMaxRedeliveryTimes) {
+        this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
+    }
+
+    public int getRmqMessageConsumeTimeout() {
+        return rmqMessageConsumeTimeout;
+    }
+
+    public void setRmqMessageConsumeTimeout(int rmqMessageConsumeTimeout) {
+        this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
+    }
+
+    public int getRmqMaxConsumeThreadNums() {
+        return rmqMaxConsumeThreadNums;
+    }
+
+    public void setRmqMaxConsumeThreadNums(int rmqMaxConsumeThreadNums) {
+        this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
+    }
+
+    public int getRmqMinConsumeThreadNums() {
+        return rmqMinConsumeThreadNums;
+    }
+
+    public void setRmqMinConsumeThreadNums(int rmqMinConsumeThreadNums) {
+        this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
+    }
+
+    public boolean isAclEnable() {
+        return aclEnable;
+    }
+
+    public void setAclEnable(boolean aclEnable) {
+        this.aclEnable = aclEnable;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getAdminExtGroup() {
+        return adminExtGroup;
+    }
+
+    public void setAdminExtGroup(String adminExtGroup) {
+        this.adminExtGroup = adminExtGroup;
+    }
+
+    /** Masks a credential so it is never written to logs. */
+    private static String mask(String value) {
+        return value == null ? "null" : "***";
+    }
+
+    @Override
+    public String toString() {
+        // Security: accessKey/secretKey are credentials — mask them instead of
+        // leaking them through log statements that print this config.
+        return "RocketMqConnectConfig{" +
+            "dbHistoryName='" + dbHistoryName + '\'' +
+            ", namesrvAddr='" + namesrvAddr + '\'' +
+            ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
+            ", operationTimeout=" + operationTimeout +
+            ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
+            ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
+            ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
+            ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
+            ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
+            ", adminExtGroup='" + adminExtGroup + '\'' +
+            ", aclEnable=" + aclEnable +
+            ", accessKey='" + mask(accessKey) + '\'' +
+            ", secretKey='" + mask(secretKey) + '\'' +
+            '}';
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
new file mode 100644
index 0000000..11814b9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import com.alibaba.fastjson.JSON;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.task.AbstractKafkaConnectSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+
+/**
+ * A Debezium {@code DatabaseHistory} backed by a RocketMQ topic: schema-change
+ * records are produced to the configured topic and replayed from it when the
+ * connector restarts.
+ */
+public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
+
+    // FIX: the logger was previously obtained for AbstractKafkaConnectSource.class,
+    // which mislabelled every log line emitted by this class.
+    private static final Logger log = LoggerFactory.getLogger(RocketMqDatabaseHistory.class);
+
+    /** Name of the RocketMQ topic that stores the schema history (required). */
+    public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.topic")
+            .withDisplayName("Database history topic name")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("The name of the topic for the database schema history")
+            .withValidation(Field::isRequired);
+
+    /** RocketMQ name server address (required). */
+    public static final Field NAME_SRV_ADDR = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name.srv.addr")
+            .withDisplayName("Rocketmq name srv addr")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq name srv addr")
+            .withValidation(Field::isRequired);
+
+    /** Whether RocketMQ ACL authentication is enabled (defaults to false). */
+    public static final Field ROCKETMQ_ACL_ENABLE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "acl.enabled")
+            .withDisplayName("Rocketmq acl enabled")
+            .withType(ConfigDef.Type.BOOLEAN)
+            .withDefault(false)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq acl enabled");
+
+    /** ACL access key; only meaningful when {@link #ROCKETMQ_ACL_ENABLE} is set. */
+    public static final Field ROCKETMQ_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "access.key")
+            .withDisplayName("Rocketmq access key")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq access key");
+
+    /** ACL secret key; only meaningful when {@link #ROCKETMQ_ACL_ENABLE} is set. */
+    public static final Field ROCKETMQ_SECRET_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "secret.key")
+            .withDisplayName("Rocketmq secret key")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq secret key");
+
+    /** Name of the owning connector (required). */
+    public static final Field CONNECTOR_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.name")
+            .withDisplayName("Connector name")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("connector name")
+            .withValidation(Field::isRequired);
+
+    // Parses raw message bodies back into Debezium documents during recovery.
+    private final DocumentReader reader = DocumentReader.defaultReader();
+    // Topic holding the history records.
+    private String topicName;
+    // Logical name of this history instance; falls back to a random UUID.
+    private String dbHistoryName;
+    private RocketMqConnectConfig connectConfig;
+    private DefaultMQPushConsumer consumer;
+    private DefaultMQProducer producer;
+
+    /**
+     * Captures the topic/history names and builds the RocketMQ client
+     * configuration. Called by Debezium before {@link #start()}.
+     */
+    @Override
+    public void configure(
+            Configuration config,
+            HistoryRecordComparator comparator,
+            DatabaseHistoryListener listener,
+            boolean useCatalogBeforeSchema) {
+        super.configure(config, comparator, listener, useCatalogBeforeSchema);
+        this.topicName = config.getString(TOPIC);
+
+        // Fall back to a random UUID when no explicit history name is configured.
+        this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
+        log.info("Configure to store the debezium database history {} to rocketmq topic {}",
+                dbHistoryName, topicName);
+        // init config
+        connectConfig = new RocketMqConnectConfig(config, dbHistoryName);
+    }
+
+    /** Creates the backing history topic if storage has to be initialized. */
+    @Override
+    public void initializeStorage() {
+        super.initializeStorage();
+        log.info("try to create history topic: {}!", this.topicName);
+        TopicConfig topicConfig = new TopicConfig(this.topicName);
+        ConnectUtil.createTopic(connectConfig, topicConfig);
+    }
+
+    /**
+     * Starts the RocketMQ clients: a push consumer subscribed to the history
+     * topic (creating its subscription group on demand) and a producer for
+     * appending new records.
+     *
+     * @throws DatabaseHistoryException if either client fails to start
+     */
+    @Override
+    public void start() {
+        super.start();
+        try {
+            // init consumer
+            this.consumer = ConnectUtil.initDefaultMQPushConsumer(connectConfig);
+            Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+            if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
+                ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+            }
+            this.consumer.subscribe(this.topicName, "*");
+        } catch (MQClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+
+        this.producer = ConnectUtil.initDefaultMQProducer(connectConfig);
+        try {
+            this.producer.start();
+        } catch (MQClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+    }
+
+    /** Shuts down both clients; shutdown failures are logged, not rethrown. */
+    @Override
+    public void stop() {
+        try {
+            if (this.producer != null) {
+                producer.shutdown();
+                this.producer = null;
+            }
+            if (this.consumer != null) {
+                this.consumer.shutdown();
+                this.consumer = null;
+            }
+        } catch (Exception pe) {
+            // FIX: corrected the warning grammar ("Failed to closing" -> "Failed to close").
+            log.warn("Failed to close rocketmq client", pe);
+        }
+    }
+
+    /**
+     * Replays every history record from the topic into {@code records}.
+     * NOTE(review): recovery runs on the push consumer's callback threads,
+     * i.e. asynchronously after this method returns, whereas Debezium normally
+     * expects recovery to complete before the connector proceeds — confirm the
+     * ordering guarantees.
+     *
+     * @param records callback receiving each recovered record
+     */
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                msgs.forEach(message -> {
+                    try {
+                        HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
+                        log.trace("Recovering database history: {}", recordObj);
+                        // FIX: recordObj can never be null here (it was just constructed),
+                        // so only record validity is checked.
+                        if (!recordObj.isValid()) {
+                            log.warn("Skipping invalid database history record '{}'. " +
+                                    "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
+                                    recordObj, topicName);
+                        } else {
+                            records.accept(recordObj);
+                            log.trace("Recovered database history: {}", recordObj);
+                        }
+                    } catch (IOException e) {
+                        throw new DatabaseHistoryException(e);
+                    }
+                });
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+        log.info("Consumer started.");
+    }
+
+    /**
+     * Serializes the record as JSON and appends it to the history topic.
+     *
+     * @param record the schema-change record to persist
+     * @throws DatabaseHistoryException if sending fails
+     * @throws IllegalStateException if {@link #start()} has not been called
+     */
+    @Override
+    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+        // FIX: removed the unconditional log.info(record.toString()) that logged
+        // every schema record at INFO level; the guarded TRACE below suffices.
+        if (this.producer == null) {
+            throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+                    + " is called before storing database history records.");
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Storing record into database history: {}", record);
+        }
+        try {
+            Message sourceMessage = new Message();
+            sourceMessage.setTopic(this.topicName);
+            // FIX: encode explicitly as UTF-8 instead of the platform default charset.
+            final byte[] messageBody = JSON.toJSONString(record).getBytes(StandardCharsets.UTF_8);
+            sourceMessage.setBody(messageBody);
+            producer.send(sourceMessage);
+        } catch (Exception e) {
+            throw new DatabaseHistoryException(e);
+        }
+    }
+
+    /**
+     * NOTE(review): delegates to storageExists(), i.e. "the topic exists",
+     * whereas Debezium's exists() contract is "the history already contains
+     * records" — confirm this is the intended semantics.
+     */
+    @Override
+    public boolean exists() {
+        return this.storageExists();
+    }
+
+    /** Returns whether the backing history topic exists. */
+    @Override
+    public boolean storageExists() {
+        // check topic is exist
+        return ConnectUtil.isTopicExist(connectConfig, this.topicName);
+    }
+
+    @Override
+    public String toString() {
+        if (topicName != null) {
+            return "Rocketmq topic (" + topicName + ")";
+        }
+        return "Rocketmq topic";
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
new file mode 100644
index 0000000..9d5b796
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.debezium;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+
+/**
+ * Placeholder schema-rename transformation.
+ * NOTE(review): this is an unimplemented stub — apply() returns null, which in
+ * Kafka Connect semantics drops every record, and config() returns null, which
+ * will NPE during config validation. Confirm this class is not yet wired up,
+ * or complete the implementation before use.
+ */
+public class SchemaRenameTransformation implements Transformation {
+ // NOTE(review): returning null drops the record — stub behavior.
+ @Override
+ public ConnectRecord apply(ConnectRecord connectRecord) {
+ return null;
+ }
+
+ // NOTE(review): a null ConfigDef is invalid for a real Transformation — stub.
+ @Override
+ public ConfigDef config() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> map) {
+
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/pom.xml
new file mode 100644
index 0000000..2130556
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect-debezium</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-debezium-mongodb</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>kafka-connect-adaptor</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-debezium-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mongodb</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBConnector.java
new file mode 100644
index 0000000..c45c7b0
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBConnector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mongodb;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+
+
+/**
+ * Debezium MongoDB source connector adaptor for RocketMQ Connect.
+ */
+public class DebeziumMongoDBConnector extends DebeziumConnector {
+
+ // Fully-qualified name of the wrapped Kafka Connect connector class.
+ private static final String DEFAULT_CONNECTOR = "io.debezium.connector.mongodb.MongoDbConnector";
+
+ /**
+ * Returns the RocketMQ Connect task implementation that runs this connector.
+ * (Previous Javadoc wrongly said "connector class".)
+ * @return task implementation class
+ */
+ @Override
+ public Class<? extends Task> taskClass() {
+ return DebeziumMongoDBSource.class;
+ }
+
+ /**
+ * Returns the fully-qualified Kafka Connect connector class to delegate to.
+ */
+ @Override
+ public String getConnectorClass() {
+ return DEFAULT_CONNECTOR;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBSource.java
new file mode 100644
index 0000000..7c8cee5
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mongodb;
+
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+
+/**
+ * A RocketMQ Connect source task that runs the Debezium MongoDB source task.
+ */
+public class DebeziumMongoDBSource extends DebeziumSource {
+ // Fully-qualified name of the wrapped Kafka Connect task class.
+ private static final String DEFAULT_TASK = "io.debezium.connector.mongodb.MongoDbConnectorTask";
+
+ /**
+ * Returns the fully-qualified Kafka Connect task class to delegate to.
+ */
+ @Override
+ public String getTaskClass() {
+ return DEFAULT_TASK;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/resources/debezium-mongodb-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/resources/debezium-mongodb-source-config.yaml
new file mode 100644
index 0000000..fe95886
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/resources/debezium-mongodb-source-config.yaml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
new file mode 100644
index 0000000..0c027d0
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
@@ -0,0 +1,197 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect-debezium</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-debezium-mysql</artifactId>
+
+
+    <dependencies>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+
+        <!-- FIX: kafka-connect-adaptor was declared twice in this section;
+             duplicate dependency declarations trigger Maven warnings, so the
+             two declarations were collapsed into this one. -->
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-connect-debezium-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+
+    </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
new file mode 100644
index 0000000..074146c
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mysql;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+
+/**
+ * Debezium MySQL source connector adaptor for RocketMQ Connect.
+ */
+public class DebeziumMysqlConnector extends DebeziumConnector {
+ // Fully-qualified name of the wrapped Kafka Connect connector class.
+ private static final String DEFAULT_CONNECTOR = "io.debezium.connector.mysql.MySqlConnector";
+
+ /**
+ * Returns the RocketMQ Connect task implementation that runs this connector.
+ * (Previous Javadoc wrongly said "connector class".)
+ * @return task implementation class
+ */
+ @Override
+ public Class<? extends Task> taskClass() {
+ return DebeziumMysqlSource.class;
+ }
+
+ /**
+ * Returns the fully-qualified Kafka Connect connector class to delegate to.
+ */
+ @Override
+ public String getConnectorClass() {
+ return DEFAULT_CONNECTOR;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlSource.java
new file mode 100644
index 0000000..f177eba
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mysql;
+
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+
+/**
+ * A RocketMQ Connect source task that runs the Debezium MySQL source task.
+ */
+public class DebeziumMysqlSource extends DebeziumSource {
+ // Fully-qualified name of the wrapped Kafka Connect task class.
+ private static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";
+
+ /**
+ * Returns the fully-qualified Kafka Connect task class to delegate to.
+ */
+ @Override
+ public String getTaskClass() {
+ return DEFAULT_TASK;
+ }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
new file mode 100644
index 0000000..a371d4d
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+ "connector-class":"org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
+ "max-task":"1",
+ "connect-topicname":"debezium-mysql-source",
+ "debezium.transforms": "Unwrap",
+ "debezium.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+ "debezium.transforms.Unwrap.delete.handling.mode": "drop",
+
+ "database.history.skip.unparseable.ddl": true,
+ "database.history.name.srv.addr": "localhost:9876",
+ "database.history.rocketmq.topic": "db-history-topic-02",
+ "database.history.store.only.monitored.tables.ddl": true,
+
+ "database.user": "*******",
+ "include.schema.changes": false,
+ "database.server.name": "test-server-02",
+ "database.port": 3306,
+ "database.hostname": "**********",
+ "database.connectionTimeZone": "Asia/Shanghai",
+ "database.password": "********",
+ "table.include.list": "db-01.table",
+ "max.batch.size": 50,
+ "database.include.list": "db-01",
+ "snapshot.mode": "when_needed",
+
+ "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
new file mode 100644
index 0000000..cd18267
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect-debezium</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-debezium-oracle</artifactId>
+
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>kafka-connect-adaptor</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-debezium-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-oracle</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
new file mode 100644
index 0000000..ad051d4
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.oracle;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+/**
+ * Debezium Oracle connector: binds the RocketMQ connect runtime to the
+ * underlying debezium Oracle connector implementation.
+ */
+public class DebeziumOracleConnector extends DebeziumConnector {
+    /** Fully qualified name of the wrapped debezium connector class. */
+    private static final String DEFAULT_CONNECTOR = "io.debezium.connector.oracle.OracleConnector";
+
+    /**
+     * Return the source task implementation that runs this connector.
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumOracleSource.class;
+    }
+
+    /**
+     * Get the fully qualified debezium connector class name.
+     */
+    @Override
+    public String getConnectorClass() {
+        return DEFAULT_CONNECTOR;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleSource.java
new file mode 100644
index 0000000..6ddd6d0
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.oracle;
+
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+/**
+ * RocketMQ connect source task that delegates change-data capture to the
+ * debezium Oracle connector task.
+ */
+public class DebeziumOracleSource extends DebeziumSource {
+
+    /** Fully qualified name of the delegated debezium task class. */
+    private static final String ORACLE_TASK_CLASS = "io.debezium.connector.oracle.OracleConnectorTask";
+
+    /**
+     * Get the fully qualified debezium task class name.
+     */
+    @Override
+    public String getTaskClass() {
+        return ORACLE_TASK_CLASS;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/resources/debezium-oracle-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/resources/debezium-oracle-source-config.yaml
new file mode 100644
index 0000000..d8a500d
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/resources/debezium-oracle-source-config.yaml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/pom.xml
new file mode 100644
index 0000000..849a0b7
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/pom.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect-debezium</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-debezium-postgresql</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>kafka-connect-adaptor</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-debezium-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-postgres</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>${debezium.postgresql.version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresConnector.java
new file mode 100644
index 0000000..5149294
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.postgres;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+/**
+ * Debezium PostgreSQL connector: binds the RocketMQ connect runtime to the
+ * underlying debezium PostgreSQL connector implementation.
+ */
+public class DebeziumPostgresConnector extends DebeziumConnector {
+
+    /** Fully qualified name of the wrapped debezium connector class. */
+    private static final String POSTGRES_CONNECTOR_CLASS = "io.debezium.connector.postgresql.PostgresConnector";
+
+    /**
+     * Return the source task implementation that runs this connector.
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumPostgresSource.class;
+    }
+
+    /**
+     * Get the fully qualified debezium connector class name.
+     */
+    @Override
+    public String getConnectorClass() {
+        return POSTGRES_CONNECTOR_CLASS;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresSource.java
new file mode 100644
index 0000000..274a0b4
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresSource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.postgres;
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+/**
+ * RocketMQ connect source task that delegates change-data capture to the
+ * debezium PostgreSQL connector task.
+ */
+public class DebeziumPostgresSource extends DebeziumSource {
+
+    /** Fully qualified name of the delegated debezium task class. */
+    private static final String POSTGRES_TASK_CLASS = "io.debezium.connector.postgresql.PostgresConnectorTask";
+
+    /**
+     * Get the fully qualified debezium task class name.
+     */
+    @Override
+    public String getTaskClass() {
+        return POSTGRES_TASK_CLASS;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/resources/debezium-postgres-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/resources/debezium-postgres-source-config.yaml
new file mode 100644
index 0000000..fe95886
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/resources/debezium-postgres-source-config.yaml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/pom.xml
new file mode 100644
index 0000000..d160732
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>rocketmq-connect-debezium</artifactId>
+ <groupId>org.apache.rocketmq</groupId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>rocketmq-connect-debezium-sqlserver</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>kafka-connect-adaptor</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rocketmq</groupId>
+ <artifactId>rocketmq-connect-debezium-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-connector</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.openmessaging</groupId>
+ <artifactId>openmessaging-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-sqlserver</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ <filtering>true</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>versions-maven-plugin</artifactId>
+ <version>2.3</version>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>clirr-maven-plugin</artifactId>
+ <version>2.7</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <outputDirectory>${project.build.directory}/lib</outputDirectory>
+ <excludeTransitive>false</excludeTransitive>
+ <stripVersion>true</stripVersion>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>${maven.compiler.source}</source>
+ <target>${maven.compiler.target}</target>
+ <compilerVersion>${maven.compiler.source}</compilerVersion>
+ <showDeprecation>true</showDeprecation>
+ <showWarnings>true</showWarnings>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19.1</version>
+ <configuration>
+ <argLine>-Xms512m -Xmx1024m</argLine>
+ <forkMode>always</forkMode>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.6</version>
+ <configuration>
+ <locales>en_US</locales>
+ <outputEncoding>UTF-8</outputEncoding>
+ <inputEncoding>UTF-8</inputEncoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>3.0.1</version>
+ <executions>
+ <execution>
+ <id>attach-sources</id>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.10.4</version>
+ <configuration>
+ <charset>UTF-8</charset>
+ <locale>en_US</locale>
+ <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+ </configuration>
+ <executions>
+ <execution>
+ <id>aggregate</id>
+ <goals>
+ <goal>aggregate</goal>
+ </goals>
+ <phase>site</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>3.0.2</version>
+ <configuration>
+ <encoding>${project.build.sourceEncoding}</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>3.0.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.0.0</version>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>verify</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+ <encoding>UTF-8</encoding>
+ <consoleOutput>true</consoleOutput>
+ <failsOnError>true</failsOnError>
+ <includeTestSourceDirectory>false</includeTestSourceDirectory>
+ <includeTestResources>false</includeTestResources>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerConnector.java
new file mode 100644
index 0000000..db426e9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.sqlserver;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+/**
+ * Debezium SQL Server connector: binds the RocketMQ connect runtime to the
+ * underlying debezium SQL Server connector implementation.
+ */
+public class DebeziumSqlServerConnector extends DebeziumConnector {
+
+    /** Fully qualified name of the wrapped debezium connector class. */
+    private static final String SQLSERVER_CONNECTOR_CLASS = "io.debezium.connector.sqlserver.SqlServerConnector";
+
+    /**
+     * Return the source task implementation that runs this connector.
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumSqlServerSource.class;
+    }
+
+    /**
+     * Get the fully qualified debezium connector class name.
+     */
+    @Override
+    public String getConnectorClass() {
+        return SQLSERVER_CONNECTOR_CLASS;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerSource.java
new file mode 100644
index 0000000..b9f68d1
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerSource.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.sqlserver;
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+/**
+ * RocketMQ connect source task that delegates change-data capture to the
+ * debezium SQL Server connector task.
+ */
+public class DebeziumSqlServerSource extends DebeziumSource {
+
+    /** Fully qualified name of the delegated debezium task class. */
+    private static final String SQLSERVER_TASK_CLASS = "io.debezium.connector.sqlserver.SqlServerConnectorTask";
+
+    /**
+     * Get the fully qualified debezium task class name.
+     */
+    @Override
+    public String getTaskClass() {
+        return SQLSERVER_TASK_CLASS;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/resources/debezium-sqlserver-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/resources/debezium-sqlserver-source-config.yaml
new file mode 100644
index 0000000..b8981cf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/resources/debezium-sqlserver-source-config.yaml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DebeziumTimeTypes.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DebeziumTimeTypes.java
new file mode 100644
index 0000000..81268cd
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DebeziumTimeTypes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.common;
+
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+
+
+/**
+ * debezium time type
+ */
+public class DebeziumTimeTypes {
+ public static final String DATE = "io.debezium.time.Date";
+ public static final String INTERVAL = "io.debezium.time.Interval";
+ public static final String MICRO_DURATION = "io.debezium.time.MicroDuration";
+ public static final String MICRO_TIME = "io.debezium.time.MicroTime";
+ public static final String MICRO_TIMESTAMP = "io.debezium.time.MicroTimestamp";
+ public static final String NANO_DURATION = "io.debezium.time.NanoDuration";
+ public static final String NANO_TIME = "io.debezium.time.NanoTime";
+ public static final String NANO_TIMESTAMP = "io.debezium.time.NanoTimestamp";
+ public static final String TIME = "io.debezium.time.Time";
+ public static final String TIMESTAMP = "io.debezium.time.Timestamp";
+ public static final String YEAR = "io.debezium.time.Year";
+ public static final String ZONED_TIME = "io.debezium.time.ZonedTime";
+ public static final String ZONED_TIMESTAMP = "io.debezium.time.ZonedTimestamp";
+
+ public static Object toMillsTimestamp(String schemaName, Object value){
+ if(schemaName == null){
+ return value;
+ }
+ switch (schemaName){
+ case DATE:
+ return LocalDate.ofEpochDay((long)value)
+ .atStartOfDay(ZoneOffset.ofHours(8))
+ .toInstant()
+ .toEpochMilli();
+ case TIMESTAMP:
+ return value;
+ default:
+ return value;
+ }
+ }
+
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/HeaderField.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/HeaderField.java
new file mode 100644
index 0000000..8b150da
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/HeaderField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.common;
+
+/**
+ * header field
+ */
+public interface HeaderField {
+
+ String __source_table_key="__source_table";
+ String __source_db_key="__source_db";
+ String __sink_table_key="__sink_table";
+ String __sink_db_key="__sink_db";
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
index 089d10d..81cec14 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
@@ -22,6 +22,7 @@ import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.data.logical.Date;
import io.openmessaging.connector.api.data.logical.Decimal;
import io.openmessaging.connector.api.data.logical.Time;
+import org.apache.rocketmq.connect.jdbc.common.DebeziumTimeTypes;
import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConfig;
@@ -1611,6 +1612,19 @@ public class GenericDatabaseDialect implements DatabaseDialect {
DateTimeUtils.getTimeZoneCalendar(timeZone)
);
return true;
+
+ case DebeziumTimeTypes.DATE:
+ statement.setDate(index,
+ new java.sql.Date((long)DebeziumTimeTypes.toMillsTimestamp(DebeziumTimeTypes.DATE, value)),
+ DateTimeUtils.getTimeZoneCalendar(timeZone)
+ );
+ return true;
+ case DebeziumTimeTypes.TIMESTAMP:
+ statement.setTimestamp(index,
+ new java.sql.Timestamp((long)DebeziumTimeTypes.toMillsTimestamp(DebeziumTimeTypes.TIMESTAMP, value)),
+ DateTimeUtils.getTimeZoneCalendar(timeZone)
+ );
+ return true;
default:
return false;
}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
index dcd22f9..d1d031d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.jdbc.sink;
import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.rocketmq.connect.jdbc.common.HeaderField;
import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
import org.apache.rocketmq.connect.jdbc.dialect.provider.CachedConnectionProvider;
@@ -100,6 +101,9 @@ public class Updater {
TableId destinationTable(ConnectRecord record) {
// todo table from header
+ if (config.isTableFromHeader()){
+ return dbDialect.parseToTableId(record.getExtensions().getString(HeaderField.__source_table_key));
+ }
return dbDialect.parseToTableId(record.getSchema().getName());
}
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 125e269..f649bed 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -598,24 +598,27 @@ public class WorkerSinkTask implements WorkerTask {
String bodyStr = new String(body, StandardCharsets.UTF_8);
sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
- KeyValue keyValue = new DefaultKeyValue();
- if (MapUtils.isNotEmpty(properties)) {
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- if (MQ_SYS_KEYS.contains(entry.getKey())) {
- keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
- } else if (entry.getKey().startsWith("connect-ext-")) {
- keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
- } else {
- keyValue.put(entry.getKey(), entry.getValue());
- }
- }
- }
- sinkDataEntry.addExtension(keyValue);
+
} else {
final byte[] messageBody = message.getBody();
String s = new String(messageBody);
sinkDataEntry = JSON.parseObject(s, ConnectRecord.class);
}
+
+ KeyValue keyValue = new DefaultKeyValue();
+ if (MapUtils.isNotEmpty(properties)) {
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ if (MQ_SYS_KEYS.contains(entry.getKey())) {
+ keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
+ } else if (entry.getKey().startsWith("connect-ext-")) {
+ keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
+ } else {
+ keyValue.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ sinkDataEntry.addExtension(keyValue);
+
return sinkDataEntry;
}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 2a2cb07..1271df7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -321,8 +321,8 @@ public class WorkerSourceTask implements WorkerTask {
throw new ConnectException("source connect lack of topic config");
}
sourceMessage.setTopic(topic);
+ putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
- putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
Object payload = sourceDataEntry.getData();
if (null != payload) {
final byte[] messageBody = (String.valueOf(payload)).getBytes();
@@ -396,6 +396,7 @@ public class WorkerSourceTask implements WorkerTask {
log.info("extension keySet null.");
return;
}
+
for (String key : keySet) {
if (WHITE_KEY_SET.contains(key)) {
MessageAccessor.putProperty(sourceMessage, key, extensionKeyValues.getString(key));
@@ -403,6 +404,7 @@ public class WorkerSourceTask implements WorkerTask {
MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
}
}
+
}
@Override