You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/06/22 06:26:50 UTC

[rocketmq-connect] branch master updated: [ISSUE #93] Connect plugins supports CDC mode, such as MySQL binlog (#152)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new d8ee3af  [ISSUE #93] Connect plugins supports CDC mode, such as MySQL binlog  (#152)
d8ee3af is described below

commit d8ee3af54c7cb00ad22c4180e313cb20fd5dc6c7
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jun 22 14:26:46 2022 +0800

    [ISSUE #93] Connect plugins supports CDC mode, such as MySQL binlog  (#152)
    
    * rocketmq connect supports debezium CDC mode #93
    
    * Abstruct  DebeziumConnector
    
    * add debezium connector validate
    
    * add connect validate、transform #93
    
    * test debezium mysql #93
    
    * modify kafka transform prefix
    
    * add debezium config template
    
    * upgrade rocketmq connect JDBC plug-in #153
    
    * fixed api #93
    
    * fixed converter
    
    * perfect conversion
    
    * upgrade rocketmq jdbc plugin and runtime api to 0.1.3 #153
    
    * upgrade debezium api to 0.1.3
    
    * upgrade debezium api to 0.1.3
    
    * Enhance Kafka connect adaptability #93
    
    * fixed
    
    * add Decimal logical
    
    * optimize transform api
    
    * upgrade api to 0.1.3
    
    * upgrade api to 0.1.3
    
    * upgrade api to 0.1.3
    
    * fixed
    
    * fixed
    
    * fixed
    
    Co-authored-by: “sunxiaojian” <“sunxiaojian926@163.com”>
    Co-authored-by: xiaojian.sxj <xi...@alibaba-inc.com>
---
 connectors/rocketmq-connect-debezium/README.md     |   0
 .../kafka-connect-adaptor/pom.xml                  |  78 +++++
 .../connect/adaptor/config/ConnectKeyValue.java    | 120 ++++++++
 .../connector/AbstractKafkaSinkConnector.java      |  99 ++++++
 .../connector/AbstractKafkaSourceConnector.java    |  99 ++++++
 .../adaptor/connector/ConnectorClassSetter.java    |  22 ++
 .../adaptor/connector/KafkaConnectorContext.java   |  23 ++
 .../connector/KafkaSinkAdaptorConnector.java       |  32 ++
 .../connector/KafkaSourceAdaptorConnector.java     |  32 ++
 .../adaptor/context/KafkaOffsetStorageReader.java  |  69 +++++
 .../context/RocketMQKafkaErrantRecordReporter.java |  28 ++
 .../context/RocketMQKafkaSinkTaskContext.java      | 151 +++++++++
 .../context/RocketMQKafkaSourceTaskContext.java    |  44 +++
 .../kafka/connect/adaptor/schema/Converters.java   | 161 ++++++++++
 .../adaptor/schema/KafkaSinkSchemaConverter.java   | 311 +++++++++++++++++++
 .../adaptor/schema/KafkaSinkValueConverter.java    | 132 ++++++++
 .../schema/RocketMQSourceSchemaConverter.java      | 302 ++++++++++++++++++
 .../schema/RocketMQSourceValueConverter.java       | 137 +++++++++
 .../adaptor/task/AbstractKafkaConnectSink.java     | 122 ++++++++
 .../adaptor/task/AbstractKafkaConnectSource.java   | 132 ++++++++
 .../adaptor/task/KafkaConnectAdaptorSink.java      |  67 ++++
 .../adaptor/task/KafkaConnectAdaptorSource.java    |  69 +++++
 .../connect/adaptor/task/TaskClassSetter.java      |  23 ++
 .../adaptor/transforms/TransformationWrapper.java  |  89 ++++++
 .../connect/adaptor/SourceRecordConverterTest.java | 203 +++++++++++++
 connectors/rocketmq-connect-debezium/pom.xml       | 338 +++++++++++++++++++++
 .../rocketmq-connect-debezium-core/pom.xml         | 103 +++++++
 .../rocketmq/connect/debezium/ConnectUtil.java     | 232 ++++++++++++++
 .../connect/debezium/DebeziumConnector.java        |  36 +++
 .../rocketmq/connect/debezium/DebeziumSource.java  |  42 +++
 .../connect/debezium/RocketMqConnectConfig.java    | 194 ++++++++++++
 .../connect/debezium/RocketMqDatabaseHistory.java  | 255 ++++++++++++++++
 .../debezium/SchemaRenameTransformation.java       |  29 ++
 .../rocketmq-connect-debezium-mongodb/pom.xml      | 190 ++++++++++++
 .../debezium/mongodb/DebeziumMongoDBConnector.java |  49 +++
 .../debezium/mongodb/DebeziumMongoDBSource.java    |  37 +++
 .../resources/debezium-mongodb-source-config.yaml  |  18 ++
 .../rocketmq-connect-debezium-mysql/pom.xml        | 197 ++++++++++++
 .../debezium/mysql/DebeziumMysqlConnector.java     |  47 +++
 .../debezium/mysql/DebeziumMysqlSource.java        |  37 +++
 .../resources/debezium-mysql-source-config.yaml    |  46 +++
 .../rocketmq-connect-debezium-oracle/pom.xml       | 190 ++++++++++++
 .../debezium/oracle/DebeziumOracleConnector.java   |  47 +++
 .../debezium/oracle/DebeziumOracleSource.java      |  37 +++
 .../resources/debezium-oracle-source-config.yaml   |  19 ++
 .../rocketmq-connect-debezium-postgresql/pom.xml   | 195 ++++++++++++
 .../postgres/DebeziumPostgresConnector.java        |  47 +++
 .../debezium/postgres/DebeziumPostgresSource.java  |  36 +++
 .../resources/debezium-postgres-source-config.yaml |  18 ++
 .../rocketmq-connect-debezium-sqlserver/pom.xml    | 187 ++++++++++++
 .../sqlserver/DebeziumSqlServerConnector.java      |  47 +++
 .../sqlserver/DebeziumSqlServerSource.java         |  35 +++
 .../debezium-sqlserver-source-config.yaml          |  19 ++
 .../connect/jdbc/common/DebeziumTimeTypes.java     |  59 ++++
 .../rocketmq/connect/jdbc/common/HeaderField.java  |  29 ++
 .../jdbc/dialect/impl/GenericDatabaseDialect.java  |  14 +
 .../apache/rocketmq/connect/jdbc/sink/Updater.java |   4 +
 .../runtime/connectorwrapper/WorkerSinkTask.java   |  29 +-
 .../runtime/connectorwrapper/WorkerSourceTask.java |   4 +-
 59 files changed, 5397 insertions(+), 14 deletions(-)

diff --git a/connectors/rocketmq-connect-debezium/README.md b/connectors/rocketmq-connect-debezium/README.md
new file mode 100644
index 0000000..e69de29
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
new file mode 100644
index 0000000..64190bb
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect-debezium</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kafka-connect-adaptor</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-runtime</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/config/ConnectKeyValue.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/config/ConnectKeyValue.java
new file mode 100644
index 0000000..6856a07
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/config/ConnectKeyValue.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.config;
+
+import io.openmessaging.KeyValue;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * connect key value
+ */
+public class ConnectKeyValue implements KeyValue {
+    private Map<String, String> properties;
+
+    public ConnectKeyValue() {
+        properties = new ConcurrentHashMap<String, String>();
+    }
+
+    @Override
+    public KeyValue put(String key, int value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    @Override
+    public KeyValue put(String key, long value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    @Override
+    public KeyValue put(String key, double value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    @Override
+    public KeyValue put(String key, String value) {
+        properties.put(key, String.valueOf(value));
+        return this;
+    }
+
+    @Override
+    public int getInt(String key) {
+        if (!properties.containsKey(key))
+            return 0;
+        return Integer.valueOf(properties.get(key));
+    }
+
+    @Override
+    public int getInt(final String key, final int defaultValue) {
+        return properties.containsKey(key) ? getInt(key) : defaultValue;
+    }
+
+    @Override
+    public long getLong(String key) {
+        if (!properties.containsKey(key))
+            return 0;
+        return Long.valueOf(properties.get(key));
+    }
+
+    @Override
+    public long getLong(final String key, final long defaultValue) {
+        return properties.containsKey(key) ? getLong(key) : defaultValue;
+    }
+
+    @Override
+    public double getDouble(String key) {
+        if (!properties.containsKey(key))
+            return 0;
+        return Double.valueOf(properties.get(key));
+    }
+
+    @Override
+    public double getDouble(final String key, final double defaultValue) {
+        return properties.containsKey(key) ? getDouble(key) : defaultValue;
+    }
+
+    @Override
+    public String getString(String key) {
+        return properties.get(key);
+    }
+
+    @Override
+    public String getString(final String key, final String defaultValue) {
+        return properties.containsKey(key) ? getString(key) : defaultValue;
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return properties.keySet();
+    }
+
+    @Override
+    public boolean containsKey(String key) {
+        return properties.containsKey(key);
+    }
+
+    public Map<String, String> config() {
+        return properties;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
new file mode 100644
index 0000000..8e5391e
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
@@ -0,0 +1,99 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkConnector;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * kafka source connector
+ */
+public abstract class AbstractKafkaSinkConnector extends SinkConnector implements ConnectorClassSetter {
+
+    /**
+     * kafka connect init
+     */
+    protected ConnectKeyValue configValue;
+
+    /**
+     * source connector
+     */
+    protected org.apache.kafka.connect.sink.SinkConnector sinkConnector;
+
+    /**
+     * task config
+     */
+    protected Map<String, String> taskConfig;
+
+    /**
+     * try override start and stop
+     * @return
+     */
+    protected org.apache.kafka.connect.sink.SinkConnector originalSinkConnector(){
+        return sinkConnector;
+    }
+
+    /**
+     * Returns a set of configurations for Tasks based on the current configuration,
+     * producing at most count configurations.
+     * @param maxTasks maximum number of configurations to generate
+     * @return configurations for Tasks
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<Map<String, String>> groupConnectors = sinkConnector.taskConfigs(maxTasks);
+        List<KeyValue> configs = new ArrayList<>();
+        for (Map<String, String> configMaps : groupConnectors) {
+            KeyValue keyValue = new DefaultKeyValue();
+            configMaps.forEach((k, v)->{
+                keyValue.put(k, v);
+            });
+            configs.add(keyValue);
+        }
+        return configs;
+    }
+
+    /**
+     * Start the component
+     * @param config component context
+     */
+    @Override
+    public void start(KeyValue config) {
+        this.configValue = new ConnectKeyValue();
+        config.keySet().forEach(key -> {
+            this.configValue.put(key, config.getString(key));
+        });
+        setConnectorClass(configValue);
+        taskConfig = new HashMap<>(configValue.config());
+        // get the source class name from config and create source task from reflection
+        try {
+            sinkConnector = Class.forName(taskConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
+                    .asSubclass(org.apache.kafka.connect.sink.SinkConnector.class)
+                    .getDeclaredConstructor()
+                    .newInstance();
+        } catch (Exception e) {
+            throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG));
+        }
+    }
+
+    /**
+     * Stop the component.
+     */
+    @Override
+    public void stop() {
+        if (sinkConnector != null){
+            sinkConnector = null;
+            configValue = null;
+            taskConfig = null;
+        }
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
new file mode 100644
index 0000000..f9d5456
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
@@ -0,0 +1,99 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * kafka source connector
+ */
+public abstract class AbstractKafkaSourceConnector extends SourceConnector implements ConnectorClassSetter {
+
+
+    /**
+     * kafka connect init
+     */
+    protected ConnectKeyValue configValue;
+
+    /**
+     * task config
+     */
+    protected Map<String, String> taskConfig;
+
+    /**
+     * source connector
+     */
+    protected org.apache.kafka.connect.source.SourceConnector sourceConnector;
+
+    /**
+     * try override start and stop
+     * @return
+     */
+    protected org.apache.kafka.connect.source.SourceConnector originalSinkConnector(){
+        return sourceConnector;
+    }
+
+    /**
+     * Returns a set of configurations for Tasks based on the current configuration,
+     * producing at most count configurations.
+     * @param maxTasks maximum number of configurations to generate
+     * @return configurations for Tasks
+     */
+    @Override
+    public List<KeyValue> taskConfigs(int maxTasks) {
+        List<Map<String, String>> groupConnectors = sourceConnector.taskConfigs(maxTasks);
+        List<KeyValue> configs = new ArrayList<>();
+        for (Map<String, String> configMaps : groupConnectors) {
+            KeyValue keyValue = new DefaultKeyValue();
+            configMaps.forEach((k, v)->{
+                keyValue.put(k, v);
+            });
+            configs.add(keyValue);
+        }
+        return configs;
+    }
+
+    /**
+     * Start the component
+     * @param config component context
+     */
+    @Override
+    public void start(KeyValue config) {
+        this.configValue = new ConnectKeyValue();
+        config.keySet().forEach(key -> {
+            this.configValue.put(key, config.getString(key));
+        });
+        setConnectorClass(configValue);
+       taskConfig = new HashMap<>(configValue.config());
+        // get the source class name from config and create source task from reflection
+        try {
+            sourceConnector = Class.forName(taskConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
+                    .asSubclass(org.apache.kafka.connect.source.SourceConnector.class)
+                    .getDeclaredConstructor()
+                    .newInstance();
+        } catch (Exception e) {
+            throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG));
+        }
+    }
+
+    /**
+     * Stop the component.
+     */
+    @Override
+    public void stop() {
+        if (sourceConnector != null){
+            sourceConnector = null;
+            configValue =  null;
+            this.taskConfig = null;
+        }
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
new file mode 100644
index 0000000..b0c0007
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
@@ -0,0 +1,22 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+
+/**
+ * connector class setter
+ */
+public interface ConnectorClassSetter {
+    /**
+     * set connector class
+     * @param config
+     */
+     default void setConnectorClass(KeyValue config) {
+         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, getConnectorClass());
+     }
+
+    /**
+     * get connector class
+     */
+    String getConnectorClass();
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
new file mode 100644
index 0000000..a42c507
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
@@ -0,0 +1,23 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import org.apache.kafka.connect.connector.ConnectorContext;
+
+/**
+ * kafka connect context
+ */
+public class KafkaConnectorContext implements ConnectorContext {
+    io.openmessaging.connector.api.component.connector.ConnectorContext context;
+    public KafkaConnectorContext(io.openmessaging.connector.api.component.connector.ConnectorContext context){
+        this.context = context;
+    }
+
+    @Override
+    public void requestTaskReconfiguration() {
+        context.requestTaskReconfiguration();
+    }
+
+    @Override
+    public void raiseError(Exception e) {
+        context.raiseError(e);
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
new file mode 100644
index 0000000..90575bf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
@@ -0,0 +1,32 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+
+/**
+ * kafka source connector
+ */
+public abstract class KafkaSinkAdaptorConnector extends AbstractKafkaSinkConnector {
+
+    /**
+     * Start the component
+     * @param config component context
+     */
+    @Override
+    public void start(KeyValue config) {
+        super.start(config);
+        sinkConnector.validate(taskConfig);
+        sinkConnector.initialize(new KafkaConnectorContext(connectorContext));
+        sinkConnector.start(taskConfig);
+    }
+
+    /**
+     * Stop the component.
+     */
+    @Override
+    public void stop() {
+        if (sinkConnector != null){
+           sinkConnector.stop();
+        }
+        super.stop();
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
new file mode 100644
index 0000000..cea0a22
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
@@ -0,0 +1,32 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
+
+import io.openmessaging.KeyValue;
+
+/**
+ * kafka source connector
+ */
+public abstract class KafkaSourceAdaptorConnector extends AbstractKafkaSourceConnector {
+
+    /**
+     * Start the component
+     * @param config component context
+     */
+    @Override
+    public void start(KeyValue config) {
+        super.start(config);
+        sourceConnector.validate(taskConfig);
+        sourceConnector.initialize(new KafkaConnectorContext(connectorContext));
+        sourceConnector.start(taskConfig);
+    }
+
+    /**
+     * Stop the component.
+     */
+    @Override
+    public void stop() {
+        if (sourceConnector != null){
+            sourceConnector.stop();
+        }
+        super.stop();
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/KafkaOffsetStorageReader.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/KafkaOffsetStorageReader.java
new file mode 100644
index 0000000..ac8307b
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/KafkaOffsetStorageReader.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * kafka offset storage reader
+ */
+public class KafkaOffsetStorageReader implements CloseableOffsetStorageReader {
+    SourceTaskContext context;
+
+    public KafkaOffsetStorageReader(SourceTaskContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void close() {
+        this.context = null;
+    }
+
+    @Override
+    public <T> Map<String, Object> offset(Map<String, T> map) {
+        RecordPartition partition = new RecordPartition(map);
+        RecordOffset offset = context.offsetStorageReader().readOffset(partition);
+        return (Map<String, Object>) offset.getOffset();
+    }
+
+    @Override
+    public <T> Map<Map<String, T>, Map<String, Object>> offsets(Collection<Map<String, T>> collection) {
+        List<RecordPartition> partitions = new ArrayList<>();
+        collection.forEach(partitionMap -> {
+            RecordPartition partition = new RecordPartition(partitionMap);
+            partitions.add(partition);
+        });
+        Map<Map<String, T>, Map<String, Object>> offsetMap = new ConcurrentHashMap<>();
+        Map<RecordPartition, RecordOffset> offsets = context.offsetStorageReader().readOffsets(partitions);
+        offsets.forEach((partition, offset) -> {
+            Map<String, T> mapPartition = (Map<String, T>) partition.getPartition();
+            Map<String, Object> mapOffset = (Map<String, Object>) offset.getOffset();
+            offsetMap.put(mapPartition, mapOffset);
+        });
+        return offsetMap;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
new file mode 100644
index 0000000..3b951c6
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
@@ -0,0 +1,28 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import io.openmessaging.connector.api.component.task.sink.ErrorRecordReporter;
+import io.openmessaging.connector.api.component.task.sink.SinkTaskContext;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+
+import java.util.concurrent.Future;
+
+/**
+ * rocketmq kafka error record reporter
+ */
+public class RocketMQKafkaErrantRecordReporter implements ErrantRecordReporter {
+    final ErrorRecordReporter errorRecordReporter;
+    RocketMQKafkaErrantRecordReporter(SinkTaskContext context){
+        errorRecordReporter = context.errorRecordReporter();
+    }
+
+    @Override
+    public Future<Void> report(SinkRecord sinkRecord, Throwable throwable) {
+        // 数据转换
+        ConnectRecord record = Converters.fromSinkRecord(sinkRecord);
+        errorRecordReporter.report(record, throwable);
+        return null;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
new file mode 100644
index 0000000..0fd48a8
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.sink.ErrantRecordReporter;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * rocketmq kafka sink task context
+ */
+public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
+    public static final String BROKER_NAME = "brokerName";
+    public static final String QUEUE_ID = "queueId";
+    public static final String TOPIC = "topic";
+    public static final String QUEUE_OFFSET = "queueOffset";
+
+    private final io.openmessaging.connector.api.component.task.sink.SinkTaskContext sinkContext;
+    private final Map<String, String> taskConfig;
+    private final ErrantRecordReporter errantRecordReporter;
+
+    public RocketMQKafkaSinkTaskContext(io.openmessaging.connector.api.component.task.sink.SinkTaskContext context, Map<String, String> taskConfig) {
+        this.sinkContext = context;
+        this.taskConfig = taskConfig;
+        this.errantRecordReporter = new RocketMQKafkaErrantRecordReporter(context);
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return taskConfig;
+    }
+
+    @Override
+    public void offset(Map<TopicPartition, Long> map) {
+        Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
+        map.forEach((topicPartition, offset)->{
+            offsets.put(convertToRecordPartition(topicPartition), convertToRecordOffset(offset));
+        });
+        sinkContext.resetOffset(offsets);
+    }
+
+    @Override
+    public void offset(TopicPartition topicPartition, long l) {
+        sinkContext.resetOffset(convertToRecordPartition(topicPartition), convertToRecordOffset(l));
+    }
+
+    @Override
+    public void timeout(long l) {
+
+    }
+
+    @Override
+    public Set<TopicPartition> assignment() {
+        Set<TopicPartition> topicPartitions =  new HashSet<>();
+        Set<RecordPartition> recordPartitions = sinkContext.assignment();
+        recordPartitions.forEach(partition -> {
+            topicPartitions.add(convertToTopicPartition(partition.getPartition()));
+        });
+        return topicPartitions;
+    }
+
+    @Override
+    public void pause(TopicPartition... topicPartitions) {
+        List<RecordPartition> partitions = new ArrayList<>();
+        for (TopicPartition topicPartition : topicPartitions){
+            RecordPartition recordPartition = convertToRecordPartition(topicPartition);
+            if (recordPartition != null){
+                partitions.add(recordPartition);
+            }
+        }
+        sinkContext.pause(partitions);
+    }
+
+    @Override
+    public void resume(TopicPartition... topicPartitions) {
+        List<RecordPartition> partitions = new ArrayList<>();
+        for (TopicPartition topicPartition : topicPartitions){
+            RecordPartition recordPartition = convertToRecordPartition(topicPartition);
+            if (recordPartition != null){
+                partitions.add(recordPartition);
+            }
+        }
+        sinkContext.resume(partitions);
+    }
+
+    @Override
+    public void requestCommit() {
+    }
+
+
+    @Override
+    public ErrantRecordReporter errantRecordReporter() {
+        return errantRecordReporter;
+    }
+
+
+    /**
+     * convert to kafka topic partition
+     * @param partitionMap
+     * @return
+     */
+    public TopicPartition convertToTopicPartition(Map<String, ?> partitionMap) {
+        if (partitionMap.containsKey(TOPIC) && partitionMap.containsKey(QUEUE_ID)){
+            return new TopicPartition(partitionMap.get(TOPIC).toString(), Integer.valueOf(partitionMap.get(QUEUE_ID).toString()));
+        }
+        return null;
+    }
+
+    /**
+     * convert to rocketmq record partition
+     * @param topicPartition
+     * @return
+     */
+    public RecordPartition convertToRecordPartition(TopicPartition topicPartition) {
+        if (topicPartition != null){
+            return new RecordPartition(Collections.singletonMap(topicPartition.topic(),topicPartition.partition()));
+        }
+        return null;
+    }
+
+    private RecordOffset convertToRecordOffset(long l) {
+        return new RecordOffset(Collections.singletonMap(QUEUE_OFFSET,l));
+    }
+
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSourceTaskContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSourceTaskContext.java
new file mode 100644
index 0000000..6716dcf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSourceTaskContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.context;
+
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+
+import java.util.Map;
+
+public class RocketMQKafkaSourceTaskContext implements SourceTaskContext {
+
+    private final OffsetStorageReader reader;
+    private final Map<String, String> taskConfig;
+
+    public RocketMQKafkaSourceTaskContext(OffsetStorageReader reader, Map<String, String> taskConfig) {
+        this.reader = reader;
+        this.taskConfig = taskConfig;
+    }
+
+    @Override
+    public Map<String, String> configs() {
+        return taskConfig;
+    }
+
+    @Override
+    public OffsetStorageReader offsetStorageReader() {
+        return reader;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
new file mode 100644
index 0000000..abc2abf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.RocketMQKafkaSinkTaskContext;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * converter transforms record
+ */
+public class Converters {
+
+
+    public static ConnectRecord fromSourceRecord(SourceRecord record){
+        // sourceRecord convert connect Record
+        RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+        io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+        RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+        ConnectRecord connectRecord = new ConnectRecord(
+                new RecordPartition(record.sourcePartition()),
+                new RecordOffset(record.sourceOffset()),
+                record.timestamp(),
+                schema,
+                rocketMQSourceValueConverter.value(schema, record.value()));
+        Iterator<Header> headers = record.headers().iterator();
+        while (headers.hasNext()) {
+            Header header = headers.next();
+            connectRecord.addExtension(header.key(), String.valueOf(header.value()));
+        }
+        return connectRecord;
+    }
+
+
+    public static ConnectRecord fromSinkRecord(SinkRecord record){
+        // sourceRecord convert connect Record
+        RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+        io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+        RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+        ConnectRecord connectRecord = new ConnectRecord(
+                toRecordPartition(record),
+                toRecordOffset(record),
+                record.timestamp(),
+                schema,
+                rocketMQSourceValueConverter.value(schema, record.value()));
+        Iterator<Header> headers = record.headers().iterator();
+        while (headers.hasNext()) {
+            Header header = headers.next();
+            connectRecord.addExtension(header.key(), String.valueOf(header.value()));
+        }
+        return connectRecord;
+    }
+
+
+
+    /**
+     * convert rocketmq connect record to sink record
+     * @param record
+     * @return
+     */
+    public static SinkRecord fromConnectRecord(ConnectRecord record){
+        // connect record  convert kafka  sink record
+        KafkaSinkSchemaConverter kafkaSinkSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
+        Schema schema = kafkaSinkSchemaConverter.schema();
+        KafkaSinkValueConverter sinkValueConverter = new KafkaSinkValueConverter();
+        // add headers
+        Headers headers = new ConnectHeaders();
+        Iterator extensions = record.getExtensions().keySet().iterator();
+        while (extensions.hasNext()) {
+            String key = String.valueOf(extensions.next());
+            headers.add(key, record.getExtensions().getString(key) , null);
+        }
+
+        SinkRecord sinkRecord = new SinkRecord(
+                topic(record.getPosition().getPartition()),
+                partition(record.getPosition().getPartition()),
+                null,
+                null,
+                schema,
+                sinkValueConverter.value(schema, record.getData()),
+                offset(record.getPosition().getOffset()),
+                record.getTimestamp(),
+                TimestampType.NO_TIMESTAMP_TYPE,
+                headers
+        );
+        return sinkRecord;
+    }
+
+    public static RecordPartition toRecordPartition(SinkRecord record){
+
+        Map<String, String> recordPartitionMap = new HashMap<>();
+        recordPartitionMap.put(RocketMQKafkaSinkTaskContext.TOPIC, record.topic());
+        recordPartitionMap.put(RocketMQKafkaSinkTaskContext.QUEUE_ID, record.kafkaPartition() + "");
+        return new RecordPartition(recordPartitionMap);
+    }
+
+    public static RecordOffset toRecordOffset(SinkRecord record){
+        Map<String, String> recordOffsetMap = new HashMap<>();
+        recordOffsetMap.put(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET, record.kafkaOffset() + "");
+        return new RecordOffset(recordOffsetMap);
+    }
+
+
+    /**
+     * get topic
+     * @param partition
+     * @return
+     */
+    public static String topic(RecordPartition partition){
+        return partition.getPartition().get(RocketMQKafkaSinkTaskContext.TOPIC).toString();
+    }
+
+    /**
+     * get partition
+     * @param partition
+     * @return
+     */
+    public static int partition(RecordPartition partition){
+        if (partition.getPartition().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_ID)){
+            return Integer.valueOf(partition.getPartition().get(RocketMQKafkaSinkTaskContext.QUEUE_ID).toString());
+        }
+        return -1;
+    }
+    /**
+     * get offset
+     * @param offset
+     * @return
+     */
+    public static int offset(RecordOffset offset){
+        if (offset.getOffset().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET)){
+            return Integer.valueOf(offset.getOffset().get(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET).toString());
+        }
+        return -1;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
new file mode 100644
index 0000000..9e8a188
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * convert rocketmq connect record data to kafka sink record data
+ */
+public class KafkaSinkSchemaConverter {
+    private static Logger logger = LoggerFactory.getLogger(KafkaSinkSchemaConverter.class);
+    private static Map<String, String> logicalMapping = new HashMap<>();
+    static {
+        logicalMapping.put(io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME,Decimal.LOGICAL_NAME);
+        logicalMapping.put(io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME, Date.LOGICAL_NAME);
+        logicalMapping.put(io.openmessaging.connector.api.data.logical.Time.LOGICAL_NAME, Time.LOGICAL_NAME);
+        logicalMapping.put(io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME, Timestamp.LOGICAL_NAME);
+    }
+
+    private SchemaBuilder builder;
+    public KafkaSinkSchemaConverter(Schema schema) {
+        builder = convertKafkaSchema(schema);
+    }
+
+    public org.apache.kafka.connect.data.Schema schema() {
+        return builder.build();
+    }
+
+    /**
+     * convert kafka schema
+     * @param originalSchema
+     * @return
+     */
+    private SchemaBuilder convertKafkaSchema(io.openmessaging.connector.api.data.Schema originalSchema) {
+        String schemaName = convertSchemaName(originalSchema.getName());
+        switch (originalSchema.getFieldType()) {
+            case INT8:
+                return SchemaBuilder
+                        .int8()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case INT16:
+                return SchemaBuilder
+                        .int16()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case INT32:
+                return SchemaBuilder
+                        .int32()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case INT64:
+                return SchemaBuilder
+                        .int64()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case FLOAT32:
+                return SchemaBuilder
+                        .float32()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case FLOAT64:
+                return SchemaBuilder
+                        .float64()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case BOOLEAN:
+                return SchemaBuilder
+                        .bool()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case STRING:
+                return SchemaBuilder.
+                        string()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case BYTES:
+                return SchemaBuilder
+                        .bytes()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case STRUCT:
+                SchemaBuilder schemaBuilder = SchemaBuilder
+                        .struct()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+                convertStructSchema(schemaBuilder, originalSchema);
+                return schemaBuilder;
+            case ARRAY:
+                return SchemaBuilder.array(convertKafkaSchema(originalSchema.getValueSchema()).build())
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            case MAP:
+                return SchemaBuilder.map(
+                        convertKafkaSchema(originalSchema.getKeySchema()).build(),
+                        convertKafkaSchema(originalSchema.getValueSchema()).build()
+                ).optional()
+                        .name(schemaName)
+                        .doc(originalSchema.getDoc())
+                        .defaultValue(originalSchema.getDefaultValue());
+            default:
+                throw new RuntimeException(" Type not supported: {}" + originalSchema.getFieldType());
+
+        }
+
+    }
+
+    /**
+     * convert schema
+     *
+     * @param schemaBuilder
+     * @param originalSchema
+     */
+    private void convertStructSchema(org.apache.kafka.connect.data.SchemaBuilder schemaBuilder, io.openmessaging.connector.api.data.Schema originalSchema) {
+        for (Field field : originalSchema.getFields()) {
+            try {
+
+                // schema
+                Schema schema = field.getSchema();
+                String schemaName =  convertSchemaName(field.getSchema().getName());
+
+                // field name
+                String fieldName =  field.getName();
+                FieldType type = schema.getFieldType();
+
+
+                switch (type) {
+                    case INT8:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .int8()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case INT16:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .int16()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case INT32:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .int32()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case INT64:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .int64()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case FLOAT32:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .float32()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case FLOAT64:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .float64()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case BOOLEAN:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .bool()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case STRING:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .string()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case BYTES:
+                        schemaBuilder.field(
+                                fieldName,
+                                SchemaBuilder
+                                        .bytes()
+                                        .name(schemaName)
+                                        .doc(schema.getDoc())
+                                        .defaultValue(schema.getDefaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case STRUCT:
+                    case ARRAY:
+                    case MAP:
+                        schemaBuilder.field(
+                                fieldName,
+                                convertKafkaSchema(field.getSchema()).build()
+                        );
+                        break;
+                    default:
+                        break;
+                }
+            } catch (Exception ex) {
+                logger.error("Convert schema failure! ex {}", ex);
+                throw new ConnectException(ex);
+            }
+        }
+    }
+
+
+    private String convertSchemaName(String schemaName){
+        if (logicalMapping.containsKey(schemaName)){
+            return logicalMapping.get(schemaName);
+        }
+        return schemaName;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
new file mode 100644
index 0000000..0cbc2f9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * convert rocketmq connect record to kafka sink record
+ */
+public class KafkaSinkValueConverter {
+
+    private static Logger logger = LoggerFactory.getLogger(KafkaSinkValueConverter.class);
+
+    public Object value(Schema schema,Object data) {
+        return convertKafkaValue(schema, data);
+    }
+
+    /**
+     * convert value
+     * @param targetSchema
+     * @param originalValue
+     * @return
+     */
+    private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+        switch (targetSchema.type()) {
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT32:
+            case FLOAT64:
+            case BOOLEAN:
+            case STRING:
+            case BYTES:
+                return originalValue;
+            case STRUCT:
+                Struct toStruct = new Struct(targetSchema);
+                if (originalValue != null) {
+                    convertStructValue(toStruct, (org.apache.kafka.connect.data.Struct) originalValue);
+                }
+                return toStruct;
+            case ARRAY:
+                List<Object> array = (List<Object>) originalValue;
+                List<Object> newArray = new ArrayList<>();
+                array.forEach(item -> {
+                    newArray.add(convertKafkaValue(targetSchema.valueSchema(), item));
+                });
+                return newArray;
+            case MAP:
+                Map mapData = (Map) originalValue;
+                Map newMapData = new ConcurrentHashMap();
+                mapData.forEach((k, v) -> {
+                    newMapData.put(
+                            convertKafkaValue(targetSchema.keySchema(), k),
+                            convertKafkaValue(targetSchema.valueSchema(), v)
+                    );
+                });
+                return newMapData;
+            default:
+                throw new RuntimeException(" Type not supported: {}" + targetSchema.type());
+
+        }
+
+    }
+
+    /**
+     * convert struct value
+     *
+     * @param toStruct
+     * @param originalStruct
+     */
+    private void convertStructValue(Struct toStruct, org.apache.kafka.connect.data.Struct originalStruct) {
+
+        for (Field field : toStruct.schema().fields()) {
+            try {
+                Schema.Type type = field.schema().type();
+                Object value = originalStruct.get(field.name());
+                switch (type) {
+                    case INT8:
+                    case INT16:
+                    case INT32:
+                    case INT64:
+                    case FLOAT32:
+                    case FLOAT64:
+                    case BOOLEAN:
+                    case STRING:
+                    case BYTES:
+                        toStruct.put(field.name(), value);
+                        break;
+                    case STRUCT:
+                    case ARRAY:
+                    case MAP:
+                        toStruct.put(
+                                field.name(),
+                                convertKafkaValue(
+                                        toStruct.schema().field(field.name()).schema(),
+                                        value
+                                )
+                        );
+                        break;
+                }
+            } catch (Exception ex) {
+                logger.error("Convert to kafka schema failure, {}", ex);
+                throw new ConnectException(ex);
+            }
+        }
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
new file mode 100644
index 0000000..b357fd7
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.logical.Timestamp;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * schema  converter
+ */
+public class RocketMQSourceSchemaConverter {
+    private static Logger logger = LoggerFactory.getLogger(RocketMQSourceSchemaConverter.class);
+    private static Map<String, String> logicalMapping = new HashMap<>();
+    static {
+        logicalMapping.put(Decimal.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME);
+        logicalMapping.put(Date.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME);
+        logicalMapping.put(Time.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Time.LOGICAL_NAME);
+        logicalMapping.put(Timestamp.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME);
+    }
+
+    private SchemaBuilder builder;
+
+    public RocketMQSourceSchemaConverter(Schema schema) {
+        builder = convertKafkaSchema(schema);
+    }
+
+    public io.openmessaging.connector.api.data.Schema schema() {
+        return builder.build();
+    }
+
+    private SchemaBuilder convertKafkaSchema(org.apache.kafka.connect.data.Schema originalSchema) {
+        String schemaName = convertSchemaName(originalSchema.name());
+        switch (originalSchema.type()) {
+            case INT8:
+                return SchemaBuilder
+                        .int8()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case INT16:
+                return SchemaBuilder
+                        .int16()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case INT32:
+                return SchemaBuilder
+                        .int32()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case INT64:
+                return SchemaBuilder
+                        .int64()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case FLOAT32:
+                return SchemaBuilder
+                        .float32()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case FLOAT64:
+                return SchemaBuilder
+                        .float64()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case BOOLEAN:
+                return SchemaBuilder
+                        .bool()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case STRING:
+                return SchemaBuilder.
+                        string()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case BYTES:
+                return SchemaBuilder
+                        .bytes()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case STRUCT:
+                SchemaBuilder schemaBuilder = SchemaBuilder
+                        .struct()
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+                convertStructSchema(schemaBuilder, originalSchema);
+                return schemaBuilder;
+            case ARRAY:
+                return SchemaBuilder.array(convertKafkaSchema(originalSchema.valueSchema()).build())
+                        .optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            case MAP:
+                return SchemaBuilder.map(
+                        convertKafkaSchema(originalSchema.keySchema()).build(),
+                        convertKafkaSchema(originalSchema.valueSchema()).build()
+                ).optional()
+                        .name(schemaName)
+                        .doc(originalSchema.doc())
+                        .defaultValue(originalSchema.defaultValue());
+            default:
+                throw new RuntimeException(" Type not supported: {}" + originalSchema.type());
+
+        }
+
+    }
+
+    /**
+     * convert schema
+     *
+     * @param schemaBuilder
+     * @param originalSchema
+     */
+    private void convertStructSchema(io.openmessaging.connector.api.data.SchemaBuilder schemaBuilder, org.apache.kafka.connect.data.Schema originalSchema) {
+        for (Field field : originalSchema.fields()) {
+            try {
+                Schema schema = field.schema();
+                org.apache.kafka.connect.data.Schema.Type type = schema.type();
+                String schemaName =  convertSchemaName(field.schema().name());
+                switch (type) {
+                    case INT8:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .int8()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case INT16:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .int16()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case INT32:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .int32()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case INT64:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .int64()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case FLOAT32:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .float32()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case FLOAT64:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .float64()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case BOOLEAN:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .bool()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case STRING:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .string()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case BYTES:
+                        schemaBuilder.field(
+                                field.name(),
+                                SchemaBuilder
+                                        .bytes()
+                                        .name(schemaName)
+                                        .doc(schema.doc())
+                                        .defaultValue(schema.defaultValue())
+                                        .optional()
+                                        .build()
+                        );
+                        break;
+                    case STRUCT:
+                    case ARRAY:
+                    case MAP:
+                        schemaBuilder.field(
+                                field.name(),
+                                convertKafkaSchema(field.schema()).build()
+                        );
+                        break;
+                    default:
+                        break;
+                }
+            } catch (Exception ex) {
+                logger.error("Convert schema failure! ex {}", ex);
+                throw new ConnectException(ex);
+            }
+        }
+    }
+
+
+    private String convertSchemaName(String schemaName){
+        if (logicalMapping.containsKey(schemaName)){
+            return logicalMapping.get(schemaName);
+        }
+        return schemaName;
+    }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
new file mode 100644
index 0000000..8801efb
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.Struct;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * value converter
+ */
+public class RocketMQSourceValueConverter {
+    private static Logger logger = LoggerFactory.getLogger(RocketMQSourceValueConverter.class);
+
+    public Object value(Schema schema, Object value) {
+        return convertKafkaValue(schema, value);
+    }
+
+    /**
+     * convert value
+     *
+     * @param targetSchema
+     * @param originalValue
+     * @return
+     */
+    private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+        switch (targetSchema.getFieldType()) {
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT32:
+            case FLOAT64:
+            case BOOLEAN:
+            case STRING:
+            case BYTES:
+                return originalValue;
+            case STRUCT:
+                Struct toStruct = new Struct(targetSchema);
+                if (originalValue != null) {
+                    convertStructValue(toStruct, (org.apache.kafka.connect.data.Struct) originalValue);
+                }
+                return toStruct;
+            case ARRAY:
+                List<Object> array = (List<Object>) originalValue;
+                List<Object> newArray = new ArrayList<>();
+                array.forEach(item -> {
+                    newArray.add(convertKafkaValue(targetSchema.getValueSchema(), item));
+                });
+                return newArray;
+            case MAP:
+                Map mapData = (Map) originalValue;
+                Map newMapData = new ConcurrentHashMap();
+                mapData.forEach((k, v) -> {
+                    newMapData.put(
+                            convertKafkaValue(targetSchema.getKeySchema(), k),
+                            convertKafkaValue(targetSchema.getValueSchema(), v)
+                    );
+                });
+                return newMapData;
+            default:
+                throw new RuntimeException(" Type not supported: {}" + targetSchema.getFieldType());
+
+        }
+
+    }
+
+    /**
+     * convert struct value
+     *
+     * @param toStruct
+     * @param originalStruct
+     */
+    private void convertStructValue(Struct toStruct, org.apache.kafka.connect.data.Struct originalStruct) {
+
+        for (Field field : toStruct.schema().getFields()) {
+            try {
+                FieldType type = field.getSchema().getFieldType();
+                Object value = originalStruct.get(field.getName());
+                switch (type) {
+                    case INT8:
+                    case INT16:
+                    case INT32:
+                    case INT64:
+                    case FLOAT32:
+                    case FLOAT64:
+                    case BOOLEAN:
+                    case STRING:
+                    case BYTES:
+                        toStruct.put(field.getName(), value);
+                        break;
+                    case STRUCT:
+                    case ARRAY:
+                    case MAP:
+                        toStruct.put(
+                                field.getName(),
+                                convertKafkaValue(
+                                        toStruct.schema().getField(field.getName()).getSchema(),
+                                        value
+                                )
+                        );
+                        break;
+                }
+            } catch (Exception ex) {
+                logger.error("Convert schema failure! ex {}", ex);
+                throw new ConnectException(ex);
+            }
+        }
+    }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
new file mode 100644
index 0000000..8677db9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.sink.SinkTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTaskContext;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.RocketMQKafkaSinkTaskContext;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.transforms.TransformationWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * abstract kafka connect sink
+ */
+public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskClassSetter{
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSink.class);
+
+    private SinkTaskContext kafkaSinkTaskContext;
+    private org.apache.kafka.connect.sink.SinkTask sinkTask;
+
+    protected TransformationWrapper transformationWrapper;
+
+    /**
+     * kafka connect init
+     */
+    protected ConnectKeyValue configValue;
+
+    /**
+     * convert by kafka sink transform
+     * @param record
+     */
+    protected abstract SinkRecord transforms(SinkRecord record);
+
+    /**
+     * convert ConnectRecord to SinkRecord
+     * @param record
+     * @return
+     */
+    public abstract SinkRecord processSinkRecord(ConnectRecord record);
+
+
+    /**
+     * Put the records to the sink
+     * @param records sink records
+     */
+    @Override
+    public void put(List<ConnectRecord> records) {
+        // convert sink data
+        List<SinkRecord> sinkRecords = new ArrayList<>();
+        records.forEach(connectRecord -> {
+            SinkRecord record = this.processSinkRecord(connectRecord);
+            sinkRecords.add(this.transforms(record));
+        });
+        sinkTask.put(sinkRecords);
+    }
+
+
+    @Override
+    public void validate(KeyValue keyValue) {
+    }
+
+    @Override
+    public void start(KeyValue keyValue) {
+        this.configValue = new ConnectKeyValue();
+        keyValue.keySet().forEach(key -> {
+            this.configValue.put(key, keyValue.getString(key));
+        });
+
+        setTaskClass(configValue);
+
+        Map<String, String> taskConfig = new HashMap<>(configValue.config());
+        // get the source class name from config and create source task from reflection
+        try {
+            sinkTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+                    .asSubclass(org.apache.kafka.connect.sink.SinkTask.class)
+                    .getDeclaredConstructor()
+                    .newInstance();
+        } catch (Exception e) {
+            throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG));
+        }
+
+        kafkaSinkTaskContext = new RocketMQKafkaSinkTaskContext(sinkTaskContext, taskConfig);
+        sinkTask.initialize(kafkaSinkTaskContext);
+        sinkTask.start(taskConfig);
+        transformationWrapper = new TransformationWrapper(taskConfig);
+    }
+
+
+    @Override
+    public void stop() {
+        if (sinkTask != null) {
+            sinkTask.stop();
+            sinkTask = null;
+        }
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
new file mode 100644
index 0000000..f062b3d
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.config.ConnectKeyValue;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.KafkaOffsetStorageReader;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.context.RocketMQKafkaSourceTaskContext;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.transforms.TransformationWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * abstract kafka connect
+ */
+public abstract class AbstractKafkaConnectSource extends SourceTask implements TaskClassSetter{
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
+
+    protected TransformationWrapper transformationWrapper;
+    private SourceTaskContext kafkaSourceTaskContext;
+    private org.apache.kafka.connect.source.SourceTask sourceTask;
+    private OffsetStorageReader offsetReader;
+
+    /**
+     * kafka connect init
+     */
+    protected ConnectKeyValue configValue;
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<SourceRecord> recordList = sourceTask.poll();
+        if (recordList == null || recordList.isEmpty()) {
+            Thread.sleep(1000);
+        }
+        List<ConnectRecord> records = new ArrayList<>();
+        for (SourceRecord sourceRecord : recordList) {
+            // transforms
+            SourceRecord transformRecord = transforms(sourceRecord);
+            ConnectRecord processRecord = Converters.fromSourceRecord(transformRecord);
+            if (processRecord != null) {
+                records.add(processRecord);
+            }
+        }
+        return records;
+    }
+
+    /**
+     * convert transform
+     * @param sourceRecord
+     */
+    protected abstract SourceRecord transforms(SourceRecord sourceRecord);
+
+    /**
+     * process source record
+     *
+     * @param next
+     * @return
+     */
+    public abstract ConnectRecord processSourceRecord(SourceRecord next);
+
+
+    @Override
+    public void validate(KeyValue keyValue) {
+    }
+
+    @Override
+    public void start(KeyValue keyValue) {
+        this.configValue = new ConnectKeyValue();
+        keyValue.keySet().forEach(key -> {
+            this.configValue.put(key, keyValue.getString(key));
+        });
+
+        setTaskClass(configValue);
+        Map<String, String> taskConfig = new HashMap<>(configValue.config());
+
+        // get the source class name from config and create source task from reflection
+        try {
+            sourceTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+                    .asSubclass(org.apache.kafka.connect.source.SourceTask.class)
+                    .getDeclaredConstructor()
+                    .newInstance();
+        } catch (Exception e) {
+            throw new ConnectException("Load task class failed, " + taskConfig.get(TaskConfig.TASK_CLASS_CONFIG));
+        }
+
+        offsetReader = new KafkaOffsetStorageReader(
+               sourceTaskContext
+        );
+
+        kafkaSourceTaskContext = new RocketMQKafkaSourceTaskContext(offsetReader, taskConfig);
+        sourceTask.initialize(kafkaSourceTaskContext);
+        sourceTask.start(taskConfig);
+        transformationWrapper = new TransformationWrapper(taskConfig);
+    }
+
+
+    @Override
+    public void stop() {
+        if (sourceTask != null) {
+            sourceTask.stop();
+            sourceTask = null;
+        }
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
new file mode 100644
index 0000000..439f733
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * kafka connect adaptor sink
+ */
+public abstract class KafkaConnectAdaptorSink extends AbstractKafkaConnectSink {
+    private static final Logger log = LoggerFactory.getLogger(KafkaConnectAdaptorSink.class);
+
+    /**
+     * convert by kafka sink transform
+     *
+     * @param record
+     */
+    @Override
+    protected SinkRecord transforms(SinkRecord record) {
+        List<Transformation> transformations = transformationWrapper.transformations();
+        Iterator transformationIterator = transformations.iterator();
+        while (transformationIterator.hasNext()) {
+            Transformation<SinkRecord> transformation = (Transformation) transformationIterator.next();
+            log.trace("applying transformation {} to {}", transformation.getClass().getName(), record);
+            record = transformation.apply(record);
+            if (record == null) {
+                break;
+            }
+        }
+        return record;
+    }
+
+    /**
+     * convert ConnectRecord to SinkRecord
+     * @param record
+     * @return
+     */
+    @Override
+    public SinkRecord processSinkRecord(ConnectRecord record) {
+        SinkRecord sinkRecord = Converters.fromConnectRecord(record);
+        return transforms(sinkRecord);
+    }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSource.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSource.java
new file mode 100644
index 0000000..06140cf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.Converters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * kafka connect adaptor source
+ */
+public abstract class KafkaConnectAdaptorSource extends AbstractKafkaConnectSource {
+    private static final Logger log = LoggerFactory.getLogger(KafkaConnectAdaptorSource.class);
+
+    /**
+     * convert transform
+     *
+     * @param record
+     */
+    @Override
+    protected SourceRecord transforms(SourceRecord record) {
+        List<Transformation> transformations = transformationWrapper.transformations();
+        Iterator transformationIterator = transformations.iterator();
+        while (transformationIterator.hasNext()) {
+            Transformation<SourceRecord> transformation = (Transformation) transformationIterator.next();
+            log.trace("applying transformation {} to {}", transformation.getClass().getName(), record);
+            record = transformation.apply(record);
+            if (record == null) {
+                break;
+            }
+        }
+        return record;
+    }
+
+    /**
+     * process source record
+     *
+     * @param record
+     * @return
+     */
+    @Override
+    public ConnectRecord processSourceRecord(SourceRecord record) {
+        record = this.transforms(record);
+        ConnectRecord connectRecord = Converters.fromSourceRecord(record);
+        return connectRecord;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
new file mode 100644
index 0000000..64c58c4
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
@@ -0,0 +1,23 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
+
+import io.openmessaging.KeyValue;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.TaskConfig;
+
+/**
+ * task class setter
+ */
+public interface TaskClassSetter {
+    /**
+     * set connector class
+     * @param config
+     */
+    default void setTaskClass(KeyValue config) {
+        config.put(TaskConfig.TASK_CLASS_CONFIG, getTaskClass());
+    }
+
+    /**
+     * get connector class
+     */
+    String getTaskClass();
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
new file mode 100644
index 0000000..c0700a6
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.kafka.connect.adaptor.transforms;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * transformation utils
+ */
+public class TransformationWrapper {
+
+    private static final String DEFAULT_TRANSFORM = "kafka.transforms";
+    private Map<String, String> props;
+
+    public TransformationWrapper(Map<String, String> props) {
+        this.props = props;
+    }
+
+    public List<Transformation> transformations() {
+        if (!props.containsKey(DEFAULT_TRANSFORM)) {
+            return Collections.emptyList();
+        }
+        List<String> transformAliases = Arrays.asList(props.get(DEFAULT_TRANSFORM).split(","));
+        List<Transformation> transformations = new ArrayList(transformAliases.size());
+        Iterator transformIterator = transformAliases.iterator();
+        while (transformIterator.hasNext()) {
+            String alias = (String) transformIterator.next();
+            String prefix = DEFAULT_TRANSFORM + "." + alias + ".";
+            try {
+                Transformation<SourceRecord> transformation = (Transformation) Utils.newInstance(getClass(prefix + "type"), Transformation.class);
+                Map<String, Object> configs = originalsWithPrefix(prefix);
+                transformation.configure(configs);
+                transformations.add(transformation);
+            } catch (Exception var12) {
+                throw new ConnectException(var12);
+            }
+        }
+        return transformations;
+    }
+
+    public Map<String, Object> originalsWithPrefix(String prefix) {
+        Map<String, Object> result = new ConcurrentHashMap<>();
+
+        Iterator entryIterator = props.entrySet().iterator();
+        while (entryIterator.hasNext()) {
+            Map.Entry<String, ?> entry = (Map.Entry) entryIterator.next();
+            if ((entry.getKey()).startsWith(prefix) && (entry.getKey()).length() > prefix.length()) {
+                result.put((entry.getKey()).substring(prefix.length()), entry.getValue());
+            }
+        }
+        return result;
+    }
+
+    public Class<?> getClass(String key) {
+        try {
+            return (Class<?>) Class.forName(props.get(key));
+        } catch (ClassNotFoundException e) {
+            throw new ConnectException(e);
+        }
+    }
+
+}
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
new file mode 100644
index 0000000..0264dcb
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
@@ -0,0 +1,203 @@
+package org.apache.rocketmq.connect.kafka.connect.adaptor;
+
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.RocketMQSourceSchemaConverter;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.schema.RocketMQSourceValueConverter;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * source record converter test
+ */
+public class SourceRecordConverterTest {
+    private SourceRecord originalRecord;
+
+    @Before
+    public void before() {
+        // init record
+        Schema schema = SchemaBuilder.struct()
+                .field("field-string-01", Schema.INT64_SCHEMA)
+                .field("field-boolean-02", Schema.BOOLEAN_SCHEMA)
+                .field(
+                        "field-struct-03",
+                        SchemaBuilder.struct()
+                                .field("field-struct-int64-01", Schema.INT64_SCHEMA)
+                                .field("field-struct-boolean-02", Schema.BOOLEAN_SCHEMA)
+                )
+                .field("field-array-04-01", SchemaBuilder.array(Schema.STRING_SCHEMA))
+                .field("field-array-04-02",
+                        SchemaBuilder.array(
+                                SchemaBuilder.struct()
+                                        .field("field-struct-string-01", Schema.STRING_SCHEMA)
+                                        .field("field-struct-boolean-02", Schema.BOOLEAN_SCHEMA)
+                        )
+                )
+                .field("field-map-05",
+                        SchemaBuilder.map(
+                                Schema.STRING_SCHEMA,
+                                SchemaBuilder.struct()
+                                        .field("field-struct-string-01", Schema.STRING_SCHEMA)
+                                        .field("field-struct-boolean-02", Schema.BOOLEAN_SCHEMA)
+                        )
+                ).build();
+
+        Struct struct = new Struct(schema);
+        struct.put("field-string-01", 111111L);
+        struct.put("field-boolean-02", true);
+
+        Struct struct_01 = new Struct(schema.field("field-struct-03").schema());
+        struct_01.put("field-struct-int64-01", 64l);
+        struct_01.put("field-struct-boolean-02", true);
+        // add struct
+        struct.put("field-struct-03", struct_01);
+
+        // add primate array
+        List<String> strings = new ArrayList<>();
+        strings.add("xxxx");
+        struct.put("field-array-04-01", strings);
+
+        // add struct array
+        List<Struct> structs = new ArrayList<>();
+        structs.add(new Struct(schema.field("field-array-04-02").schema().valueSchema())
+                .put("field-struct-string-01", "scsdcsd")
+                .put("field-struct-boolean-02", true));
+
+        struct.put("field-array-04-02", structs);
+
+        // add map
+
+        Map<String, Struct> maps = new HashMap<>();
+        Struct mapStruct = new Struct(schema.field("field-map-05").schema().valueSchema());
+        mapStruct.put("field-struct-string-01", "scdsc");
+        mapStruct.put("field-struct-boolean-02", true);
+        maps.put("XXXX-test", mapStruct);
+        struct.put("field-map-05", maps);
+
+        originalRecord = new SourceRecord(new HashMap<>(), new HashMap<>(), "test-Topic", 1, schema, struct);
+    }
+
+    public Struct initStruct(Schema schema) {
+        Struct struct = new Struct(schema);
+        schema.fields().forEach(field -> {
+            Schema.Type type = field.schema().type();
+            switch (type) {
+                case STRUCT:
+                    struct.put(field.name(), initStruct(field.schema()));
+                    break;
+                case INT8:
+                case INT16:
+                case INT32:
+                    struct.put(field.name(), 100);
+                    break;
+                case INT64:
+                    struct.put(field.name(), 100000L);
+                    break;
+                case FLOAT32:
+                    struct.put(field.name(), 100.00);
+                    break;
+                case FLOAT64:
+                    struct.put(field.name(), 100000.00);
+                    break;
+                case BOOLEAN:
+                    struct.put(field.name(), true);
+                    break;
+                case STRING:
+                    struct.put(field.name(), "test");
+                    break;
+                case ARRAY:
+                    struct.put(field.name(), initArray(schema.valueSchema()));
+                    break;
+                case MAP:
+                    struct.put(field.name(), initMap(schema.keySchema(), schema.valueSchema()));
+            }
+        });
+        return struct;
+    }
+
+    public Object initArray(Schema schema) {
+        Schema.Type type = schema.type();
+        switch (type) {
+            case STRUCT:
+                Struct[] structs = new Struct[1];
+                structs[0] = initStruct(schema);
+                return structs;
+            case INT8:
+            case INT16:
+            case INT32:
+                Integer[] integers = new Integer[1];
+                integers[0] = 100;
+                return integers;
+            case INT64:
+                Long[] longs = new Long[1];
+                longs[0] = 1000000L;
+                return longs;
+            case FLOAT32:
+                Float[] floats = new Float[1];
+                floats[0] = 1000.00f;
+                return floats;
+            case FLOAT64:
+                Double[] doubles = new Double[1];
+                doubles[0] = 1000.00;
+                return doubles;
+            case BOOLEAN:
+                Boolean[] booleans = new Boolean[1];
+                booleans[0] = true;
+                return booleans;
+            case STRING:
+                String[] strings = new String[1];
+                strings[0] = "********XXXX";
+                return strings;
+            case ARRAY:
+                Object[] obj = new Object[1];
+                obj[0] = initArray(schema.valueSchema());
+                return obj;
+            case MAP:
+                break;
+        }
+        return null;
+    }
+
+    private Object initMap(Schema keySchema, Schema valueSchema) {
+        return null;
+    }
+
+    @Test
+    public void testConverter() {
+        // sourceRecord convert connect Record
+        RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(originalRecord.valueSchema());
+
+        io.openmessaging.connector.api.data.Schema schema =rocketMQSourceSchemaConverter.schema();
+        RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
+        ConnectRecord connectRecord = new ConnectRecord(
+                new RecordPartition(originalRecord.sourcePartition()),
+                new RecordOffset(originalRecord.sourceOffset()),
+                originalRecord.timestamp(),
+                rocketMQSourceSchemaConverter.schema(),
+                rocketMQSourceValueConverter.value(schema, originalRecord.value()));
+        Iterator<Header> headers = originalRecord.headers().iterator();
+        while (headers.hasNext()) {
+            Header header = headers.next();
+            if (header.schema().type().isPrimitive()) {
+                connectRecord.addExtension(header.key(), (String) header.value());
+            }
+        }
+        final byte[] messageBody = JSON.toJSONString(connectRecord).getBytes();
+        String bodyStr = new String(messageBody, StandardCharsets.UTF_8);
+        ConnectRecord newConnectRecord= JSON.parseObject(bodyStr, ConnectRecord.class);
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
new file mode 100644
index 0000000..8f2f286
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -0,0 +1,338 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+	license agreements. See the NOTICE file distributed with this work for additional 
+	information regarding copyright ownership. The ASF licenses this file to 
+	You under the Apache License, Version 2.0 (the "License"); you may not use 
+	this file except in compliance with the License. You may obtain a copy of 
+	the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+	by applicable law or agreed to in writing, software distributed under the 
+	License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+	OF ANY KIND, either express or implied. See the License for the specific 
+	language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-connect-debezium</artifactId>
+    <packaging>pom</packaging>
+    <version>0.0.1-SNAPSHOT</version>
+    <modules>
+        <module>kafka-connect-adaptor</module>
+        <module>rocketmq-connect-debezium-core</module>
+        <module>rocketmq-connect-debezium-mysql</module>
+        <module>rocketmq-connect-debezium-oracle</module>
+        <module>rocketmq-connect-debezium-postgresql</module>
+        <module>rocketmq-connect-debezium-mongodb</module>
+        <module>rocketmq-connect-debezium-sqlserver</module>
+    </modules>
+
+    <name>rocketmq-connect-debezium</name>
+    <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-jdbc</url>
+
+    <licenses>
+        <license>
+            <name>The Apache Software License, Version 2.0</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+        </license>
+    </licenses>
+
+    <issueManagement>
+        <system>jira</system>
+        <url>https://issues.apache.org/jira/browse/RocketMQ</url>
+    </issueManagement>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../../style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <!-- Compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+
+        <kafka-client.version>2.7.2</kafka-client.version>
+        <scala.binary.version>2.13</scala.binary.version>
+        <!--debezium version-->
+        <debezium.version>1.7.2.Final</debezium.version>
+        <debezium.postgresql.version>42.3.3</debezium.postgresql.version>
+        <!--rocketmq version-->
+        <rocketmq.version>4.5.2</rocketmq.version>
+        <rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
+
+        <!--rocketmq connect version-->
+        <openmessaging-connector.version>0.1.3-SNAPSHOT</openmessaging-connector.version>
+        <openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
+    </properties>
+
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>kafka-connect-adaptor</artifactId>
+                <version>0.0.1-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-connect-debezium-core</artifactId>
+                <version>0.0.1-SNAPSHOT</version>
+            </dependency>
+
+            <!-- rocketmq connect api -->
+            <dependency>
+                <groupId>io.openmessaging</groupId>
+                <artifactId>openmessaging-connector</artifactId>
+                <version>${openmessaging-connector.version}</version>
+                <scope>provided</scope>
+            </dependency>
+            <dependency>
+                <groupId>io.openmessaging</groupId>
+                <artifactId>openmessaging-api</artifactId>
+                <version>${openmessaging-api.version}</version>
+            </dependency>
+
+            <!-- rocketmq client-->
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-client</artifactId>
+                <version>${rocketmq.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-tools</artifactId>
+                <version>${rocketmq.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-remoting</artifactId>
+                <version>${rocketmq.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-openmessaging</artifactId>
+                <version>${rocketmq-openmessaging.version}</version>
+            </dependency>
+
+            <!--kafka && kafka connect-->
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka_${scala.binary.version}</artifactId>
+                <version>${kafka-client.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-runtime</artifactId>
+                <version>${kafka-client.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.kafka</groupId>
+                        <artifactId>kafka-log4j-appender</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-json</artifactId>
+                <version>${kafka-client.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-api</artifactId>
+                <version>${kafka-client.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>connect-file</artifactId>
+                <version>${kafka-client.version}</version>
+                <scope>test</scope>
+            </dependency>
+
+
+            <!-- testing -->
+            <dependency>
+                <groupId>junit</groupId>
+                <artifactId>junit</artifactId>
+                <version>4.13.1</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.assertj</groupId>
+                <artifactId>assertj-core</artifactId>
+                <version>2.6.0</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-core</artifactId>
+                <version>2.6.3</version>
+                <scope>test</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>com.alibaba</groupId>
+                <artifactId>fastjson</artifactId>
+                <version>1.2.80</version>
+            </dependency>
+
+
+            <dependency>
+                <groupId>commons-cli</groupId>
+                <artifactId>commons-cli</artifactId>
+                <version>1.2</version>
+            </dependency>
+
+            <dependency>
+                <groupId>commons-codec</groupId>
+                <artifactId>commons-codec</artifactId>
+                <version>1.12</version>
+            </dependency>
+
+
+            <!-- log package-->
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+                <version>1.7.32</version>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
+                <version>1.2.9</version>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-core</artifactId>
+                <version>1.2.9</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+</project>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
new file mode 100644
index 0000000..4862b5a
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect-debezium</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-connect-debezium-core</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <scala.binary.version>2.13</scala.binary.version>
+        <scala-library.version>2.13.6</scala-library.version>
+        <debezium.version>1.7.2.Final</debezium.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-openmessaging</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_${scala.binary.version}</artifactId>
+            <version>${kafka-client.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-runtime</artifactId>
+            <version>${kafka-client.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-log4j-appender</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
new file mode 100644
index 0000000..439bcf5
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import com.beust.jcommander.internal.Sets;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+
+/**
+ * rocket connect util
+ */
+public class ConnectUtil {
+
+    private static final String SYS_TASK_CG_PREFIX = "connect-";
+
+    public static String createGroupName(String prefix) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(prefix).append("-");
+        sb.append(RemotingUtil.getLocalAddress()).append("-");
+        sb.append(UtilAll.getPid()).append("-");
+        sb.append(System.nanoTime());
+        return sb.toString().replace(".", "-");
+    }
+
+    public static String createGroupName(String prefix, String postfix) {
+        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+    }
+
+    public static String createInstance(String servers) {
+        String[] serversArray = servers.split(";");
+        List<String> serversList = new ArrayList<String>();
+        for (String server : serversArray) {
+            if (!serversList.contains(server)) {
+                serversList.add(server);
+            }
+        }
+        Collections.sort(serversList);
+        return String.valueOf(serversList.toString().hashCode());
+    }
+
+    public static String createUniqInstance(String prefix) {
+        return new StringBuffer(prefix).append("-").append(UUID.randomUUID().toString()).toString();
+    }
+
+
+    public static DefaultMQProducer initDefaultMQProducer(RocketMqConnectConfig connectConfig) {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
+        producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        producer.setProducerGroup(connectConfig.getRmqProducerGroup());
+        producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
+        producer.setLanguage(LanguageCode.JAVA);
+        return producer;
+    }
+
+    public static DefaultMQPullConsumer initDefaultMQPullConsumer(RocketMqConnectConfig connectConfig) {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
+        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        consumer.setConsumerGroup(connectConfig.getRmqConsumerGroup());
+        consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
+        consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout());
+        consumer.setLanguage(LanguageCode.JAVA);
+        return consumer;
+    }
+
+    public static DefaultMQPushConsumer initDefaultMQPushConsumer(RocketMqConnectConfig connectConfig) {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
+        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
+        consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup()));
+        consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
+        consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
+        consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+        consumer.setLanguage(LanguageCode.JAVA);
+        return consumer;
+    }
+
+    public static DefaultMQAdminExt startMQAdminTool(RocketMqConnectConfig connectConfig) throws MQClientException {
+        RPCHook rpcHook = null;
+        if (connectConfig.isAclEnable()) {
+            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+        }
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        defaultMQAdminExt.setAdminExtGroup("connector-admin-group");
+        defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+        defaultMQAdminExt.start();
+        return defaultMQAdminExt;
+    }
+
+
+    public static void createTopic(RocketMqConnectConfig connectConfig, TopicConfig topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            Set<String> clusterNameSet = clusterAddrTable.keySet();
+            for (String clusterName : clusterNameSet) {
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("create topic: " + topicConfig.getTopicName() + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+    }
+
+    public static boolean isTopicExist(RocketMqConnectConfig connectConfig, String topic) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        boolean foundTopicRouteInfo = false;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+            if (topicRouteData != null) {
+                foundTopicRouteInfo = true;
+            }
+        } catch (Exception e) {
+            foundTopicRouteInfo = false;
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return foundTopicRouteInfo;
+    }
+
+    public static Set<String> fetchAllConsumerGroupList(RocketMqConnectConfig connectConfig) {
+        Set<String> consumerGroupSet = Sets.newHashSet();
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+                SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+                consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            throw new RuntimeException("fetch all topic  failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return consumerGroupSet;
+    }
+
+    public static String createSubGroup(RocketMqConnectConfig connectConfig, String subGroup) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
+            initConfig.setGroupName(subGroup);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            Set<String> clusterNameSet = clusterAddrTable.keySet();
+            for (String clusterName : clusterNameSet) {
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("create subGroup: " + subGroup + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return subGroup;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
new file mode 100644
index 0000000..7a3f51a
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.connector.KafkaSourceAdaptorConnector;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * debezium connector
+ */
+public abstract class DebeziumConnector extends KafkaSourceAdaptorConnector {
+    
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumSource.java
new file mode 100644
index 0000000..388f722
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumSource.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
+import io.openmessaging.KeyValue;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.task.KafkaConnectAdaptorSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * debezium source
+ */
+public abstract class DebeziumSource extends KafkaConnectAdaptorSource {
+    private static final Logger log = LoggerFactory.getLogger(DebeziumSource.class);
+
+    private static final String DEFAULT_HISTORY = "org.apache.rocketmq.connect.debezium.RocketMqDatabaseHistory";
+
+
+    @Override
+    public void start(KeyValue config) {
+        // database.history : implementation class for database history.
+        config.put(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);
+        // history config detail
+        super.start(config);
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
new file mode 100644
index 0000000..18397a6
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+
+import io.debezium.config.Configuration;
+
+/**
+ * rocketmq connect config
+ */
+public class RocketMqConnectConfig {
+
+    private String dbHistoryName;
+
+    private String namesrvAddr;
+
+    private String rmqProducerGroup;
+
+    private int operationTimeout = 3000;
+
+    private String rmqConsumerGroup;
+
+    private int rmqMaxRedeliveryTimes;
+
+    private int rmqMessageConsumeTimeout = 3000;
+
+    private int rmqMaxConsumeThreadNums = 32;
+
+    private int rmqMinConsumeThreadNums = 1;
+
+    private String adminExtGroup;
+
+    // set acl config
+    private boolean aclEnable;
+    private String accessKey;
+    private String secretKey;
+
+
+    public RocketMqConnectConfig() {
+    }
+
+    public RocketMqConnectConfig(Configuration config, String dbHistoryName) {
+        this.dbHistoryName = dbHistoryName;
+        // init config
+        this.rmqProducerGroup = this.dbHistoryName.concat("-producer-group");
+        this.rmqConsumerGroup = this.dbHistoryName.concat("-consumer-group");
+        this.adminExtGroup = this.dbHistoryName.concat("-admin-group");
+
+        // init rocketmq connection
+        this.namesrvAddr = config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR);
+        this.aclEnable = config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE);
+        this.accessKey = config.getString(RocketMqDatabaseHistory.ROCKETMQ_ACCESS_KEY);
+        this.secretKey = config.getString(RocketMqDatabaseHistory.ROCKETMQ_SECRET_KEY);
+    }
+
+
+    public String getDbHistoryName() {
+        return dbHistoryName;
+    }
+
+    public void setDbHistoryName(String dbHistoryName) {
+        this.dbHistoryName = dbHistoryName;
+    }
+
+    public String getNamesrvAddr() {
+        return namesrvAddr;
+    }
+
+    public void setNamesrvAddr(String namesrvAddr) {
+        this.namesrvAddr = namesrvAddr;
+    }
+
+    public String getRmqProducerGroup() {
+        return rmqProducerGroup;
+    }
+
+    public void setRmqProducerGroup(String rmqProducerGroup) {
+        this.rmqProducerGroup = rmqProducerGroup;
+    }
+
+    public int getOperationTimeout() {
+        return operationTimeout;
+    }
+
+    public void setOperationTimeout(int operationTimeout) {
+        this.operationTimeout = operationTimeout;
+    }
+
+    public String getRmqConsumerGroup() {
+        return rmqConsumerGroup;
+    }
+
+    public void setRmqConsumerGroup(String rmqConsumerGroup) {
+        this.rmqConsumerGroup = rmqConsumerGroup;
+    }
+
+    public int getRmqMaxRedeliveryTimes() {
+        return rmqMaxRedeliveryTimes;
+    }
+
+    public void setRmqMaxRedeliveryTimes(int rmqMaxRedeliveryTimes) {
+        this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
+    }
+
+    public int getRmqMessageConsumeTimeout() {
+        return rmqMessageConsumeTimeout;
+    }
+
+    public void setRmqMessageConsumeTimeout(int rmqMessageConsumeTimeout) {
+        this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
+    }
+
+    public int getRmqMaxConsumeThreadNums() {
+        return rmqMaxConsumeThreadNums;
+    }
+
+    public void setRmqMaxConsumeThreadNums(int rmqMaxConsumeThreadNums) {
+        this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
+    }
+
+    public int getRmqMinConsumeThreadNums() {
+        return rmqMinConsumeThreadNums;
+    }
+
+    public void setRmqMinConsumeThreadNums(int rmqMinConsumeThreadNums) {
+        this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
+    }
+
+    public boolean isAclEnable() {
+        return aclEnable;
+    }
+
+    public void setAclEnable(boolean aclEnable) {
+        this.aclEnable = aclEnable;
+    }
+
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    public String getAdminExtGroup() {
+        return adminExtGroup;
+    }
+
+    public void setAdminExtGroup(String adminExtGroup) {
+        this.adminExtGroup = adminExtGroup;
+    }
+
+    @Override
+    public String toString() {
+        return "RocketMqConnectConfig{" +
+                "dbHistoryName='" + dbHistoryName + '\'' +
+                ", namesrvAddr='" + namesrvAddr + '\'' +
+                ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
+                ", operationTimeout=" + operationTimeout +
+                ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
+                ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
+                ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
+                ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
+                ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
+                ", adminExtGroup='" + adminExtGroup + '\'' +
+                ", aclEnable=" + aclEnable +
+                ", accessKey='" + accessKey + '\'' +
+                ", secretKey='" + secretKey + '\'' +
+                '}';
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
new file mode 100644
index 0000000..11814b9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium;
+
+import com.alibaba.fastjson.JSON;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.relational.history.AbstractDatabaseHistory;
+import io.debezium.relational.history.DatabaseHistory;
+import io.debezium.relational.history.DatabaseHistoryException;
+import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.HistoryRecordComparator;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.connect.kafka.connect.adaptor.task.AbstractKafkaConnectSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Consumer;
+
+
+/**
+ * rocketmq database history
+ */
+public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
+    public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.topic")
+            .withDisplayName("Database history topic name")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("The name of the topic for the database schema history")
+            .withValidation(Field::isRequired);
+
+    /**
+     * rocketmq name srv addr
+     */
+    public static final Field NAME_SRV_ADDR = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "name.srv.addr")
+            .withDisplayName("Rocketmq name srv addr")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq name srv addr")
+            .withValidation(Field::isRequired);
+
+    public static final Field ROCKETMQ_ACL_ENABLE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "acl.enabled")
+            .withDisplayName("Rocketmq acl enabled")
+            .withType(ConfigDef.Type.BOOLEAN)
+            .withDefault(false)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq acl enabled");
+
+    public static final Field ROCKETMQ_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "access.key")
+            .withDisplayName("Rocketmq access key")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq access key");
+
+    public static final Field ROCKETMQ_SECRET_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "secret.key")
+            .withDisplayName("Rocketmq secret key")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("Rocketmq secret key");
+
+    public static final Field CONNECTOR_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.name")
+            .withDisplayName("Connector name")
+            .withType(ConfigDef.Type.STRING)
+            .withWidth(ConfigDef.Width.LONG)
+            .withImportance(ConfigDef.Importance.HIGH)
+            .withDescription("connector name")
+            .withValidation(Field::isRequired);
+
+
+    private final DocumentReader reader = DocumentReader.defaultReader();
+    private String topicName;
+    private String dbHistoryName;
+    private RocketMqConnectConfig connectConfig;
+    private DefaultMQPushConsumer consumer;
+    private DefaultMQProducer producer;
+
+    @Override
+    public void configure(
+            Configuration config,
+            HistoryRecordComparator comparator,
+            DatabaseHistoryListener listener,
+            boolean useCatalogBeforeSchema) {
+        super.configure(config, comparator, listener, useCatalogBeforeSchema);
+        this.topicName = config.getString(TOPIC);
+
+        this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
+        log.info("Configure to store the debezium database history {} to rocketmq topic {}",
+                dbHistoryName, topicName);
+        // init config
+        connectConfig = new RocketMqConnectConfig(config, dbHistoryName);
+    }
+
+    @Override
+    public void initializeStorage() {
+        super.initializeStorage();
+        log.info("try to create history topic: {}!", this.topicName);
+        TopicConfig topicConfig = new TopicConfig(this.topicName);
+        ConnectUtil.createTopic(connectConfig, topicConfig);
+    }
+
+    @Override
+    public void start() {
+        super.start();
+        try {
+            // init consumer
+            this.consumer = ConnectUtil.initDefaultMQPushConsumer(connectConfig);
+            Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+            if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
+                ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+            }
+            this.consumer.subscribe(this.topicName, "*");
+        } catch (MQClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+
+        this.producer = ConnectUtil.initDefaultMQProducer(connectConfig);
+        try {
+            this.producer.start();
+        } catch (MQClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+    }
+
+
+    @Override
+    public void stop() {
+        try {
+            if (this.producer != null) {
+                producer.shutdown();
+                this.producer = null;
+            }
+            if (this.consumer != null) {
+                this.consumer.shutdown();
+                this.consumer = null;
+            }
+        } catch (Exception pe) {
+            log.warn("Failed to closing rocketmq client", pe);
+        }
+    }
+
+    /**
+     * recover record
+     *
+     * @param records
+     */
+    @Override
+    protected void recoverRecords(Consumer<HistoryRecord> records) {
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                msgs.forEach(message -> {
+                    try {
+                        HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
+                        log.trace("Recovering database history: {}", recordObj);
+                        if (recordObj == null || !recordObj.isValid()) {
+                            log.warn("Skipping invalid database history record '{}'. " +
+                                            "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
+                                    recordObj, topicName);
+                        } else {
+                            records.accept(recordObj);
+                            log.trace("Recovered database history: {}", recordObj);
+                        }
+                    } catch (IOException e) {
+                        throw new DatabaseHistoryException(e);
+                    }
+                });
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            throw new DatabaseHistoryException(e);
+        }
+        log.info("Consumer started.");
+    }
+
+    @Override
+    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+        log.info(record.toString());
+        if (this.producer == null) {
+            throw new IllegalStateException("No producer is available. Ensure that 'start()'"
+                    + " is called before storing database history records.");
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Storing record into database history: {}", record);
+        }
+        try {
+            Message sourceMessage = new Message();
+            sourceMessage.setTopic(this.topicName);
+            final byte[] messageBody = JSON.toJSONString(record).getBytes();
+            sourceMessage.setBody(messageBody);
+            producer.send(sourceMessage);
+        } catch (Exception e) {
+            throw new DatabaseHistoryException(e);
+        }
+    }
+
+
+    @Override
+    public boolean exists() {
+        return this.storageExists();
+    }
+
+    @Override
+    public boolean storageExists() {
+        // check topic is exist
+        return ConnectUtil.isTopicExist(connectConfig, this.topicName);
+    }
+
+    @Override
+    public String toString() {
+        if (topicName != null) {
+            return "Rocketmq topic (" + topicName + ")";
+        }
+        return "Rocketmq topic";
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
new file mode 100644
index 0000000..9d5b796
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
@@ -0,0 +1,29 @@
+package org.apache.rocketmq.connect.debezium;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Map;
+
+public class SchemaRenameTransformation implements Transformation {
+    @Override
+    public ConnectRecord apply(ConnectRecord connectRecord) {
+        return null;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> map) {
+
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/pom.xml
new file mode 100644
index 0000000..2130556
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect-debezium</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-connect-debezium-mongodb</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-connect-debezium-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mongodb</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBConnector.java
new file mode 100644
index 0000000..c45c7b0
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBConnector.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mongodb;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+
+
+/**
+ * debezium mongodb connector
+ */
+public class DebeziumMongoDBConnector extends DebeziumConnector {
+
+    private static final String DEFAULT_CONNECTOR = "io.debezium.connector.mongodb.MongoDbConnector";
+
+    /**
+     * Return the current connector class
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumMongoDBSource.class;
+    }
+
+    /**
+     * get connector class
+     */
+    @Override
+    public String getConnectorClass() {
+        return DEFAULT_CONNECTOR;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBSource.java
new file mode 100644
index 0000000..7c8cee5
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/java/org/apache/rocketmq/connect/debezium/mongodb/DebeziumMongoDBSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mongodb;
+
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+
+/**
+ * A rocketmq connect source that runs debezium mongodb source
+ */
+public class DebeziumMongoDBSource extends DebeziumSource {
+    private static final String DEFAULT_TASK = "io.debezium.connector.mongodb.MongoDbConnectorTask";
+
+    /**
+     * get task class
+     */
+    @Override
+    public String getTaskClass() {
+        return DEFAULT_TASK;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/resources/debezium-mongodb-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/resources/debezium-mongodb-source-config.yaml
new file mode 100644
index 0000000..fe95886
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mongodb/src/main/resources/debezium-mongodb-source-config.yaml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
new file mode 100644
index 0000000..0c027d0
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
@@ -0,0 +1,197 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect-debezium</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-connect-debezium-mysql</artifactId>
+
+
+    <dependencies>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-connect-debezium-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
new file mode 100644
index 0000000..074146c
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mysql;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+
+/**
+ * debezium mysql connector
+ */
+public class DebeziumMysqlConnector extends DebeziumConnector {
+    private static final String DEFAULT_CONNECTOR = "io.debezium.connector.mysql.MySqlConnector";
+
+    /**
+     * Return the current connector class
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumMysqlSource.class;
+    }
+
+    /**
+     * get connector class
+     */
+    @Override
+    public String getConnectorClass() {
+        return DEFAULT_CONNECTOR;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlSource.java
new file mode 100644
index 0000000..f177eba
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.mysql;
+
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+
+/**
+ * A rocketmq connect source that runs debezium mysql source.
+ */
+public class DebeziumMysqlSource extends DebeziumSource {
+    private static final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask";
+
+    /**
+     * get task class
+     */
+    @Override
+    public String getTaskClass() {
+        return DEFAULT_TASK;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
new file mode 100644
index 0000000..a371d4d
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+  "connector-class":"org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
+  "max-task":"1",
+  "connect-topicname":"debezium-mysql-source",
+  "debezium.transforms": "Unwrap",
+  "debezium.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
+  "debezium.transforms.Unwrap.delete.handling.mode": "drop",
+
+  "database.history.skip.unparseable.ddl": true,
+  "database.history.name.srv.addr": "localhost:9876",
+  "database.history.rocketmq.topic": "db-history-topic-02",
+  "database.history.store.only.monitored.tables.ddl": true,
+
+  "database.user": "*******",
+  "include.schema.changes": false,
+  "database.server.name": "test-server-02",
+  "database.port": 3306,
+  "database.hostname": "**********",
+  "database.connectionTimeZone": "Asia/Shanghai",
+  "database.password": "********",
+  "table.include.list": "db-01.table",
+  "max.batch.size": 50,
+  "database.include.list": "db-01",
+  "snapshot.mode": "when_needed",
+
+  "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
new file mode 100644
index 0000000..cd18267
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect-debezium</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-connect-debezium-oracle</artifactId>
+
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-connect-debezium-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-oracle</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
new file mode 100644
index 0000000..ad051d4
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.oracle;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+/**
+ * debezium oracle connector
+ */
+public class DebeziumOracleConnector extends DebeziumConnector {
+    private static final String DEFAULT_CONNECTOR = "io.debezium.connector.oracle.OracleConnector";
+
+    /**
+     * Return the current connector class
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumOracleSource.class;
+    }
+
+    /**
+     * get connector class
+     */
+    @Override
+    public String getConnectorClass() {
+        return DEFAULT_CONNECTOR;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleSource.java
new file mode 100644
index 0000000..6ddd6d0
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleSource.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.oracle;
+
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+/**
+ * A rocketmq connect source that runs debezium oracle source.
+ */
+public class DebeziumOracleSource extends DebeziumSource {
+    private static final String DEFAULT_TASK = "io.debezium.connector.oracle.OracleConnectorTask";
+
+
+    /**
+     * get task class
+     */
+    @Override
+    public String getTaskClass() {
+        return DEFAULT_TASK;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/resources/debezium-oracle-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/resources/debezium-oracle-source-config.yaml
new file mode 100644
index 0000000..d8a500d
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/resources/debezium-oracle-source-config.yaml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/pom.xml
new file mode 100644
index 0000000..849a0b7
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/pom.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect-debezium</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-connect-debezium-postgresql</artifactId>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-connect-debezium-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-postgres</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>${debezium.postgresql.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresConnector.java
new file mode 100644
index 0000000..5149294
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.postgres;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+/**
+ * debezium postgresql connector
+ */
+public class DebeziumPostgresConnector extends DebeziumConnector {
+
+    private static final String DEFAULT_CONNECTOR = "io.debezium.connector.postgresql.PostgresConnector";
+
+    /**
+     * Return the current connector class
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumPostgresSource.class;
+    }
+
+    /**
+     * get connector class
+     */
+    @Override
+    public String getConnectorClass() {
+        return DEFAULT_CONNECTOR;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresSource.java
new file mode 100644
index 0000000..274a0b4
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/java/org/apache/rocketmq/connect/debezium/postgres/DebeziumPostgresSource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.postgres;
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+/**
+ * A rocketmq connect source that runs debezium postgresql source.
+ */
+public class DebeziumPostgresSource extends DebeziumSource {
+    private static final String DEFAULT_TASK = "io.debezium.connector.postgresql.PostgresConnectorTask";
+
+
+    /**
+     * get task class
+     */
+    @Override
+    public String getTaskClass() {
+        return DEFAULT_TASK;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/resources/debezium-postgres-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/resources/debezium-postgres-source-config.yaml
new file mode 100644
index 0000000..fe95886
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-postgresql/src/main/resources/debezium-postgres-source-config.yaml
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/pom.xml
new file mode 100644
index 0000000..d160732
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-connect-debezium</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-connect-debezium-sqlserver</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>kafka-connect-adaptor</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-connect-debezium-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-connector</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.openmessaging</groupId>
+                    <artifactId>openmessaging-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>io.openmessaging</groupId>
+            <artifactId>openmessaging-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-sqlserver</artifactId>
+            <version>${debezium.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>versions-maven-plugin</artifactId>
+                <version>2.3</version>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>clirr-maven-plugin</artifactId>
+                <version>2.7</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <configuration>
+                    <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                    <excludeTransitive>false</excludeTransitive>
+                    <stripVersion>true</stripVersion>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.6.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.19.1</version>
+                <configuration>
+                    <argLine>-Xms512m -Xmx1024m</argLine>
+                    <forkMode>always</forkMode>
+                    <includes>
+                        <include>**/*Test.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-site-plugin</artifactId>
+                <version>3.6</version>
+                <configuration>
+                    <locales>en_US</locales>
+                    <outputEncoding>UTF-8</outputEncoding>
+                    <inputEncoding>UTF-8</inputEncoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-source-plugin</artifactId>
+                <version>3.0.1</version>
+                <executions>
+                    <execution>
+                        <id>attach-sources</id>
+                        <goals>
+                            <goal>jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <version>2.10.4</version>
+                <configuration>
+                    <charset>UTF-8</charset>
+                    <locale>en_US</locale>
+                    <excludePackageNames>io.openmessaging.internal</excludePackageNames>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>aggregate</id>
+                        <goals>
+                            <goal>aggregate</goal>
+                        </goals>
+                        <phase>site</phase>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-resources-plugin</artifactId>
+                <version>3.0.2</version>
+                <configuration>
+                    <encoding>${project.build.sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>3.0.4</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.0.0</version>
+                <configuration>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>../../../style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerConnector.java
new file mode 100644
index 0000000..db426e9
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerConnector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.sqlserver;
+
+import io.openmessaging.connector.api.component.task.Task;
+import org.apache.rocketmq.connect.debezium.DebeziumConnector;
+
+
+/**
+ * debezium sqlserver connector
+ */
+public class DebeziumSqlServerConnector extends DebeziumConnector {
+
+    private static final String DEFAULT_CONNECTOR = "io.debezium.connector.sqlserver.SqlServerConnector";
+
+    /**
+     * Return the current connector class
+     * @return task implement class
+     */
+    @Override
+    public Class<? extends Task> taskClass() {
+        return DebeziumSqlServerSource.class;
+    }
+
+    /**
+     * get connector class
+     */
+    @Override
+    public String getConnectorClass() {
+        return DEFAULT_CONNECTOR;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerSource.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerSource.java
new file mode 100644
index 0000000..b9f68d1
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/java/org/apache/rocketmq/connect/debezium/sqlserver/DebeziumSqlServerSource.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.debezium.sqlserver;
+
+import org.apache.rocketmq.connect.debezium.DebeziumSource;
+
+/**
+ * A rocketmq connect source that runs debezium sqlserver source
+ */
+public class DebeziumSqlServerSource extends DebeziumSource {
+    private static final String DEFAULT_TASK = "io.debezium.connector.sqlserver.SqlServerConnectorTask";
+
+    /**
+     * get task class
+     */
+    @Override
+    public String getTaskClass() {
+        return DEFAULT_TASK;
+    }
+}
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/resources/debezium-sqlserver-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/resources/debezium-sqlserver-source-config.yaml
new file mode 100644
index 0000000..b8981cf
--- /dev/null
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-sqlserver/src/main/resources/debezium-sqlserver-source-config.yaml
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# speercific language governing permissions and limitations
+# under the License.
+#
+
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DebeziumTimeTypes.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DebeziumTimeTypes.java
new file mode 100644
index 0000000..81268cd
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/DebeziumTimeTypes.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.common;
+
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+
+
+/**
+ * debezium time type
+ */
+public class DebeziumTimeTypes {
+    public static final String DATE = "io.debezium.time.Date";
+    public static final String INTERVAL = "io.debezium.time.Interval";
+    public static final String MICRO_DURATION = "io.debezium.time.MicroDuration";
+    public static final String MICRO_TIME = "io.debezium.time.MicroTime";
+    public static final String MICRO_TIMESTAMP = "io.debezium.time.MicroTimestamp";
+    public static final String NANO_DURATION = "io.debezium.time.NanoDuration";
+    public static final String NANO_TIME = "io.debezium.time.NanoTime";
+    public static final String NANO_TIMESTAMP = "io.debezium.time.NanoTimestamp";
+    public static final String TIME = "io.debezium.time.Time";
+    public static final String TIMESTAMP = "io.debezium.time.Timestamp";
+    public static final String YEAR = "io.debezium.time.Year";
+    public static final String ZONED_TIME = "io.debezium.time.ZonedTime";
+    public static final String ZONED_TIMESTAMP = "io.debezium.time.ZonedTimestamp";
+
+    public static Object toMillsTimestamp(String schemaName, Object value){
+        if(schemaName == null){
+            return value;
+        }
+        switch (schemaName){
+            case DATE:
+                return LocalDate.ofEpochDay((long)value)
+                        .atStartOfDay(ZoneOffset.ofHours(8))
+                        .toInstant()
+                        .toEpochMilli();
+            case TIMESTAMP:
+                return value;
+            default:
+                return value;
+        }
+    }
+
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/HeaderField.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/HeaderField.java
new file mode 100644
index 0000000..8b150da
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/common/HeaderField.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.jdbc.common;
+
+/**
+ * header field
+ */
+public interface HeaderField {
+
+    String __source_table_key="__source_table";
+    String __source_db_key="__source_db";
+    String __sink_table_key="__sink_table";
+    String __sink_db_key="__sink_db";
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
index 089d10d..81cec14 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
@@ -22,6 +22,7 @@ import io.openmessaging.connector.api.data.SchemaBuilder;
 import io.openmessaging.connector.api.data.logical.Date;
 import io.openmessaging.connector.api.data.logical.Decimal;
 import io.openmessaging.connector.api.data.logical.Time;
+import org.apache.rocketmq.connect.jdbc.common.DebeziumTimeTypes;
 import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSourceConfig;
@@ -1611,6 +1612,19 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                             DateTimeUtils.getTimeZoneCalendar(timeZone)
                     );
                     return true;
+
+                case DebeziumTimeTypes.DATE:
+                    statement.setDate(index,
+                            new java.sql.Date((long)DebeziumTimeTypes.toMillsTimestamp(DebeziumTimeTypes.DATE, value)),
+                            DateTimeUtils.getTimeZoneCalendar(timeZone)
+                    );
+                    return true;
+                case DebeziumTimeTypes.TIMESTAMP:
+                    statement.setTimestamp(index,
+                            new java.sql.Timestamp((long)DebeziumTimeTypes.toMillsTimestamp(DebeziumTimeTypes.TIMESTAMP, value)),
+                            DateTimeUtils.getTimeZoneCalendar(timeZone)
+                    );
+                    return true;
                 default:
                     return false;
             }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
index dcd22f9..d1d031d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/Updater.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.connect.jdbc.sink;
 
 import io.openmessaging.connector.api.data.ConnectRecord;
+import org.apache.rocketmq.connect.jdbc.common.HeaderField;
 import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
 import org.apache.rocketmq.connect.jdbc.dialect.provider.CachedConnectionProvider;
@@ -100,6 +101,9 @@ public class Updater {
 
     TableId destinationTable(ConnectRecord record) {
         // todo table from header
+        if (config.isTableFromHeader()){
+            return dbDialect.parseToTableId(record.getExtensions().getString(HeaderField.__source_table_key));
+        }
         return dbDialect.parseToTableId(record.getSchema().getName());
     }
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 125e269..f649bed 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -598,24 +598,27 @@ public class WorkerSinkTask implements WorkerTask {
 
             String bodyStr = new String(body, StandardCharsets.UTF_8);
             sinkDataEntry = new ConnectRecord(recordPartition, recordOffset, timestamp, schema, bodyStr);
-            KeyValue keyValue = new DefaultKeyValue();
-            if (MapUtils.isNotEmpty(properties)) {
-                for (Map.Entry<String, String> entry : properties.entrySet()) {
-                    if (MQ_SYS_KEYS.contains(entry.getKey())) {
-                        keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
-                    } else if (entry.getKey().startsWith("connect-ext-")) {
-                        keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
-                    } else {
-                        keyValue.put(entry.getKey(), entry.getValue());
-                    }
-                }
-            }
-            sinkDataEntry.addExtension(keyValue);
+
         } else {
             final byte[] messageBody = message.getBody();
             String s = new String(messageBody);
             sinkDataEntry = JSON.parseObject(s, ConnectRecord.class);
         }
+
+        KeyValue keyValue = new DefaultKeyValue();
+        if (MapUtils.isNotEmpty(properties)) {
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                if (MQ_SYS_KEYS.contains(entry.getKey())) {
+                    keyValue.put("MQ-SYS-" + entry.getKey(), entry.getValue());
+                } else if (entry.getKey().startsWith("connect-ext-")) {
+                    keyValue.put(entry.getKey().replaceAll("connect-ext-", ""), entry.getValue());
+                } else {
+                    keyValue.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        sinkDataEntry.addExtension(keyValue);
+
         return sinkDataEntry;
     }
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 2a2cb07..1271df7 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -321,8 +321,8 @@ public class WorkerSourceTask implements WorkerTask {
                 throw new ConnectException("source connect lack of topic config");
             }
             sourceMessage.setTopic(topic);
+            putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
             if (null == recordConverter || recordConverter instanceof RocketMQConverter) {
-                putExtendMsgProperty(sourceDataEntry, sourceMessage, topic);
                 Object payload = sourceDataEntry.getData();
                 if (null != payload) {
                     final byte[] messageBody = (String.valueOf(payload)).getBytes();
@@ -396,6 +396,7 @@ public class WorkerSourceTask implements WorkerTask {
             log.info("extension keySet null.");
             return;
         }
+
         for (String key : keySet) {
             if (WHITE_KEY_SET.contains(key)) {
                 MessageAccessor.putProperty(sourceMessage, key, extensionKeyValues.getString(key));
@@ -403,6 +404,7 @@ public class WorkerSourceTask implements WorkerTask {
                 MessageAccessor.putProperty(sourceMessage, "connect-ext-" + key, extensionKeyValues.getString(key));
             }
         }
+
     }
 
     @Override