You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/15 05:57:25 UTC

[rocketmq-connect] branch master updated: [ISSUE #191]Encountered change event for table databasename.tablename whose schema isn`t known to this connector (#195)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b6da66  [ISSUE #191]Encountered change event for table databasename.tablename whose schema isn`t known to this connector (#195)
0b6da66 is described below

commit 0b6da66e85b002c1c51b5e433ce9c49fe35aa61f
Author: xiaoyi <su...@163.com>
AuthorDate: Fri Jul 15 13:57:20 2022 +0800

    [ISSUE #191]Encountered change event for table databasename.tablename whose schema isn`t known to this connector (#195)
    
    * Fix debezium demecial type conversion problem #190
    
    * Upgrade rocketmq-replicator API to v0.1.3 #189
    
    * Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191
---
 .../kafka-connect-adaptor/pom.xml                  |   4 +-
 .../connector/AbstractKafkaSinkConnector.java      |  27 ++++-
 .../connector/AbstractKafkaSourceConnector.java    |  30 +++++-
 .../adaptor/connector/ConnectorClassSetter.java    |  32 ++++--
 .../adaptor/connector/KafkaConnectorContext.java   |  20 +++-
 .../connector/KafkaSinkAdaptorConnector.java       |  22 +++-
 .../connector/KafkaSourceAdaptorConnector.java     |  20 +++-
 .../context/RocketMQKafkaErrantRecordReporter.java |   3 +-
 .../context/RocketMQKafkaSinkTaskContext.java      |  22 ++--
 .../kafka/connect/adaptor/schema/Converters.java   |  28 ++---
 .../adaptor/schema/KafkaSinkSchemaConverter.java   |  16 +--
 .../adaptor/schema/KafkaSinkValueConverter.java    |   3 +-
 .../schema/RocketMQSourceSchemaConverter.java      |   9 +-
 .../schema/RocketMQSourceValueConverter.java       |   1 -
 .../adaptor/task/AbstractKafkaConnectSink.java     |  12 +--
 .../adaptor/task/AbstractKafkaConnectSource.java   |  14 +--
 .../adaptor/task/KafkaConnectAdaptorSink.java      |   1 +
 .../connect/adaptor/task/TaskClassSetter.java      |  29 +++--
 .../connect/adaptor/SourceRecordConverterTest.java |   4 +-
 connectors/rocketmq-connect-debezium/pom.xml       |   6 +-
 .../rocketmq-connect-debezium-core/pom.xml         |   4 +-
 .../connect/debezium/DebeziumConnector.java        |  10 +-
 .../{ConnectUtil.java => RocketMQConnectUtil.java} | 119 +++++++++------------
 .../connect/debezium/RocketMqConnectConfig.java    |  32 +-----
 .../connect/debezium/RocketMqDatabaseHistory.java  | 105 +++++++-----------
 .../rocketmq-connect-debezium-mysql/pom.xml        |   4 +-
 .../debezium/mysql/DebeziumMysqlConnector.java     |   2 +-
 .../resources/debezium-mysql-source-config.yaml    |   8 +-
 .../rocketmq-connect-debezium-oracle/pom.xml       |   4 +-
 .../debezium/oracle/DebeziumOracleConnector.java   |   2 +-
 .../connect/runtime/connectorwrapper/Worker.java   |   4 +-
 31 files changed, 330 insertions(+), 267 deletions(-)

diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
index 64190bb..b79d77b 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
index 8e5391e..242acdf 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSinkConnector.java
@@ -1,8 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
 
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkConnector;
-import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import io.openmessaging.connector.api.errors.ConnectException;
 import io.openmessaging.internal.DefaultKeyValue;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
@@ -36,15 +52,17 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
 
     /**
      * try override start and stop
+     *
      * @return
      */
-    protected org.apache.kafka.connect.sink.SinkConnector originalSinkConnector(){
+    protected org.apache.kafka.connect.sink.SinkConnector originalSinkConnector() {
         return sinkConnector;
     }
 
     /**
      * Returns a set of configurations for Tasks based on the current configuration,
      * producing at most count configurations.
+     *
      * @param maxTasks maximum number of configurations to generate
      * @return configurations for Tasks
      */
@@ -54,7 +72,7 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
         List<KeyValue> configs = new ArrayList<>();
         for (Map<String, String> configMaps : groupConnectors) {
             KeyValue keyValue = new DefaultKeyValue();
-            configMaps.forEach((k, v)->{
+            configMaps.forEach((k, v) -> {
                 keyValue.put(k, v);
             });
             configs.add(keyValue);
@@ -64,6 +82,7 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
 
     /**
      * Start the component
+     *
      * @param config component context
      */
     @Override
@@ -90,7 +109,7 @@ public abstract class AbstractKafkaSinkConnector extends SinkConnector implement
      */
     @Override
     public void stop() {
-        if (sinkConnector != null){
+        if (sinkConnector != null) {
             sinkConnector = null;
             configValue = null;
             taskConfig = null;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
index f9d5456..921455c 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/AbstractKafkaSourceConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
 
 import io.openmessaging.KeyValue;
@@ -36,15 +53,17 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
 
     /**
      * try override start and stop
+     *
      * @return
      */
-    protected org.apache.kafka.connect.source.SourceConnector originalSinkConnector(){
+    protected org.apache.kafka.connect.source.SourceConnector originalSinkConnector() {
         return sourceConnector;
     }
 
     /**
      * Returns a set of configurations for Tasks based on the current configuration,
      * producing at most count configurations.
+     *
      * @param maxTasks maximum number of configurations to generate
      * @return configurations for Tasks
      */
@@ -54,7 +73,7 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
         List<KeyValue> configs = new ArrayList<>();
         for (Map<String, String> configMaps : groupConnectors) {
             KeyValue keyValue = new DefaultKeyValue();
-            configMaps.forEach((k, v)->{
+            configMaps.forEach((k, v) -> {
                 keyValue.put(k, v);
             });
             configs.add(keyValue);
@@ -64,6 +83,7 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
 
     /**
      * Start the component
+     *
      * @param config component context
      */
     @Override
@@ -73,7 +93,7 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
             this.configValue.put(key, config.getString(key));
         });
         setConnectorClass(configValue);
-       taskConfig = new HashMap<>(configValue.config());
+        taskConfig = new HashMap<>(configValue.config());
         // get the source class name from config and create source task from reflection
         try {
             sourceConnector = Class.forName(taskConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG))
@@ -90,9 +110,9 @@ public abstract class AbstractKafkaSourceConnector extends SourceConnector imple
      */
     @Override
     public void stop() {
-        if (sourceConnector != null){
+        if (sourceConnector != null) {
             sourceConnector = null;
-            configValue =  null;
+            configValue = null;
             this.taskConfig = null;
         }
     }
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
index b0c0007..f323d9a 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/ConnectorClassSetter.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
 
 import io.openmessaging.KeyValue;
@@ -8,15 +25,16 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
  */
 public interface ConnectorClassSetter {
     /**
-     * set connector class
-     * @param config
+     * get connector class
      */
-     default void setConnectorClass(KeyValue config) {
-         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, getConnectorClass());
-     }
+    String getConnectorClass();
 
     /**
-     * get connector class
+     * set connector class
+     *
+     * @param config
      */
-    String getConnectorClass();
+    default void setConnectorClass(KeyValue config) {
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, getConnectorClass());
+    }
 }
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
index a42c507..1054e8a 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaConnectorContext.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
 
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -7,7 +24,8 @@ import org.apache.kafka.connect.connector.ConnectorContext;
  */
 public class KafkaConnectorContext implements ConnectorContext {
     io.openmessaging.connector.api.component.connector.ConnectorContext context;
-    public KafkaConnectorContext(io.openmessaging.connector.api.component.connector.ConnectorContext context){
+
+    public KafkaConnectorContext(io.openmessaging.connector.api.component.connector.ConnectorContext context) {
         this.context = context;
     }
 
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
index 90575bf..a4d6d10 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSinkAdaptorConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
 
 import io.openmessaging.KeyValue;
@@ -9,6 +26,7 @@ public abstract class KafkaSinkAdaptorConnector extends AbstractKafkaSinkConnect
 
     /**
      * Start the component
+     *
      * @param config component context
      */
     @Override
@@ -24,8 +42,8 @@ public abstract class KafkaSinkAdaptorConnector extends AbstractKafkaSinkConnect
      */
     @Override
     public void stop() {
-        if (sinkConnector != null){
-           sinkConnector.stop();
+        if (sinkConnector != null) {
+            sinkConnector.stop();
         }
         super.stop();
     }
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
index cea0a22..337c90b 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/connector/KafkaSourceAdaptorConnector.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.kafka.connect.adaptor.connector;
 
 import io.openmessaging.KeyValue;
@@ -9,6 +26,7 @@ public abstract class KafkaSourceAdaptorConnector extends AbstractKafkaSourceCon
 
     /**
      * Start the component
+     *
      * @param config component context
      */
     @Override
@@ -24,7 +42,7 @@ public abstract class KafkaSourceAdaptorConnector extends AbstractKafkaSourceCon
      */
     @Override
     public void stop() {
-        if (sourceConnector != null){
+        if (sourceConnector != null) {
             sourceConnector.stop();
         }
         super.stop();
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
index 3b951c6..0cd3c2d 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaErrantRecordReporter.java
@@ -14,7 +14,8 @@ import java.util.concurrent.Future;
  */
 public class RocketMQKafkaErrantRecordReporter implements ErrantRecordReporter {
     final ErrorRecordReporter errorRecordReporter;
-    RocketMQKafkaErrantRecordReporter(SinkTaskContext context){
+
+    RocketMQKafkaErrantRecordReporter(SinkTaskContext context) {
         errorRecordReporter = context.errorRecordReporter();
     }
 
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
index 0fd48a8..63ce1c6 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/context/RocketMQKafkaSinkTaskContext.java
@@ -58,7 +58,7 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
     @Override
     public void offset(Map<TopicPartition, Long> map) {
         Map<RecordPartition, RecordOffset> offsets = new HashMap<>();
-        map.forEach((topicPartition, offset)->{
+        map.forEach((topicPartition, offset) -> {
             offsets.put(convertToRecordPartition(topicPartition), convertToRecordOffset(offset));
         });
         sinkContext.resetOffset(offsets);
@@ -76,7 +76,7 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
 
     @Override
     public Set<TopicPartition> assignment() {
-        Set<TopicPartition> topicPartitions =  new HashSet<>();
+        Set<TopicPartition> topicPartitions = new HashSet<>();
         Set<RecordPartition> recordPartitions = sinkContext.assignment();
         recordPartitions.forEach(partition -> {
             topicPartitions.add(convertToTopicPartition(partition.getPartition()));
@@ -87,9 +87,9 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
     @Override
     public void pause(TopicPartition... topicPartitions) {
         List<RecordPartition> partitions = new ArrayList<>();
-        for (TopicPartition topicPartition : topicPartitions){
+        for (TopicPartition topicPartition : topicPartitions) {
             RecordPartition recordPartition = convertToRecordPartition(topicPartition);
-            if (recordPartition != null){
+            if (recordPartition != null) {
                 partitions.add(recordPartition);
             }
         }
@@ -99,9 +99,9 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
     @Override
     public void resume(TopicPartition... topicPartitions) {
         List<RecordPartition> partitions = new ArrayList<>();
-        for (TopicPartition topicPartition : topicPartitions){
+        for (TopicPartition topicPartition : topicPartitions) {
             RecordPartition recordPartition = convertToRecordPartition(topicPartition);
-            if (recordPartition != null){
+            if (recordPartition != null) {
                 partitions.add(recordPartition);
             }
         }
@@ -121,11 +121,12 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
 
     /**
      * convert to kafka topic partition
+     *
      * @param partitionMap
      * @return
      */
     public TopicPartition convertToTopicPartition(Map<String, ?> partitionMap) {
-        if (partitionMap.containsKey(TOPIC) && partitionMap.containsKey(QUEUE_ID)){
+        if (partitionMap.containsKey(TOPIC) && partitionMap.containsKey(QUEUE_ID)) {
             return new TopicPartition(partitionMap.get(TOPIC).toString(), Integer.valueOf(partitionMap.get(QUEUE_ID).toString()));
         }
         return null;
@@ -133,18 +134,19 @@ public class RocketMQKafkaSinkTaskContext implements SinkTaskContext {
 
     /**
      * convert to rocketmq record partition
+     *
      * @param topicPartition
      * @return
      */
     public RecordPartition convertToRecordPartition(TopicPartition topicPartition) {
-        if (topicPartition != null){
-            return new RecordPartition(Collections.singletonMap(topicPartition.topic(),topicPartition.partition()));
+        if (topicPartition != null) {
+            return new RecordPartition(Collections.singletonMap(topicPartition.topic(), topicPartition.partition()));
         }
         return null;
     }
 
     private RecordOffset convertToRecordOffset(long l) {
-        return new RecordOffset(Collections.singletonMap(QUEUE_OFFSET,l));
+        return new RecordOffset(Collections.singletonMap(QUEUE_OFFSET, l));
     }
 
 
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index abc2abf..9c0c529 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -38,7 +38,7 @@ import java.util.Map;
 public class Converters {
 
 
-    public static ConnectRecord fromSourceRecord(SourceRecord record){
+    public static ConnectRecord fromSourceRecord(SourceRecord record) {
         // sourceRecord convert connect Record
         RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
         io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
@@ -58,7 +58,7 @@ public class Converters {
     }
 
 
-    public static ConnectRecord fromSinkRecord(SinkRecord record){
+    public static ConnectRecord fromSinkRecord(SinkRecord record) {
         // sourceRecord convert connect Record
         RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
         io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
@@ -78,13 +78,13 @@ public class Converters {
     }
 
 
-
     /**
      * convert rocketmq connect record to sink record
+     *
      * @param record
      * @return
      */
-    public static SinkRecord fromConnectRecord(ConnectRecord record){
+    public static SinkRecord fromConnectRecord(ConnectRecord record) {
         // connect record  convert kafka  sink record
         KafkaSinkSchemaConverter kafkaSinkSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
         Schema schema = kafkaSinkSchemaConverter.schema();
@@ -94,7 +94,7 @@ public class Converters {
         Iterator extensions = record.getExtensions().keySet().iterator();
         while (extensions.hasNext()) {
             String key = String.valueOf(extensions.next());
-            headers.add(key, record.getExtensions().getString(key) , null);
+            headers.add(key, record.getExtensions().getString(key), null);
         }
 
         SinkRecord sinkRecord = new SinkRecord(
@@ -112,7 +112,7 @@ public class Converters {
         return sinkRecord;
     }
 
-    public static RecordPartition toRecordPartition(SinkRecord record){
+    public static RecordPartition toRecordPartition(SinkRecord record) {
 
         Map<String, String> recordPartitionMap = new HashMap<>();
         recordPartitionMap.put(RocketMQKafkaSinkTaskContext.TOPIC, record.topic());
@@ -120,7 +120,7 @@ public class Converters {
         return new RecordPartition(recordPartitionMap);
     }
 
-    public static RecordOffset toRecordOffset(SinkRecord record){
+    public static RecordOffset toRecordOffset(SinkRecord record) {
         Map<String, String> recordOffsetMap = new HashMap<>();
         recordOffsetMap.put(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET, record.kafkaOffset() + "");
         return new RecordOffset(recordOffsetMap);
@@ -129,31 +129,35 @@ public class Converters {
 
     /**
      * get topic
+     *
      * @param partition
      * @return
      */
-    public static String topic(RecordPartition partition){
+    public static String topic(RecordPartition partition) {
         return partition.getPartition().get(RocketMQKafkaSinkTaskContext.TOPIC).toString();
     }
 
     /**
      * get partition
+     *
      * @param partition
      * @return
      */
-    public static int partition(RecordPartition partition){
-        if (partition.getPartition().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_ID)){
+    public static int partition(RecordPartition partition) {
+        if (partition.getPartition().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_ID)) {
             return Integer.valueOf(partition.getPartition().get(RocketMQKafkaSinkTaskContext.QUEUE_ID).toString());
         }
         return -1;
     }
+
     /**
      * get offset
+     *
      * @param offset
      * @return
      */
-    public static int offset(RecordOffset offset){
-        if (offset.getOffset().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET)){
+    public static int offset(RecordOffset offset) {
+        if (offset.getOffset().containsKey(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET)) {
             return Integer.valueOf(offset.getOffset().get(RocketMQKafkaSinkTaskContext.QUEUE_OFFSET).toString());
         }
         return -1;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
index 4f718b0..d6c7574 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
@@ -37,14 +37,16 @@ import java.util.Map;
 public class KafkaSinkSchemaConverter {
     private static Logger logger = LoggerFactory.getLogger(KafkaSinkSchemaConverter.class);
     private static Map<String, String> logicalMapping = new HashMap<>();
+
     static {
-        logicalMapping.put(io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME,Decimal.LOGICAL_NAME);
+        logicalMapping.put(io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME, Decimal.LOGICAL_NAME);
         logicalMapping.put(io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME, Date.LOGICAL_NAME);
         logicalMapping.put(io.openmessaging.connector.api.data.logical.Time.LOGICAL_NAME, Time.LOGICAL_NAME);
         logicalMapping.put(io.openmessaging.connector.api.data.logical.Timestamp.LOGICAL_NAME, Timestamp.LOGICAL_NAME);
     }
 
     private SchemaBuilder builder;
+
     public KafkaSinkSchemaConverter(Schema schema) {
         builder = convertKafkaSchema(schema);
     }
@@ -55,6 +57,7 @@ public class KafkaSinkSchemaConverter {
 
     /**
      * convert kafka schema
+     *
      * @param originalSchema
      * @return
      */
@@ -151,8 +154,7 @@ public class KafkaSinkSchemaConverter {
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
                         .defaultValue(originalSchema.getDefaultValue())
-                        .parameters(parameters)
-                        ;
+                        .parameters(parameters);
                 convertStructSchema(schemaBuilder, originalSchema);
                 return schemaBuilder;
             case ARRAY:
@@ -192,10 +194,10 @@ public class KafkaSinkSchemaConverter {
 
                 // schema
                 Schema schema = field.getSchema();
-                String schemaName =  convertSchemaName(field.getSchema().getName());
+                String schemaName = convertSchemaName(field.getSchema().getName());
 
                 // field name
-                String fieldName =  field.getName();
+                String fieldName = field.getName();
                 FieldType type = schema.getFieldType();
 
                 Map<String, String> parameters = schema.getParameters() == null ? new HashMap<>() : schema.getParameters();
@@ -337,8 +339,8 @@ public class KafkaSinkSchemaConverter {
     }
 
 
-    private String convertSchemaName(String schemaName){
-        if (logicalMapping.containsKey(schemaName)){
+    private String convertSchemaName(String schemaName) {
+        if (logicalMapping.containsKey(schemaName)) {
             return logicalMapping.get(schemaName);
         }
         return schemaName;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
index 0cbc2f9..7b37f59 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
@@ -35,12 +35,13 @@ public class KafkaSinkValueConverter {
 
     private static Logger logger = LoggerFactory.getLogger(KafkaSinkValueConverter.class);
 
-    public Object value(Schema schema,Object data) {
+    public Object value(Schema schema, Object data) {
         return convertKafkaValue(schema, data);
     }
 
     /**
      * convert value
+     *
      * @param targetSchema
      * @param originalValue
      * @return
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
index 5ca408e..cee3d94 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
 
-import com.beust.jcommander.internal.Maps;
 import io.openmessaging.connector.api.data.SchemaBuilder;
 import io.openmessaging.connector.api.data.logical.Timestamp;
 import io.openmessaging.connector.api.errors.ConnectException;
@@ -39,6 +38,7 @@ import java.util.Map;
 public class RocketMQSourceSchemaConverter {
     private static Logger logger = LoggerFactory.getLogger(RocketMQSourceSchemaConverter.class);
     private static Map<String, String> logicalMapping = new HashMap<>();
+
     static {
         logicalMapping.put(Decimal.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Decimal.LOGICAL_NAME);
         logicalMapping.put(Date.LOGICAL_NAME, io.openmessaging.connector.api.data.logical.Date.LOGICAL_NAME);
@@ -148,8 +148,7 @@ public class RocketMQSourceSchemaConverter {
                         .name(schemaName)
                         .doc(originalSchema.doc())
                         .defaultValue(originalSchema.defaultValue())
-                        .parameters(parameters)
-                        ;
+                        .parameters(parameters);
                 convertStructSchema(schemaBuilder, originalSchema);
                 return schemaBuilder;
             case ARRAY:
@@ -328,8 +327,8 @@ public class RocketMQSourceSchemaConverter {
     }
 
 
-    private String convertSchemaName(String schemaName){
-        if (logicalMapping.containsKey(schemaName)){
+    private String convertSchemaName(String schemaName) {
+        if (logicalMapping.containsKey(schemaName)) {
             return logicalMapping.get(schemaName);
         }
         return schemaName;
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
index 8801efb..40d6178 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
@@ -22,7 +22,6 @@ import io.openmessaging.connector.api.data.FieldType;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.Struct;
 import io.openmessaging.connector.api.errors.ConnectException;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
index 8677db9..0f0c426 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSink.java
@@ -37,28 +37,27 @@ import java.util.Map;
 /**
  * abstract kafka connect sink
  */
-public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskClassSetter{
+public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskClassSetter {
 
     private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSink.class);
-
-    private SinkTaskContext kafkaSinkTaskContext;
-    private org.apache.kafka.connect.sink.SinkTask sinkTask;
-
     protected TransformationWrapper transformationWrapper;
-
     /**
      * kafka connect init
      */
     protected ConnectKeyValue configValue;
+    private SinkTaskContext kafkaSinkTaskContext;
+    private org.apache.kafka.connect.sink.SinkTask sinkTask;
 
     /**
      * convert by kafka sink transform
+     *
      * @param record
      */
     protected abstract SinkRecord transforms(SinkRecord record);
 
     /**
      * convert ConnectRecord to SinkRecord
+     *
      * @param record
      * @return
      */
@@ -67,6 +66,7 @@ public abstract class AbstractKafkaConnectSink extends SinkTask implements TaskC
 
     /**
      * Put the records to the sink
+     *
      * @param records sink records
      */
     @Override
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
index f062b3d..cf1595e 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/AbstractKafkaConnectSource.java
@@ -40,19 +40,18 @@ import java.util.Map;
 /**
  * abstract kafka connect
  */
-public abstract class AbstractKafkaConnectSource extends SourceTask implements TaskClassSetter{
+public abstract class AbstractKafkaConnectSource extends SourceTask implements TaskClassSetter {
 
     private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
 
     protected TransformationWrapper transformationWrapper;
-    private SourceTaskContext kafkaSourceTaskContext;
-    private org.apache.kafka.connect.source.SourceTask sourceTask;
-    private OffsetStorageReader offsetReader;
-
     /**
      * kafka connect init
      */
     protected ConnectKeyValue configValue;
+    private SourceTaskContext kafkaSourceTaskContext;
+    private org.apache.kafka.connect.source.SourceTask sourceTask;
+    private OffsetStorageReader offsetReader;
 
     @Override
     public List<ConnectRecord> poll() throws InterruptedException {
@@ -74,6 +73,7 @@ public abstract class AbstractKafkaConnectSource extends SourceTask implements T
 
     /**
      * convert transform
+     *
      * @param sourceRecord
      */
     protected abstract SourceRecord transforms(SourceRecord sourceRecord);
@@ -103,7 +103,7 @@ public abstract class AbstractKafkaConnectSource extends SourceTask implements T
 
         // get the source class name from config and create source task from reflection
         try {
-            sourceTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG))
+            sourceTask = Class.forName(taskConfig.get(TaskConfig.TASK_CLASS_CONFIG), true, AbstractKafkaConnectSource.class.getClassLoader())
                     .asSubclass(org.apache.kafka.connect.source.SourceTask.class)
                     .getDeclaredConstructor()
                     .newInstance();
@@ -112,7 +112,7 @@ public abstract class AbstractKafkaConnectSource extends SourceTask implements T
         }
 
         offsetReader = new KafkaOffsetStorageReader(
-               sourceTaskContext
+                sourceTaskContext
         );
 
         kafkaSourceTaskContext = new RocketMQKafkaSourceTaskContext(offsetReader, taskConfig);
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
index 439f733..2f3d364 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/KafkaConnectAdaptorSink.java
@@ -55,6 +55,7 @@ public abstract class KafkaConnectAdaptorSink extends AbstractKafkaConnectSink {
 
     /**
      * convert ConnectRecord to SinkRecord
+     *
      * @param record
      * @return
      */
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
index 64c58c4..dffb8b8 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/task/TaskClassSetter.java
@@ -1,23 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.connect.kafka.connect.adaptor.task;
 
 import io.openmessaging.KeyValue;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.TaskConfig;
 
 /**
  * task class setter
  */
 public interface TaskClassSetter {
+    /**
+     * get connector class
+     */
+    String getTaskClass();
+
     /**
      * set connector class
+     *
      * @param config
      */
     default void setTaskClass(KeyValue config) {
         config.put(TaskConfig.TASK_CLASS_CONFIG, getTaskClass());
     }
-
-    /**
-     * get connector class
-     */
-    String getTaskClass();
 }
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
index 0264dcb..a6de571 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/test/java/org/apache/rocketmq/connect/kafka/connect/adaptor/SourceRecordConverterTest.java
@@ -181,7 +181,7 @@ public class SourceRecordConverterTest {
         // sourceRecord convert connect Record
         RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(originalRecord.valueSchema());
 
-        io.openmessaging.connector.api.data.Schema schema =rocketMQSourceSchemaConverter.schema();
+        io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
         RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
         ConnectRecord connectRecord = new ConnectRecord(
                 new RecordPartition(originalRecord.sourcePartition()),
@@ -198,6 +198,6 @@ public class SourceRecordConverterTest {
         }
         final byte[] messageBody = JSON.toJSONString(connectRecord).getBytes();
         String bodyStr = new String(messageBody, StandardCharsets.UTF_8);
-        ConnectRecord newConnectRecord= JSON.parseObject(bodyStr, ConnectRecord.class);
+        ConnectRecord newConnectRecord = JSON.parseObject(bodyStr, ConnectRecord.class);
     }
 }
diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
index 82994e9..b5617e7 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -10,8 +10,8 @@
 	OF ANY KIND, either express or implied. See the License for the specific 
 	language governing permissions and limitations under the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <groupId>org.apache.rocketmq</groupId>
@@ -180,7 +180,7 @@
         <debezium.version>1.7.2.Final</debezium.version>
         <debezium.postgresql.version>42.3.3</debezium.postgresql.version>
         <!--rocketmq version-->
-        <rocketmq.version>4.5.2</rocketmq.version>
+        <rocketmq.version>4.7.0</rocketmq.version>
         <rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
 
         <!--rocketmq connect version-->
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
index 4862b5a..121e611 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
index 7a3f51a..1db1208 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/DebeziumConnector.java
@@ -17,20 +17,12 @@
 
 package org.apache.rocketmq.connect.debezium;
 
-import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.task.source.SourceConnector;
-import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.kafka.connect.adaptor.connector.KafkaSourceAdaptorConnector;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 
 /**
  * debezium connector
  */
 public abstract class DebeziumConnector extends KafkaSourceAdaptorConnector {
-    
+
 }
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
similarity index 67%
rename from connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
rename to connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
index 439bcf5..4e33a93 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/ConnectUtil.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMQConnectUtil.java
@@ -18,10 +18,10 @@
 package org.apache.rocketmq.connect.debezium;
 
 import com.beust.jcommander.internal.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.acl.common.SessionCredentials;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.TopicConfig;
@@ -38,10 +38,8 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.apache.rocketmq.tools.command.CommandUtil;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
@@ -49,9 +47,7 @@ import java.util.UUID;
 /**
  * rocket connect util
  */
-public class ConnectUtil {
-
-    private static final String SYS_TASK_CG_PREFIX = "connect-";
+public class RocketMQConnectUtil {
 
     public static String createGroupName(String prefix) {
         StringBuilder sb = new StringBuilder();
@@ -62,27 +58,47 @@ public class ConnectUtil {
         return sb.toString().replace(".", "-");
     }
 
-    public static String createGroupName(String prefix, String postfix) {
-        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
+    public static String createUniqInstance(String prefix) {
+        return new StringBuffer(prefix).append("-").append(UUID.randomUUID()).toString();
     }
 
-    public static String createInstance(String servers) {
-        String[] serversArray = servers.split(";");
-        List<String> serversList = new ArrayList<String>();
-        for (String server : serversArray) {
-            if (!serversList.contains(server)) {
-                serversList.add(server);
-            }
-        }
-        Collections.sort(serversList);
-        return String.valueOf(serversList.toString().hashCode());
+    private static RPCHook getAclRPCHook(String accessKey, String secretKey) {
+        return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
     }
 
-    public static String createUniqInstance(String prefix) {
-        return new StringBuffer(prefix).append("-").append(UUID.randomUUID().toString()).toString();
+    /**
+     * init default lite pull consumer
+     *
+     * @param connectConfig
+     * @param topic
+     * @param autoCommit
+     * @return
+     * @throws MQClientException
+     */
+    public static DefaultLitePullConsumer initDefaultLitePullConsumer(RocketMqConnectConfig connectConfig, String topic, boolean autoCommit) throws MQClientException {
+        DefaultLitePullConsumer consumer = null;
+        if (Objects.isNull(consumer)) {
+            if (StringUtils.isBlank(connectConfig.getAccessKey()) && StringUtils.isBlank(connectConfig.getSecretKey())) {
+                consumer = new DefaultLitePullConsumer(
+                        connectConfig.getRmqConsumerGroup()
+                );
+            } else {
+                consumer = new DefaultLitePullConsumer(
+                        connectConfig.getRmqConsumerGroup(),
+                        getAclRPCHook(connectConfig.getAccessKey(), connectConfig.getSecretKey())
+                );
+            }
+        }
+        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        String uniqueName = Thread.currentThread().getName() + "-" + System.currentTimeMillis() % 1000;
+        consumer.setInstanceName(uniqueName);
+        consumer.setUnitName(uniqueName);
+        consumer.setAutoCommit(autoCommit);
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.subscribe(topic, "*");
+        return consumer;
     }
 
-
     public static DefaultMQProducer initDefaultMQProducer(RocketMqConnectConfig connectConfig) {
         RPCHook rpcHook = null;
         if (connectConfig.isAclEnable()) {
@@ -91,55 +107,24 @@ public class ConnectUtil {
         DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
         producer.setNamesrvAddr(connectConfig.getNamesrvAddr());
         producer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
-        producer.setProducerGroup(connectConfig.getRmqProducerGroup());
+        producer.setProducerGroup(connectConfig.getRmqConsumerGroup());
         producer.setSendMsgTimeout(connectConfig.getOperationTimeout());
         producer.setLanguage(LanguageCode.JAVA);
         return producer;
     }
 
-    public static DefaultMQPullConsumer initDefaultMQPullConsumer(RocketMqConnectConfig connectConfig) {
-        RPCHook rpcHook = null;
-        if (connectConfig.isAclEnable()) {
-            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
-        }
-        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(rpcHook);
-        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
-        consumer.setConsumerGroup(connectConfig.getRmqConsumerGroup());
-        consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
-        consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout());
-        consumer.setLanguage(LanguageCode.JAVA);
-        return consumer;
-    }
-
-    public static DefaultMQPushConsumer initDefaultMQPushConsumer(RocketMqConnectConfig connectConfig) {
-        RPCHook rpcHook = null;
-        if (connectConfig.isAclEnable()) {
-            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
-        }
-        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(rpcHook);
-        consumer.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        consumer.setInstanceName(createUniqInstance(connectConfig.getNamesrvAddr()));
-        consumer.setConsumerGroup(createGroupName(connectConfig.getRmqConsumerGroup()));
-        consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes());
-        consumer.setConsumeTimeout((long) connectConfig.getRmqMessageConsumeTimeout());
-        consumer.setConsumeThreadMin(connectConfig.getRmqMinConsumeThreadNums());
-        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
-        consumer.setLanguage(LanguageCode.JAVA);
-        return consumer;
-    }
-
     public static DefaultMQAdminExt startMQAdminTool(RocketMqConnectConfig connectConfig) throws MQClientException {
-        RPCHook rpcHook = null;
+        DefaultMQAdminExt admin;
         if (connectConfig.isAclEnable()) {
-            rpcHook = new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey()));
+            admin = new DefaultMQAdminExt(new AclClientRPCHook(new SessionCredentials(connectConfig.getAccessKey(), connectConfig.getSecretKey())));
+        } else {
+            admin = new DefaultMQAdminExt();
         }
-        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
-        defaultMQAdminExt.setNamesrvAddr(connectConfig.getNamesrvAddr());
-        defaultMQAdminExt.setAdminExtGroup("connector-admin-group");
-        defaultMQAdminExt.setInstanceName(ConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
-        defaultMQAdminExt.start();
-        return defaultMQAdminExt;
+        admin.setNamesrvAddr(connectConfig.getNamesrvAddr());
+        admin.setAdminExtGroup(connectConfig.getRmqConsumerGroup());
+        admin.setInstanceName(RocketMQConnectUtil.createUniqInstance(connectConfig.getNamesrvAddr()));
+        admin.start();
+        return admin;
     }
 
 
@@ -165,7 +150,7 @@ public class ConnectUtil {
         }
     }
 
-    public static boolean isTopicExist(RocketMqConnectConfig connectConfig, String topic) {
+    public static boolean topicExist(RocketMqConnectConfig connectConfig, String topic) {
         DefaultMQAdminExt defaultMQAdminExt = null;
         boolean foundTopicRouteInfo = false;
         try {
@@ -184,7 +169,7 @@ public class ConnectUtil {
         return foundTopicRouteInfo;
     }
 
-    public static Set<String> fetchAllConsumerGroupList(RocketMqConnectConfig connectConfig) {
+    public static Set<String> fetchAllConsumerGroup(RocketMqConnectConfig connectConfig) {
         Set<String> consumerGroupSet = Sets.newHashSet();
         DefaultMQAdminExt defaultMQAdminExt = null;
         try {
@@ -230,3 +215,5 @@ public class ConnectUtil {
         return subGroup;
     }
 }
+
+
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
index 18397a6..580cb36 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqConnectConfig.java
@@ -29,8 +29,6 @@ public class RocketMqConnectConfig {
 
     private String namesrvAddr;
 
-    private String rmqProducerGroup;
-
     private int operationTimeout = 3000;
 
     private String rmqConsumerGroup;
@@ -43,24 +41,18 @@ public class RocketMqConnectConfig {
 
     private int rmqMinConsumeThreadNums = 1;
 
-    private String adminExtGroup;
 
-    // set acl config
+    /** set acl config **/
     private boolean aclEnable;
     private String accessKey;
     private String secretKey;
 
 
-    public RocketMqConnectConfig() {
-    }
+    public RocketMqConnectConfig() {}
 
     public RocketMqConnectConfig(Configuration config, String dbHistoryName) {
         this.dbHistoryName = dbHistoryName;
-        // init config
-        this.rmqProducerGroup = this.dbHistoryName.concat("-producer-group");
-        this.rmqConsumerGroup = this.dbHistoryName.concat("-consumer-group");
-        this.adminExtGroup = this.dbHistoryName.concat("-admin-group");
-
+        this.rmqConsumerGroup = this.dbHistoryName.concat("-group");
         // init rocketmq connection
         this.namesrvAddr = config.getString(RocketMqDatabaseHistory.NAME_SRV_ADDR);
         this.aclEnable = config.getBoolean(RocketMqDatabaseHistory.ROCKETMQ_ACL_ENABLE);
@@ -85,14 +77,6 @@ public class RocketMqConnectConfig {
         this.namesrvAddr = namesrvAddr;
     }
 
-    public String getRmqProducerGroup() {
-        return rmqProducerGroup;
-    }
-
-    public void setRmqProducerGroup(String rmqProducerGroup) {
-        this.rmqProducerGroup = rmqProducerGroup;
-    }
-
     public int getOperationTimeout() {
         return operationTimeout;
     }
@@ -165,27 +149,17 @@ public class RocketMqConnectConfig {
         this.secretKey = secretKey;
     }
 
-    public String getAdminExtGroup() {
-        return adminExtGroup;
-    }
-
-    public void setAdminExtGroup(String adminExtGroup) {
-        this.adminExtGroup = adminExtGroup;
-    }
-
     @Override
     public String toString() {
         return "RocketMqConnectConfig{" +
                 "dbHistoryName='" + dbHistoryName + '\'' +
                 ", namesrvAddr='" + namesrvAddr + '\'' +
-                ", rmqProducerGroup='" + rmqProducerGroup + '\'' +
                 ", operationTimeout=" + operationTimeout +
                 ", rmqConsumerGroup='" + rmqConsumerGroup + '\'' +
                 ", rmqMaxRedeliveryTimes=" + rmqMaxRedeliveryTimes +
                 ", rmqMessageConsumeTimeout=" + rmqMessageConsumeTimeout +
                 ", rmqMaxConsumeThreadNums=" + rmqMaxConsumeThreadNums +
                 ", rmqMinConsumeThreadNums=" + rmqMinConsumeThreadNums +
-                ", adminExtGroup='" + adminExtGroup + '\'' +
                 ", aclEnable=" + aclEnable +
                 ", accessKey='" + accessKey + '\'' +
                 ", secretKey='" + secretKey + '\'' +
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
index 11814b9..a8fb38d 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/RocketMqDatabaseHistory.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.connect.debezium;
 
-import com.alibaba.fastjson.JSON;
 import io.debezium.config.Configuration;
 import io.debezium.config.Field;
 import io.debezium.document.DocumentReader;
@@ -28,10 +27,7 @@ import io.debezium.relational.history.DatabaseHistoryListener;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.HistoryRecordComparator;
 import org.apache.kafka.common.config.ConfigDef;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
-import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.TopicConfig;
@@ -45,6 +41,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
 
 
@@ -53,7 +50,6 @@ import java.util.function.Consumer;
  */
 public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
 
-    private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
     public static final Field TOPIC = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "rocketmq.topic")
             .withDisplayName("Database history topic name")
             .withType(ConfigDef.Type.STRING)
@@ -61,7 +57,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
             .withImportance(ConfigDef.Importance.HIGH)
             .withDescription("The name of the topic for the database schema history")
             .withValidation(Field::isRequired);
-
     /**
      * rocketmq name srv addr
      */
@@ -72,7 +67,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
             .withImportance(ConfigDef.Importance.HIGH)
             .withDescription("Rocketmq name srv addr")
             .withValidation(Field::isRequired);
-
     public static final Field ROCKETMQ_ACL_ENABLE = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "acl.enabled")
             .withDisplayName("Rocketmq acl enabled")
             .withType(ConfigDef.Type.BOOLEAN)
@@ -80,37 +74,27 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
             .withWidth(ConfigDef.Width.LONG)
             .withImportance(ConfigDef.Importance.HIGH)
             .withDescription("Rocketmq acl enabled");
-
     public static final Field ROCKETMQ_ACCESS_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "access.key")
             .withDisplayName("Rocketmq access key")
             .withType(ConfigDef.Type.STRING)
             .withWidth(ConfigDef.Width.LONG)
             .withImportance(ConfigDef.Importance.HIGH)
             .withDescription("Rocketmq access key");
-
     public static final Field ROCKETMQ_SECRET_KEY = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "secret.key")
             .withDisplayName("Rocketmq secret key")
             .withType(ConfigDef.Type.STRING)
             .withWidth(ConfigDef.Width.LONG)
             .withImportance(ConfigDef.Importance.HIGH)
             .withDescription("Rocketmq secret key");
-
-    public static final Field CONNECTOR_NAME = Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "connector.name")
-            .withDisplayName("Connector name")
-            .withType(ConfigDef.Type.STRING)
-            .withWidth(ConfigDef.Width.LONG)
-            .withImportance(ConfigDef.Importance.HIGH)
-            .withDescription("connector name")
-            .withValidation(Field::isRequired);
-
-
+    private static final Logger log = LoggerFactory.getLogger(AbstractKafkaConnectSource.class);
     private final DocumentReader reader = DocumentReader.defaultReader();
     private String topicName;
     private String dbHistoryName;
     private RocketMqConnectConfig connectConfig;
-    private DefaultMQPushConsumer consumer;
     private DefaultMQProducer producer;
 
+
+
     @Override
     public void configure(
             Configuration config,
@@ -119,7 +103,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
             boolean useCatalogBeforeSchema) {
         super.configure(config, comparator, listener, useCatalogBeforeSchema);
         this.topicName = config.getString(TOPIC);
-
         this.dbHistoryName = config.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
         log.info("Configure to store the debezium database history {} to rocketmq topic {}",
                 dbHistoryName, topicName);
@@ -132,33 +115,24 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
         super.initializeStorage();
         log.info("try to create history topic: {}!", this.topicName);
         TopicConfig topicConfig = new TopicConfig(this.topicName);
-        ConnectUtil.createTopic(connectConfig, topicConfig);
+        RocketMQConnectUtil.createTopic(connectConfig, topicConfig);
     }
 
     @Override
     public void start() {
         super.start();
         try {
-            // init consumer
-            this.consumer = ConnectUtil.initDefaultMQPushConsumer(connectConfig);
-            Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
-            if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
-                ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
+            Set<String> consumerGroupSet = RocketMQConnectUtil.fetchAllConsumerGroup(connectConfig);
+            if (!consumerGroupSet.contains(connectConfig.getRmqConsumerGroup())) {
+                RocketMQConnectUtil.createSubGroup(connectConfig, connectConfig.getRmqConsumerGroup());
             }
-            this.consumer.subscribe(this.topicName, "*");
-        } catch (MQClientException e) {
-            throw new DatabaseHistoryException(e);
-        }
-
-        this.producer = ConnectUtil.initDefaultMQProducer(connectConfig);
-        try {
+            this.producer = RocketMQConnectUtil.initDefaultMQProducer(connectConfig);
             this.producer.start();
         } catch (MQClientException e) {
             throw new DatabaseHistoryException(e);
         }
     }
 
-
     @Override
     public void stop() {
         try {
@@ -166,10 +140,6 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
                 producer.shutdown();
                 this.producer = null;
             }
-            if (this.consumer != null) {
-                this.consumer.shutdown();
-                this.consumer = null;
-            }
         } catch (Exception pe) {
             log.warn("Failed to closing rocketmq client", pe);
         }
@@ -182,34 +152,37 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
      */
     @Override
     protected void recoverRecords(Consumer<HistoryRecord> records) {
-        consumer.registerMessageListener(new MessageListenerConcurrently() {
-            @Override
-            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
-                msgs.forEach(message -> {
-                    try {
-                        HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
-                        log.trace("Recovering database history: {}", recordObj);
-                        if (recordObj == null || !recordObj.isValid()) {
-                            log.warn("Skipping invalid database history record '{}'. " +
-                                            "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
-                                    recordObj, topicName);
-                        } else {
-                            records.accept(recordObj);
-                            log.trace("Recovered database history: {}", recordObj);
-                        }
-                    } catch (IOException e) {
-                        throw new DatabaseHistoryException(e);
-                    }
-                });
-                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
-            }
-        });
+        DefaultLitePullConsumer consumer = null;
         try {
+            consumer = RocketMQConnectUtil.initDefaultLitePullConsumer(connectConfig, topicName, false);
             consumer.start();
-        } catch (MQClientException e) {
+            while (true) {
+                List<MessageExt> result = consumer.poll(10000);
+                if (result == null || result.isEmpty()) {
+                    break;
+                }
+                for (MessageExt message : result) {
+                    HistoryRecord recordObj = new HistoryRecord(reader.read(message.getBody()));
+                    log.trace("Recovering database history: {}", recordObj);
+                    if (recordObj == null || !recordObj.isValid()) {
+                        log.warn("Skipping invalid database history record '{}'. " +
+                                        "This is often not an issue, but if it happens repeatedly please check the '{}' topic.",
+                                recordObj, topicName);
+                    } else {
+                        records.accept(recordObj);
+                        log.trace("Recovered database history: {}", recordObj);
+                    }
+                }
+            }
+        } catch (MQClientException ce) {
+            throw new DatabaseHistoryException(ce);
+        } catch (IOException e) {
             throw new DatabaseHistoryException(e);
+        } finally {
+            if (consumer != null) {
+                consumer.shutdown();
+            }
         }
-        log.info("Consumer started.");
     }
 
     @Override
@@ -225,7 +198,7 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
         try {
             Message sourceMessage = new Message();
             sourceMessage.setTopic(this.topicName);
-            final byte[] messageBody = JSON.toJSONString(record).getBytes();
+            final byte[] messageBody = record.toString().getBytes();
             sourceMessage.setBody(messageBody);
             producer.send(sourceMessage);
         } catch (Exception e) {
@@ -242,7 +215,7 @@ public final class RocketMqDatabaseHistory extends AbstractDatabaseHistory {
     @Override
     public boolean storageExists() {
         // check topic is exist
-        return ConnectUtil.isTopicExist(connectConfig, this.topicName);
+        return RocketMQConnectUtil.topicExist(connectConfig, this.topicName);
     }
 
     @Override
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
index 0c027d0..a3ee101 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/pom.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
index 074146c..3b04344 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/java/org/apache/rocketmq/connect/debezium/mysql/DebeziumMysqlConnector.java
@@ -21,7 +21,6 @@ import io.openmessaging.connector.api.component.task.Task;
 import org.apache.rocketmq.connect.debezium.DebeziumConnector;
 
 
-
 /**
  * debezium mysql connector
  */
@@ -30,6 +29,7 @@ public class DebeziumMysqlConnector extends DebeziumConnector {
 
     /**
      * Return the current connector class
+     *
      * @return task implement class
      */
     @Override
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
index a371d4d..5dac1dc 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -18,9 +18,9 @@
 #
 
 {
-  "connector-class":"org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
-  "max-task":"1",
-  "connect-topicname":"debezium-mysql-source",
+  "connector-class": "org.apache.rocketmq.connect.debezium.mysql.DebeziumMysqlConnector",
+  "max-task": "1",
+  "connect-topicname": "debezium-mysql-source",
   "debezium.transforms": "Unwrap",
   "debezium.transforms.Unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
   "debezium.transforms.Unwrap.delete.handling.mode": "drop",
@@ -42,5 +42,5 @@
   "database.include.list": "db-01",
   "snapshot.mode": "when_needed",
 
-  "source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"
+  "source-record-converter": "org.apache.rocketmq.connect.runtime.converter.JsonConverter"
 }
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
index cd18267..c504adf 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/pom.xml
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>rocketmq-connect-debezium</artifactId>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
index ad051d4..a489df2 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-oracle/src/main/java/org/apache/rocketmq/connect/debezium/oracle/DebeziumOracleConnector.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.connect.debezium.oracle;
 
-import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.Task;
 import org.apache.rocketmq.connect.debezium.DebeziumConnector;
 
@@ -30,6 +29,7 @@ public class DebeziumOracleConnector extends DebeziumConnector {
 
     /**
      * Return the current connector class
+     *
      * @return task implement class
      */
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index 319332a..f8986d1 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -119,7 +119,7 @@ public class Worker {
 
     private final Plugin plugin;
 
-    private static final int MAX_START_TIMEOUT_MILLS = 5000;
+    private static final int MAX_START_TIMEOUT_MILLS = 1000*60;
 
     private static final long MAX_STOP_TIMEOUT_MILLS = 20000;
 
@@ -541,7 +541,7 @@ public class Worker {
 
             try {
                 if (null != future) {
-                    future.get(1000, TimeUnit.MILLISECONDS);
+                    future.get(1000 * 60, TimeUnit.MILLISECONDS);
                 } else {
                     log.error("[BUG] errorTasks reference not found in taskFutureMap");
                 }