You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/28 06:36:39 UTC

[rocketmq-connect] branch master updated: [ISSUE #207]RecordConverter support convert record key (#213)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd9f89  [ISSUE #207]RecordConverter support convert record key (#213)
9bd9f89 is described below

commit 9bd9f8991708609cd40f2a9dbfe9a5da2e7d3b56
Author: xiaoyi <su...@163.com>
AuthorDate: Thu Jul 28 14:36:34 2022 +0800

    [ISSUE #207]RecordConverter support convert record key (#213)
    
    * Optimize WorkerSourceTask and WorkerSinkTask #183
    
    * Optimize WorkerSourceTask and WorkerSinkTask #183
    
    * Optimize worker #183
    
    * fixed
    
    * fixed
    
    * format code
    
    * fixed
    
    * fixed WorkerTask #183
    
    * Fix debezium demecial type conversion problem #190
    
    * Upgrade rocketmq-replicator API to v0.1.3 #189
    
    * Encountered change event for table databasename.tablename whose schema isn`t known to this connector #191
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * remove local config
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * Ensure the orderly submission of offset #183
    
    * fixed
    
    * Optimize WorkerDirectTask #183
    
    * fixed  #183
    
    * test offset commit #183
    
    * fixed #183
    
    * fixed task id #183
    
    * fixed
    
    * upgrade fastjson
    
    * reformat code #183
    
    * Whether the same task is assigned to different worker nodes with different connectorTaskId [fixed]
    
    * fixed
    
    * Rocketmq replicator running null pointer #205
    
    * RecordConverter support convert record key #207
    
    * debezium record add key #207
    
    * jdbc plugin support record key #207
    
    * Jdbc connector no longer needs to serialize and deserialize ConnecRecord objects #209
    
    * support record delete #209
    
    * Fix test error
    
    * test convert record key #207
    
    * fixed
    
    * fixed
    
    * Update openmessaging connector version to 0.1.4
---
 .../kafka-connect-adaptor/pom.xml                  |   4 -
 .../kafka/connect/adaptor/schema/Converters.java   |  52 ++++++++---
 .../adaptor/schema/KafkaSinkValueConverter.java    |   6 ++
 .../schema/RocketMQSourceValueConverter.java       |   6 ++
 connectors/rocketmq-connect-debezium/pom.xml       |   7 +-
 .../rocketmq-connect-debezium-core/pom.xml         |   4 -
 connectors/rocketmq-connect-jdbc/pom.xml           |  46 +++++-----
 .../connect/jdbc/config/AbstractConfig.java        |   2 +-
 .../connect/jdbc/connector/JdbcSinkConfig.java     |  10 ++-
 .../jdbc/connector/JdbcSourceConnector.java        |   2 +-
 .../jdbc/dialect/PreparedStatementBinder.java      |  23 +++--
 .../connect/jdbc/sink/BufferedRecords.java         |  31 ++++++-
 .../connect/jdbc/sink/RecordValidator.java         | 100 +++++++++++++++++++++
 .../connect/jdbc/sink/metadata/FieldsMetadata.java |  64 +++++++++++++
 .../connect/jdbc/sink/metadata/SchemaPair.java     |   7 +-
 .../jdbc/source/offset/SourceOffsetCompute.java    |   3 +-
 pom.xml                                            |   2 +-
 .../connect/runtime/DistributedConnectStartup.java |   2 +-
 .../connect/runtime/StandaloneConnectStartup.java  |   7 +-
 .../connect/runtime/common/ConnectKeyValue.java    |  22 +++++
 .../runtime/config/RuntimeConfigDefine.java        |   5 +-
 .../runtime/connectorwrapper/TransformChain.java   |   4 +-
 .../connect/runtime/connectorwrapper/Worker.java   |  72 ++++++++-------
 .../runtime/connectorwrapper/WorkerDirectTask.java |   1 +
 .../runtime/connectorwrapper/WorkerSinkTask.java   |  52 +++++------
 .../runtime/connectorwrapper/WorkerSourceTask.java |  37 ++++----
 .../controller/AbstractConnectController.java      |   2 +-
 .../distributed/DistributedConnectController.java  |   2 +-
 .../{utils => controller/isolation}/Plugin.java    |  21 ++++-
 .../isolation}/PluginClassLoader.java              |   2 +-
 .../isolation}/PluginUtils.java                    |   2 +-
 .../isolation}/PluginWrapper.java                  |   2 +-
 .../standalone/StandaloneConnectController.java    |   2 +-
 .../runtime/converter/record/ConverterConfig.java  |  36 ++++++++
 .../runtime/converter/record/ConverterType.java    |  60 +++++++++++++
 .../runtime/service/ConfigManagementService.java   |   2 +-
 .../service/ConfigManagementServiceImpl.java       |   6 +-
 .../memory/MemoryConfigManagementServiceImpl.java  |   5 +-
 .../rocketmq/connect/runtime/utils/Base64Util.java |  52 +++++++++++
 .../runtime/connectorwrapper/WorkerTest.java       |   3 +-
 .../connect/runtime/rest/RestHandlerTest.java      |   6 +-
 .../service/ConfigManagementServiceImplTest.java   |   2 +-
 42 files changed, 603 insertions(+), 173 deletions(-)

diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
index b79d77b..4ee490b 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/pom.xml
@@ -39,10 +39,6 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-remoting</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-openmessaging</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
index 9c0c529..1311554 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/Converters.java
@@ -40,15 +40,25 @@ public class Converters {
 
     public static ConnectRecord fromSourceRecord(SourceRecord record) {
         // sourceRecord convert connect Record
-        RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
-        io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+        RocketMQSourceSchemaConverter valueSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+        io.openmessaging.connector.api.data.Schema valueSchema = valueSchemaConverter.schema();
+
+        io.openmessaging.connector.api.data.Schema keySchema = null;
+        if (record.keySchema() != null) {
+            RocketMQSourceSchemaConverter keySchemaConverter = new RocketMQSourceSchemaConverter(record.keySchema());
+            keySchema = keySchemaConverter.schema();
+        }
+
+
         RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
         ConnectRecord connectRecord = new ConnectRecord(
                 new RecordPartition(record.sourcePartition()),
                 new RecordOffset(record.sourceOffset()),
                 record.timestamp(),
-                schema,
-                rocketMQSourceValueConverter.value(schema, record.value()));
+                keySchema,
+                record.key() == null ? null : rocketMQSourceValueConverter.value(keySchema, record.key()),
+                valueSchema,
+                record.value() == null ? null : rocketMQSourceValueConverter.value(valueSchema, record.value()));
         Iterator<Header> headers = record.headers().iterator();
         while (headers.hasNext()) {
             Header header = headers.next();
@@ -60,15 +70,24 @@ public class Converters {
 
     public static ConnectRecord fromSinkRecord(SinkRecord record) {
         // sourceRecord convert connect Record
-        RocketMQSourceSchemaConverter rocketMQSourceSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
-        io.openmessaging.connector.api.data.Schema schema = rocketMQSourceSchemaConverter.schema();
+        RocketMQSourceSchemaConverter valueSchemaConverter = new RocketMQSourceSchemaConverter(record.valueSchema());
+        io.openmessaging.connector.api.data.Schema valueSchema = valueSchemaConverter.schema();
+
+        io.openmessaging.connector.api.data.Schema keySchema = null;
+        if (record.keySchema() != null) {
+            RocketMQSourceSchemaConverter keySchemaConverter = new RocketMQSourceSchemaConverter(record.keySchema());
+            keySchema = keySchemaConverter.schema();
+        }
+
         RocketMQSourceValueConverter rocketMQSourceValueConverter = new RocketMQSourceValueConverter();
         ConnectRecord connectRecord = new ConnectRecord(
                 toRecordPartition(record),
                 toRecordOffset(record),
                 record.timestamp(),
-                schema,
-                rocketMQSourceValueConverter.value(schema, record.value()));
+                keySchema,
+                record.key() == null ? null : rocketMQSourceValueConverter.value(keySchema, record.key()),
+                valueSchema,
+                record.value() == null ? null :rocketMQSourceValueConverter.value(valueSchema, record.value()));
         Iterator<Header> headers = record.headers().iterator();
         while (headers.hasNext()) {
             Header header = headers.next();
@@ -86,8 +105,15 @@ public class Converters {
      */
     public static SinkRecord fromConnectRecord(ConnectRecord record) {
         // connect record  convert kafka  sink record
-        KafkaSinkSchemaConverter kafkaSinkSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
-        Schema schema = kafkaSinkSchemaConverter.schema();
+        KafkaSinkSchemaConverter valueSchemaConverter = new KafkaSinkSchemaConverter(record.getSchema());
+        Schema schema = valueSchemaConverter.schema();
+        // key converter
+        Schema keySchema = null;
+        if (record.getKeySchema() != null){
+            KafkaSinkSchemaConverter keySchemaConverter = new KafkaSinkSchemaConverter(record.getKeySchema());
+            keySchema = keySchemaConverter.schema();
+        }
+
         KafkaSinkValueConverter sinkValueConverter = new KafkaSinkValueConverter();
         // add headers
         Headers headers = new ConnectHeaders();
@@ -100,10 +126,10 @@ public class Converters {
         SinkRecord sinkRecord = new SinkRecord(
                 topic(record.getPosition().getPartition()),
                 partition(record.getPosition().getPartition()),
-                null,
-                null,
+                keySchema,
+                record.getKey() == null ? null : sinkValueConverter.value(keySchema, record.getKey()),
                 schema,
-                sinkValueConverter.value(schema, record.getData()),
+                record.getData() == null ? null : sinkValueConverter.value(schema, record.getData()),
                 offset(record.getPosition().getOffset()),
                 record.getTimestamp(),
                 TimestampType.NO_TIMESTAMP_TYPE,
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
index 7b37f59..de986bd 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkValueConverter.java
@@ -47,6 +47,12 @@ public class KafkaSinkValueConverter {
      * @return
      */
     private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+        if (targetSchema == null) {
+            if (originalValue == null) {
+                return null;
+            }
+            return  originalValue;
+        }
         switch (targetSchema.type()) {
             case INT8:
             case INT16:
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
index 40d6178..2934564 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceValueConverter.java
@@ -49,6 +49,12 @@ public class RocketMQSourceValueConverter {
      * @return
      */
     private Object convertKafkaValue(Schema targetSchema, Object originalValue) {
+        if (targetSchema == null) {
+            if (originalValue == null) {
+                return null;
+            }
+            return  originalValue;
+        }
         switch (targetSchema.getFieldType()) {
             case INT8:
             case INT16:
diff --git a/connectors/rocketmq-connect-debezium/pom.xml b/connectors/rocketmq-connect-debezium/pom.xml
index 0715731..5f0e2ec 100644
--- a/connectors/rocketmq-connect-debezium/pom.xml
+++ b/connectors/rocketmq-connect-debezium/pom.xml
@@ -184,7 +184,7 @@
         <rocketmq-openmessaging.version>4.3.2</rocketmq-openmessaging.version>
 
         <!--rocketmq connect version-->
-        <openmessaging-connector.version>0.1.3</openmessaging-connector.version>
+        <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
         <openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
     </properties>
 
@@ -231,11 +231,6 @@
                 <artifactId>rocketmq-remoting</artifactId>
                 <version>${rocketmq.version}</version>
             </dependency>
-            <dependency>
-                <groupId>org.apache.rocketmq</groupId>
-                <artifactId>rocketmq-openmessaging</artifactId>
-                <version>${rocketmq-openmessaging.version}</version>
-            </dependency>
 
             <!--kafka && kafka connect-->
             <dependency>
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
index 121e611..50cb6b9 100644
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
+++ b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/pom.xml
@@ -68,10 +68,6 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-remoting</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-openmessaging</artifactId>
-        </dependency>
 
         <dependency>
             <groupId>io.debezium</groupId>
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index 7617043..097bb3a 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -47,7 +47,7 @@
         <assertj.version>2.6.0</assertj.version>
         <mockito.version>2.6.3</mockito.version>
         <!--rocket connect api-->
-        <openmessaging-connector.version>0.1.3</openmessaging-connector.version>
+        <openmessaging-connector.version>0.1.4</openmessaging-connector.version>
         <openmessaging-api.version>0.3.1-alpha</openmessaging-api.version>
 
         <!--fast json-->
@@ -58,28 +58,34 @@
 
     <dependencies>
 
-        <!---->
         <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-client</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-tools</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-remoting</artifactId>
-            <version>${rocketmq.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.rocketmq</groupId>
-            <artifactId>rocketmq-openmessaging</artifactId>
-            <version>${rocketmq.version}</version>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.12.0</version>
         </dependency>
 
+<!--        <dependency>-->
+<!--            <groupId>org.apache.rocketmq</groupId>-->
+<!--            <artifactId>rocketmq-client</artifactId>-->
+<!--            <version>${rocketmq.version}</version>-->
+<!--        </dependency>-->
+<!--        <dependency>-->
+<!--            <groupId>org.apache.rocketmq</groupId>-->
+<!--            <artifactId>rocketmq-tools</artifactId>-->
+<!--            <version>${rocketmq.version}</version>-->
+<!--        </dependency>-->
+<!--        <dependency>-->
+<!--            <groupId>org.apache.rocketmq</groupId>-->
+<!--            <artifactId>rocketmq-remoting</artifactId>-->
+<!--            <version>${rocketmq.version}</version>-->
+<!--        </dependency>-->
+<!--        <dependency>-->
+<!--            <groupId>org.apache.rocketmq</groupId>-->
+<!--            <artifactId>rocketmq-openmessaging</artifactId>-->
+<!--            <version>${rocketmq.version}</version>-->
+<!--        </dependency>-->
+
+
         <!--rocketmq connect api-->
         <dependency>
             <groupId>io.openmessaging</groupId>
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
index 9d53fdc..3d7c52d 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/config/AbstractConfig.java
@@ -16,7 +16,7 @@
  */
 package org.apache.rocketmq.connect.jdbc.config;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
 import io.openmessaging.KeyValue;
 import org.apache.rocketmq.connect.jdbc.util.QuoteMethod;
 
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java
index 7c1f0b3..481bd9a 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSinkConfig.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
 import io.openmessaging.KeyValue;
 import org.apache.rocketmq.connect.jdbc.config.AbstractConfig;
 import org.apache.rocketmq.connect.jdbc.dialect.DatabaseDialect;
+import org.apache.rocketmq.connect.jdbc.exception.ConfigException;
 import org.apache.rocketmq.connect.jdbc.schema.table.TableId;
 import org.apache.rocketmq.connect.jdbc.util.TableType;
 
@@ -187,7 +188,7 @@ public class JdbcSinkConfig extends AbstractConfig {
         tableNameFormat = config.getString(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_DEFAULT).trim();
         tableFromHeader = getBoolean(config, TABLE_NAME_FROM_HEADER, false);
         batchSize = config.getInt(BATCH_SIZE, BATCH_SIZE_DEFAULT);
-        deleteEnabled = getBoolean(config, DELETE_ENABLED, DELETE_ENABLED_DEFAULT);
+
         maxRetries = config.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT);
         retryBackoffMs = config.getInt(RETRY_BACKOFF_MS, RETRY_BACKOFF_MS_DEFAULT);
         autoCreate = getBoolean(config, AUTO_CREATE, AUTO_CREATE_DEFAULT);
@@ -195,9 +196,13 @@ public class JdbcSinkConfig extends AbstractConfig {
         if (Objects.nonNull(config.getString(INSERT_MODE))) {
             insertMode = InsertMode.valueOf(config.getString(INSERT_MODE, INSERT_MODE_DEFAULT).toUpperCase());
         }
-
+        deleteEnabled = getBoolean(config, DELETE_ENABLED, DELETE_ENABLED_DEFAULT);
         pkMode = PrimaryKeyMode.valueOf(config.getString(PK_MODE, PK_MODE_DEFAULT).toUpperCase());
         pkFields = getList(config, PK_FIELDS);
+        if (deleteEnabled && pkMode != PrimaryKeyMode.RECORD_KEY) {
+            throw new ConfigException(
+                    "Primary key mode must be 'record_key' when delete support is enabled");
+        }
         dialectName = config.getString(DIALECT_NAME_CONFIG);
         fieldsWhitelist = new HashSet<>(getList(config, FIELDS_WHITELIST));
         // table white list
@@ -206,6 +211,7 @@ public class JdbcSinkConfig extends AbstractConfig {
         timeZone = TimeZone.getTimeZone(ZoneId.of(dbTimeZone));
         tableTypes = TableType.parse(getList(config, TABLE_TYPES_CONFIG, TABLE_TYPES_DEFAULT));
 
+
     }
 
     public String getTableNameFormat() {
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
index 4ad9e4f..4a2efde 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/connector/JdbcSourceConnector.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.connect.jdbc.connector;
 
 import java.util.List;
 
-import com.beust.jcommander.internal.Lists;
+import com.google.common.collect.Lists;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.source.SourceConnector;
 import io.openmessaging.internal.DefaultKeyValue;
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
index ddd5532..0443756 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/PreparedStatementBinder.java
@@ -16,8 +16,6 @@
  */
 package org.apache.rocketmq.connect.jdbc.dialect;
 
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.Schema;
@@ -98,10 +96,21 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
                     throw new AssertionError();
                 }
                 break;
+            case RECORD_KEY: {
+                if (schemaPair.keySchema.getFieldType().isPrimitive()) {
+                    assert fieldsMetadata.keyFieldNames.size() == 1;
+                    bindField(index++, schemaPair.keySchema, record.getKey(),
+                            fieldsMetadata.keyFieldNames.iterator().next());
+                } else {
+                    for (String fieldName : fieldsMetadata.keyFieldNames) {
+                        final Field field = schemaPair.keySchema.getField(fieldName);
+                        bindField(index++, field.getSchema(), ((Struct) record.getKey()).get(field), fieldName);
+                    }
+                }
+            }
+            break;
             case RECORD_VALUE: {
-                String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
-                Struct struct = JSON.parseObject(jsonData, Struct.class);
-                struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
+                Struct struct = (Struct) record.getData();
                 for (String fieldName : fieldsMetadata.keyFieldNames) {
                     final Field field = schemaPair.schema.getField(fieldName);
                     bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
@@ -118,9 +127,7 @@ public class PreparedStatementBinder implements DatabaseDialect.StatementBinder
             ConnectRecord record,
             int index
     ) throws SQLException {
-        String jsonData = JSON.toJSONString(record.getData(), SerializerFeature.DisableCircularReferenceDetect);
-        Struct struct = JSON.parseObject(jsonData, Struct.class);
-        struct.setValues(JSON.parseObject(jsonData).getJSONArray("values").toArray());
+        Struct struct = (Struct) record.getData();
         for (final String fieldName : fieldsMetadata.nonKeyFieldNames) {
             final Field field = record.getSchema().getField(fieldName);
             bindField(index++, field.getSchema(), struct.get(fieldName), fieldName);
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
index 3ad19be..88e058f 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/BufferedRecords.java
@@ -57,12 +57,15 @@ public class BufferedRecords {
     private final Connection connection;
 
     private List<ConnectRecord> records = new ArrayList<>();
+    private Schema keySchema;
     private Schema schema;
     private FieldsMetadata fieldsMetadata;
+    private RecordValidator recordValidator;
     private PreparedStatement updatePreparedStatement;
     private PreparedStatement deletePreparedStatement;
     private DatabaseDialect.StatementBinder updateStatementBinder;
     private DatabaseDialect.StatementBinder deleteStatementBinder;
+    private boolean deletesInBatch = false;
 
     public BufferedRecords(
             JdbcSinkConfig config,
@@ -76,6 +79,7 @@ public class BufferedRecords {
         this.dbDialect = dbDialect;
         this.dbStructure = dbStructure;
         this.connection = connection;
+        this.recordValidator = RecordValidator.create(config);
     }
 
     /**
@@ -86,10 +90,27 @@ public class BufferedRecords {
      * @throws SQLException
      */
     public List<ConnectRecord> add(ConnectRecord record) throws SQLException {
+        recordValidator.validate(record);
         final List<ConnectRecord> flushed = new ArrayList<>();
         boolean schemaChanged = false;
-        // check and update schema
-        if (!Objects.equals(schema, record.getSchema())) {
+        if (!Objects.equals(keySchema, record.getKeySchema())) {
+            keySchema = record.getKeySchema();
+            schemaChanged = true;
+        }
+        if (isNull(record.getSchema())) {
+            // For deletes, value and optionally value schema come in as null.
+            // We don't want to treat this as a schema change if key schemas is the same
+            // otherwise we flush unnecessarily.
+            if (config.isDeleteEnabled()) {
+                deletesInBatch = true;
+            }
+        } else if (Objects.equals(schema, record.getSchema())) {
+            if (config.isDeleteEnabled() && deletesInBatch) {
+                // flush so an insert after a delete of same record isn't lost
+                flushed.addAll(flush());
+            }
+        } else {
+            // value schema is not null and has changed. This is a real schema change.
             schema = record.getSchema();
             schemaChanged = true;
         }
@@ -99,6 +120,7 @@ public class BufferedRecords {
             flushed.addAll(flush());
             // re-initialize everything that depends on the record schema
             final SchemaPair schemaPair = new SchemaPair(
+                    record.getKeySchema(),
                     record.getSchema(),
                     record.getExtensions()
             );
@@ -149,6 +171,11 @@ public class BufferedRecords {
             }
         }
 
+        // set deletesInBatch if schema value is not null
+        if (isNull(record.getData()) && config.isDeleteEnabled()) {
+            deletesInBatch = true;
+        }
+
         records.add(record);
         if (records.size() >= config.getBatchSize()) {
             flushed.addAll(flush());
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/RecordValidator.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/RecordValidator.java
new file mode 100644
index 0000000..03c9f55
--- /dev/null
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/RecordValidator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.jdbc.sink;
+
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.errors.ConnectException;
+import org.apache.rocketmq.connect.jdbc.connector.JdbcSinkConfig;
+
+@FunctionalInterface
+public interface RecordValidator {
+
+  RecordValidator NO_OP = (record) -> { };
+
+  void validate(ConnectRecord record);
+
+  default RecordValidator and(RecordValidator other) {
+    if (other == null || other == NO_OP || other == this) {
+      return this;
+    }
+    if (this == NO_OP) {
+      return other;
+    }
+    RecordValidator thisValidator = this;
+    return (record) -> {
+      thisValidator.validate(record);
+      other.validate(record);
+    };
+  }
+
+  static RecordValidator create(JdbcSinkConfig config) {
+    RecordValidator requiresKey = requiresKey(config);
+    RecordValidator requiresValue = requiresValue(config);
+
+    RecordValidator keyValidator = NO_OP;
+    RecordValidator valueValidator = NO_OP;
+    switch (config.pkMode) {
+      case RECORD_KEY:
+        keyValidator = keyValidator.and(requiresKey);
+        break;
+      case RECORD_VALUE:
+      case NONE:
+        valueValidator = valueValidator.and(requiresValue);
+        break;
+      default:
+        // no primary key is required
+        break;
+    }
+
+    if (config.isDeleteEnabled()) {
+      // When delete is enabled, we need a key
+      keyValidator = keyValidator.and(requiresKey);
+    } else {
+      // When delete is disabled, we need non-tombstone values
+      valueValidator = valueValidator.and(requiresValue);
+    }
+
+    // Compose the validator that may or may be NO_OP
+    return keyValidator.and(valueValidator);
+  }
+
+  static RecordValidator requiresValue(JdbcSinkConfig config) {
+    return record -> {
+      Schema valueSchema = record.getSchema();
+      if (record.getData() != null
+          && valueSchema != null
+          && valueSchema.getFieldType() == FieldType.STRUCT) {
+        return;
+      }
+      throw new ConnectException(record.toString());
+    };
+  }
+
+  static RecordValidator requiresKey(JdbcSinkConfig config) {
+    return record -> {
+      Schema keySchema = record.getKeySchema();
+      if (record.getKey() != null
+          && keySchema != null
+          && (keySchema.getFieldType() == FieldType.STRUCT || keySchema.getFieldType().isPrimitive())) {
+        return;
+      }
+      throw new ConnectException(record.toString());
+    };
+  }
+}
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java
index d185218..88d3810 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/FieldsMetadata.java
@@ -74,6 +74,7 @@ public class FieldsMetadata {
                 pkMode,
                 configuredPkFields,
                 fieldsWhitelist,
+                schemaPair.keySchema,
                 schemaPair.schema,
                 schemaPair.extensions
         );
@@ -95,6 +96,7 @@ public class FieldsMetadata {
             final JdbcSinkConfig.PrimaryKeyMode pkMode,
             final List<String> configuredPkFields,
             final Set<String> fieldsWhitelist,
+            final Schema keySchema,
             final Schema schema,
             final KeyValue headers
     ) {
@@ -106,6 +108,9 @@ public class FieldsMetadata {
         switch (pkMode) {
             case NONE:
                 break;
+            case RECORD_KEY:
+                extractRecordKeyPk(tableName, configuredPkFields, keySchema, allFields, keyFieldNames);
+                break;
             case RECORD_VALUE:
                 extractRecordValuePk(tableName, configuredPkFields, schema, headers, allFields, keyFieldNames);
                 break;
@@ -157,6 +162,65 @@ public class FieldsMetadata {
         return new FieldsMetadata(keyFieldNames, nonKeyFieldNames, allFieldsOrdered);
     }
 
+    private static void extractRecordKeyPk(
+            final String tableName,
+            final List<String> configuredPkFields,
+            final Schema keySchema,
+            final Map<String, SinkRecordField> allFields,
+            final Set<String> keyFieldNames
+    ) {
+        if (keySchema == null) {
+            throw new ConnectException(String.format(
+                    "PK mode for table '%s' is %s, but record key schema is missing",
+                    tableName,
+                    JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY
+            ));
+        }
+        final FieldType keySchemaType = keySchema.getFieldType();
+        switch (keySchemaType){
+            case STRUCT:
+                if (configuredPkFields.isEmpty()) {
+                    keySchema.getFields().forEach(keyField->{
+                        keyFieldNames.add(keyField.getName());
+                    });
+                } else {
+                    for (String fieldName : configuredPkFields) {
+                        final Field keyField = keySchema.getField(fieldName);
+                        if (keyField == null) {
+                            throw new ConnectException(String.format(
+                                    "PK mode for table '%s' is %s with configured PK fields %s, but record key "
+                                            + "schema does not contain field: %s",
+                                    tableName, JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY, configuredPkFields, fieldName
+                            ));
+                        }
+                    }
+                    keyFieldNames.addAll(configuredPkFields);
+                }
+                for (String fieldName : keyFieldNames) {
+                    final Schema fieldSchema = keySchema.getField(fieldName).getSchema();
+                    allFields.put(fieldName, new SinkRecordField(fieldSchema, fieldName, true));
+                }
+                break;
+            default:
+                // todo 基本数据类型,必须指定主键字段,因为基本类型不带schema
+                if (keySchemaType.isPrimitive()) {
+                    if (configuredPkFields.size() != 1) {
+                        throw new ConnectException(String.format(
+                                "Need exactly one PK column defined since the key schema for records is a "
+                                        + "primitive type, defined columns are: %s",
+                                configuredPkFields
+                        ));
+                    }
+                    final String fieldName = configuredPkFields.get(0);
+                    keyFieldNames.add(fieldName);
+                    allFields.put(fieldName, new SinkRecordField(keySchema, fieldName, true));
+                } else {
+                    throw new ConnectException(
+                            "Key schema must be primitive type or Struct, but is of type: " + keySchemaType
+                    );
+                }
+        }
+    }
 
     /**
      * record value
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java
index 77955d7..f2196f4 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/sink/metadata/SchemaPair.java
@@ -25,15 +25,18 @@ import java.util.Objects;
  * schema pair
  */
 public class SchemaPair {
+    public final Schema keySchema;
     public final Schema schema;
     public final KeyValue extensions;
 
-    public SchemaPair(Schema schema) {
+    public SchemaPair(Schema keySchema, Schema schema) {
+        this.keySchema = keySchema;
         this.schema = schema;
         this.extensions = null;
     }
 
-    public SchemaPair(Schema schema, KeyValue extensions) {
+    public SchemaPair(Schema keySchema, Schema schema, KeyValue extensions) {
+        this.keySchema = keySchema;
         this.schema = schema;
         this.extensions = extensions;
     }
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
index 0ae2f2c..359e129 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/source/offset/SourceOffsetCompute.java
@@ -16,7 +16,8 @@
  */
 package org.apache.rocketmq.connect.jdbc.source.offset;
 
-import com.beust.jcommander.internal.Maps;
+
+import com.google.common.collect.Maps;
 import io.openmessaging.connector.api.component.task.source.SourceTaskContext;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
diff --git a/pom.xml b/pom.xml
index 64b719a..f4bd51b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,7 +43,7 @@
         <assertj.version>2.6.0</assertj.version>
         <mockito.version>3.2.4</mockito.version>
         <httpclient.version>4.5.13</httpclient.version>
-        <openmessaging.connector.version>0.1.4-SNAPSHOT</openmessaging.connector.version>
+        <openmessaging.connector.version>0.1.4</openmessaging.connector.version>
         <fastjson.version>1.2.83</fastjson.version>
         <javalin.version>2.8.0</javalin.version>
         <slf4j.version>1.7.7</slf4j.version>
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
index 2f0dcda..3e958d5 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/DistributedConnectStartup.java
@@ -40,7 +40,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
 import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
 import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
index 2770265..c7adad2 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.connect.runtime;
 
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.InputStream;
@@ -26,6 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
@@ -40,7 +41,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.utils.FileAndPropertyUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.ServerUtil;
 import org.apache.rocketmq.connect.runtime.utils.ServiceProviderUtil;
 import org.slf4j.Logger;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
index dc8e962..f64b9e9 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/common/ConnectKeyValue.java
@@ -125,6 +125,26 @@ public class ConnectKeyValue implements KeyValue, Serializable {
         this.properties = properties;
     }
 
+    /**
+     * Gets all original settings with the given prefix.
+     * @param prefix the prefix to use as a filter
+     * @param strip strip the prefix before adding to the output if set true
+     * @return a Map containing the settings with the prefix
+     */
+    public Map<String, String> originalsWithPrefix(String prefix, boolean strip) {
+        Map<String, String> result = new ConcurrentHashMap<>();
+        for (Map.Entry<String, String> entry : this.properties.entrySet()) {
+            if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) {
+                if (strip) {
+                    result.put(entry.getKey().substring(prefix.length()), entry.getValue());
+                } else {
+                    result.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        return result;
+    }
+
     @Override
     public boolean equals(Object obj) {
 
@@ -145,4 +165,6 @@ public class ConnectKeyValue implements KeyValue, Serializable {
             "properties=" + properties +
             '}';
     }
+
+
 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
index d7a13e2..7110e6e 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/RuntimeConfigDefine.java
@@ -61,7 +61,10 @@ public class RuntimeConfigDefine {
     /**
      * The full class name of record converter. Which is used to parse {@link ConnectRecord} to/from byte[].
      */
-    public static final String SOURCE_RECORD_CONVERTER = "source-record-converter";
+    public static final String SOURCE_RECORD_CONVERTER = "value-converter";
+    public static final String SOURCE_RECORD_CONVERTER_DEFAULT = "org.apache.rocketmq.connect.runtime.converter.record.StringConverter";
+    public static final String SOURCE_RECORD_KEY_CONVERTER = "key-converter";
+    public static final String SOURCE_RECORD_KEY_CONVERTER_DEFAULT = "org.apache.rocketmq.connect.runtime.converter.record.StringConverter";
 
     public static final String NAMESRV_ADDR = "namesrv-addr";
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
index df09f36..ef3d264 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/TransformChain.java
@@ -29,8 +29,8 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.errors.ErrorReporter;
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
-import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index dec3e88..cbfd271 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -27,7 +27,6 @@ import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordConverter;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
@@ -35,6 +34,8 @@ import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.controller.AbstractConnectController;
+import org.apache.rocketmq.connect.runtime.converter.record.ConverterConfig;
+import org.apache.rocketmq.connect.runtime.converter.record.ConverterType;
 import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
@@ -44,8 +45,8 @@ import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
-import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
 import org.apache.rocketmq.connect.runtime.utils.ServiceThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,7 +66,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -553,32 +553,38 @@ public class Worker {
                     createDirectTask(id, keyValue);
                     continue;
                 }
+
+
+                ClassLoader savedLoader = plugin.currentThreadLoader();
                 try {
-                    String taskClass = keyValue.getString(RuntimeConfigDefine.TASK_CLASS);
-                    ClassLoader loader = plugin.getPluginClassLoader(taskClass);
-                    final ClassLoader currentThreadLoader = plugin.currentThreadLoader();
-                    Class taskClazz;
-                    boolean isolationFlag = false;
-                    if (loader instanceof PluginClassLoader) {
-                        taskClazz = ((PluginClassLoader) loader).loadClass(taskClass, false);
-                        isolationFlag = true;
-                    } else {
-                        taskClazz = Class.forName(taskClass);
-                    }
-                    final Task task = (Task) taskClazz.getDeclaredConstructor().newInstance();
-                    final String converterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER);
-                    RecordConverter recordConverter = null;
-                    if (StringUtils.isNotEmpty(converterClazzName)) {
-                        recordConverter = Class.forName(converterClazzName)
-                                .asSubclass(RecordConverter.class)
-                                .getDeclaredConstructor()
-                                .newInstance();
-                        recordConverter.configure(keyValue.getProperties());
-                    }
 
-                    if (isolationFlag) {
-                        Plugin.compareAndSwapLoaders(loader);
-                    }
+                    String connType = keyValue.getString(RuntimeConfigDefine.CONNECTOR_CLASS);
+                    ClassLoader connectorLoader = plugin.getPluginClassLoader(connType);
+                    savedLoader = Plugin.compareAndSwapLoaders(connectorLoader);
+
+                    final Class<? extends Task> taskClass = plugin.currentThreadLoader().loadClass(keyValue.getString(RuntimeConfigDefine.TASK_CLASS)).asSubclass(Task.class);
+
+                    final Task task = plugin.newTask(taskClass);
+
+                    final String valueConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_CONVERTER_DEFAULT);
+                    final String keyConverterClazzName = keyValue.getString(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER_DEFAULT);
+                    // new stance
+                    RecordConverter valueConverter = Class.forName(valueConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
+                    RecordConverter keyConverter = Class.forName(keyConverterClazzName).asSubclass(RecordConverter.class).getDeclaredConstructor().newInstance();
+
+                    //value config
+                    Map<String, String> valueConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER,true);
+                    valueConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
+                    valueConverter.configure(valueConverterConfig);
+
+                    //key config
+                    Map<String, String> keyConverterConfig = keyValue.originalsWithPrefix(RuntimeConfigDefine.SOURCE_RECORD_KEY_CONVERTER, true);
+                    keyConverterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.KEY.getName());
+                    keyConverter.configure(keyConverterConfig);
+
+//                    if (isolationFlag) {
+//                        Plugin.compareAndSwapLoaders(loader);
+//                    }
                     if (task instanceof SourceTask) {
                         DefaultMQProducer producer = ConnectUtil.initDefaultMQProducer(workerConfig);
                         TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
@@ -587,8 +593,8 @@ public class Worker {
                         retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters(connectorName, keyValue));
 
                         WorkerSourceTask workerSourceTask = new WorkerSourceTask(workerConfig, id,
-                                (SourceTask) task, loader, keyValue, positionManagementService, recordConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
-                        Plugin.compareAndSwapLoaders(currentThreadLoader);
+                                (SourceTask) task, savedLoader, keyValue, positionManagementService, keyConverter, valueConverter, producer, workerState, connectStatsManager, connectStatsService, transformChain, retryWithToleranceOperator);
+
                         Future future = taskExecutor.submit(workerSourceTask);
                         // schedule offset committer
                         sourceTaskOffsetCommitter.ifPresent(committer -> committer.schedule(id, workerSourceTask));
@@ -609,13 +615,13 @@ public class Worker {
                         retryWithToleranceOperator.reporters(ReporterManagerUtil.sinkTaskReporters(connectorName, keyValue, workerConfig));
 
                         WorkerSinkTask workerSinkTask = new WorkerSinkTask(workerConfig, id,
-                                (SinkTask) task, loader, keyValue, recordConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
-                                retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, recordConverter));
-                        Plugin.compareAndSwapLoaders(currentThreadLoader);
+                                (SinkTask) task, savedLoader, keyValue, keyConverter, valueConverter, consumer, workerState, connectStatsManager, connectStatsService, transformChain,
+                                retryWithToleranceOperator, ReporterManagerUtil.createWorkerErrorRecordReporter(keyValue, retryWithToleranceOperator, valueConverter));
                         Future future = taskExecutor.submit(workerSinkTask);
                         taskToFutureMap.put(workerSinkTask, future);
                         this.pendingTasks.put(workerSinkTask, System.currentTimeMillis());
                     }
+                    Plugin.compareAndSwapLoaders(savedLoader);
                 } catch (Exception e) {
                     log.error("start worker task exception. config {}" + JSON.toJSONString(keyValue), e);
                 }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index ce56766..0648e90 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -84,6 +84,7 @@ public class WorkerDirectTask extends WorkerSourceTask {
                 positionManagementService,
                 null,
                 null,
+                null,
                 workerState,
                 connectStatsManager,
                 connectStatsService,
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
index 70fb7bc..cf286aa 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java
@@ -50,13 +50,13 @@ import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.errors.WorkerErrorRecordReporter;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
+import org.apache.rocketmq.connect.runtime.utils.Base64Util;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -71,6 +71,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+
 /**
  * A wrapper of {@link SinkTask} for runtime.
  */
@@ -90,7 +91,8 @@ public class WorkerSinkTask extends WorkerTask {
     /**
      * A converter to parse sink data entry to object.
      */
-    private RecordConverter recordConverter;
+    private RecordConverter keyConverter;
+    private RecordConverter valueConverter;
 
     private final ConcurrentHashMap<MessageQueue, Long> messageQueuesOffsetMap;
 
@@ -150,7 +152,8 @@ public class WorkerSinkTask extends WorkerTask {
                           SinkTask sinkTask,
                           ClassLoader classLoader,
                           ConnectKeyValue taskConfig,
-                          RecordConverter recordConverter,
+                          RecordConverter keyConverter,
+                          RecordConverter valueConverter,
                           DefaultMQPullConsumer consumer,
                           AtomicReference<WorkerState> workerState,
                           ConnectStatsManager connectStatsManager,
@@ -161,7 +164,8 @@ public class WorkerSinkTask extends WorkerTask {
         super(workerConfig, id, classLoader, taskConfig, retryWithToleranceOperator, transformChain, workerState);
         this.sinkTask = sinkTask;
         this.consumer = consumer;
-        this.recordConverter = recordConverter;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
         this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256);
         this.messageQueuesStateMap = new ConcurrentHashMap<>(256);
         this.connectStatsManager = connectStatsManager;
@@ -437,28 +441,24 @@ public class WorkerSinkTask extends WorkerTask {
 
     private ConnectRecord convertMessages(MessageExt message) {
         Map<String, String> properties = message.getProperties();
-        ConnectRecord record;
-        // start convert
-        if (recordConverter == null) {
-            final byte[] messageBody = message.getBody();
-            String s = new String(messageBody);
-            record = JSON.parseObject(s, ConnectRecord.class);
-        } else {
-            // timestamp
-            String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
-            Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
-
-            // partition and offset
-            RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
-            RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(message.getQueueOffset());
-
-            // convert
-            SchemaAndValue schemaAndValue = retryWithToleranceOperator.execute(() -> recordConverter.toConnectData(message.getTopic(), message.getBody()),
-                    ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
-            record = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndValue.schema(), schemaAndValue.value());
-            if (retryWithToleranceOperator.failed()) {
-                return null;
-            }
+        // timestamp
+        String connectTimestamp = properties.get(RuntimeConfigDefine.CONNECT_TIMESTAMP);
+        Long timestamp = StringUtils.isNotEmpty(connectTimestamp) ? Long.valueOf(connectTimestamp) : null;
+
+        // partition and offset
+        RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(message.getTopic(), message.getBrokerName(), message.getQueueId());
+        RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(message.getQueueOffset());
+
+
+        SchemaAndValue schemaAndKey = retryWithToleranceOperator.execute(() -> keyConverter.toConnectData(message.getTopic(), Base64Util.base64Decode(message.getKeys())),
+                ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
+
+        // convert value
+        SchemaAndValue schemaAndValue = retryWithToleranceOperator.execute(() -> valueConverter.toConnectData(message.getTopic(), message.getBody()),
+                ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
+        ConnectRecord record = new ConnectRecord(recordPartition, recordOffset, timestamp, schemaAndKey.schema(), schemaAndKey.value(), schemaAndValue.schema(), schemaAndValue.value());
+        if (retryWithToleranceOperator.failed()) {
+            return null;
         }
 
         // Apply the transformations
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
index 746c871..0a26faa 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java
@@ -19,7 +19,6 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper;
 
 import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.source.SourceTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
@@ -51,13 +50,13 @@ import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageReaderImpl;
 import org.apache.rocketmq.connect.runtime.store.PositionStorageWriter;
+import org.apache.rocketmq.connect.runtime.utils.Base64Util;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.utils.Utils;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -105,7 +104,8 @@ public class WorkerSourceTask extends WorkerTask {
     /**
      * A converter to parse source data entry to byte[].
      */
-    private final RecordConverter recordConverter;
+    private final RecordConverter keyConverter;
+    private final RecordConverter valueConverter;
 
     /**
      * stat connect
@@ -136,7 +136,8 @@ public class WorkerSourceTask extends WorkerTask {
                             ClassLoader classLoader,
                             ConnectKeyValue taskConfig,
                             PositionManagementService positionManagementService,
-                            RecordConverter recordConverter,
+                            RecordConverter keyConverter,
+                            RecordConverter valueConverter,
                             DefaultMQProducer producer,
                             AtomicReference<WorkerState> workerState,
                             ConnectStatsManager connectStatsManager,
@@ -149,7 +150,8 @@ public class WorkerSourceTask extends WorkerTask {
         this.offsetStorageReader = new PositionStorageReaderImpl(id.connector(), positionManagementService);
         this.positionStorageWriter = new PositionStorageWriter(id.connector(), positionManagementService);
         this.producer = producer;
-        this.recordConverter = recordConverter;
+        this.valueConverter = valueConverter;
+        this.keyConverter = keyConverter;
         this.connectStatsManager = connectStatsManager;
         this.connectStatsService = connectStatsService;
         this.sourceTaskContext = new WorkerSourceTaskContext(offsetStorageReader, this, taskConfig);
@@ -356,22 +358,16 @@ public class WorkerSourceTask extends WorkerTask {
         }
         Message sourceMessage = new Message();
         sourceMessage.setTopic(topic);
-        // converter
-        if (recordConverter == null) {
-            final byte[] messageBody = JSON.toJSONString(record, SerializerFeature.DisableCircularReferenceDetect, SerializerFeature.WriteMapNullValue).getBytes();
-            if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
-                log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
-            }
-            sourceMessage.setBody(messageBody);
-        } else {
-            byte[] messageBody = retryWithToleranceOperator.execute(() -> recordConverter.fromConnectData(topic, record.getSchema(), record.getData()),
-                    ErrorReporter.Stage.CONVERTER, recordConverter.getClass());
-            if (messageBody.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
-                log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
-            }
-            sourceMessage.setBody(messageBody);
-        }
+        byte[] key = retryWithToleranceOperator.execute(() -> keyConverter.fromConnectData(topic, record.getKeySchema(), record.getKey()),
+                ErrorReporter.Stage.CONVERTER, keyConverter.getClass());
 
+        byte[] value = retryWithToleranceOperator.execute(() -> valueConverter.fromConnectData(topic, record.getSchema(), record.getData()),
+                ErrorReporter.Stage.CONVERTER, valueConverter.getClass());
+        if (value.length > RuntimeConfigDefine.MAX_MESSAGE_SIZE) {
+            log.error("Send record, message size is greater than {} bytes, record: {}", RuntimeConfigDefine.MAX_MESSAGE_SIZE, JSON.toJSONString(record));
+        }
+        sourceMessage.setKeys(Base64Util.base64Encode(key));
+        sourceMessage.setBody(value);
         if (retryWithToleranceOperator.failed()) {
             return null;
         }
@@ -588,7 +584,6 @@ public class WorkerSourceTask extends WorkerTask {
             positionStorageWriter.cancelFlush();
             return false;
         }
-
         long durationMillis = System.currentTimeMillis() - started;
         log.debug("{} Finished commitOffsets successfully in {} ms",
                 this, durationMillis);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
index 84128ae..7ca5a43 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/AbstractConnectController.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
index 403cf30..a6f9d47 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectController.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
 import org.apache.rocketmq.connect.runtime.service.RebalanceService;
 import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Plugin.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
similarity index 88%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Plugin.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
index 39ca220..66350da 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Plugin.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/Plugin.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
 
 import io.openmessaging.connector.api.component.Transform;
 import io.openmessaging.connector.api.component.connector.Connector;
@@ -34,6 +34,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
+import io.openmessaging.connector.api.errors.ConnectException;
 import org.reflections.Configuration;
 import org.reflections.Reflections;
 import org.reflections.ReflectionsException;
@@ -166,6 +168,23 @@ public class Plugin extends URLClassLoader {
         return null;
     }
 
+    public Task newTask(Class<? extends Task> taskClass) {
+        return newPlugin(taskClass);
+    }
+
+    protected static <T> T newPlugin(Class<T> klass) {
+        // KAFKA-8340: The thread classloader is used during static initialization and must be
+        // set to the plugin's classloader during instantiation
+        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
+        try {
+            return klass.getDeclaredConstructor().newInstance();
+        } catch (Throwable t) {
+            throw new ConnectException("Instantiation error", t);
+        } finally {
+            compareAndSwapLoaders(savedLoader);
+        }
+    }
+
     public ClassLoader currentThreadLoader() {
         return Thread.currentThread().getContextClassLoader();
     }
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginClassLoader.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
similarity index 97%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginClassLoader.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
index 5f8153e..c5419b4 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginClassLoader.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginClassLoader.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
 
 import java.net.URL;
 import java.net.URLClassLoader;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
similarity index 99%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
index b2bf34c..4a6492a 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginUtils.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginUtils.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
 
 import java.io.IOException;
 import java.nio.file.DirectoryStream;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginWrapper.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
similarity index 97%
rename from rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginWrapper.java
rename to rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
index 0173430..070f6ce 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/PluginWrapper.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/isolation/PluginWrapper.java
@@ -14,7 +14,7 @@
  *  See the License for the specific language governing permissions and
  *  limitations under the License.
  */
-package org.apache.rocketmq.connect.runtime.utils;
+package org.apache.rocketmq.connect.runtime.controller.isolation;
 
 import io.openmessaging.connector.api.component.task.sink.SinkConnector;
 import io.openmessaging.connector.api.component.task.source.SourceConnector;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
index feb2bb9..f056af0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectController.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.connect.runtime.service.RebalanceImpl;
 import org.apache.rocketmq.connect.runtime.service.memory.StandaloneRebalanceService;
 import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 
 
 /**
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfig.java
new file mode 100644
index 0000000..16e1913
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import io.openmessaging.KeyValue;
+
+/**
+ * converter config
+ */
+public class ConverterConfig {
+
+    public static final String TYPE_CONFIG = "converter.type";
+    private static final String TYPE_DOC = "How this converter will be used.";
+
+    /**
+     * Get the type of converter as defined by the {@link #TYPE_CONFIG} configuration.
+     * @return the converter type; never null
+     */
+    public ConverterType type(KeyValue config) {
+        return ConverterType.withName(config.getString(TYPE_CONFIG));
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterType.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterType.java
new file mode 100644
index 0000000..c7f427b
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/converter/record/ConverterType.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.connect.runtime.converter.record;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+
+/**
+ *  converter type
+ */
+public enum ConverterType {
+    KEY,
+    VALUE;
+
+    private static final Map<String, ConverterType> NAME_TO_TYPE;
+
+    static {
+        ConverterType[] types = ConverterType.values();
+        Map<String, ConverterType> nameToType = new HashMap<>(types.length);
+        for (ConverterType type : types) {
+            nameToType.put(type.name, type);
+        }
+        NAME_TO_TYPE = Collections.unmodifiableMap(nameToType);
+    }
+
+
+    public static ConverterType withName(String name) {
+        if (name == null) {
+            return null;
+        }
+        return NAME_TO_TYPE.get(name.toLowerCase(Locale.getDefault()));
+    }
+
+    private String name;
+
+    ConverterType() {
+        this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+        return name;
+    }
+}
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
index 60343e8..bf1584d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java
@@ -24,7 +24,7 @@ import java.util.Map;
 
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 
 /**
  * Interface for config manager. Contains connector configs and task configs. All worker in a cluster should keep the
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 1461f5e..a37fea0 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -17,9 +17,8 @@
 
 package org.apache.rocketmq.connect.runtime.service;
 
-import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.connector.Connector;
-import java.util.ArrayList;
+
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -31,7 +30,6 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
-import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
 import org.apache.rocketmq.connect.runtime.converter.ConnAndTaskConfigConverter;
 import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.converter.ListConverter;
@@ -39,7 +37,7 @@ import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index fa5425b..06256ad 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.connect.runtime.service.memory;
 
 
-import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.connector.Connector;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
@@ -27,14 +26,12 @@ import org.apache.rocketmq.connect.runtime.service.AbstractConfigManagementServi
 import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
 import java.util.Map;
 /**
  * memory config management service impl for standalone
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Base64Util.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Base64Util.java
new file mode 100644
index 0000000..bc9c876
--- /dev/null
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/Base64Util.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.utils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Base64;
+
+/**
+ * base64 utils
+ */
+public class Base64Util {
+
+    /**
+     * encode
+     * @param in
+     * @return
+     */
+    public static String base64Encode(byte[] in) {
+        if (in == null) {
+            return null;
+        }
+        return Base64.getEncoder().encodeToString(in);
+    }
+
+    /**
+     * decode
+     * @param in
+     * @return
+     */
+    public static byte[] base64Decode(String in) {
+        if (StringUtils.isEmpty(in)) {
+            return null;
+        }
+        return Base64.getDecoder().decode(in);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
index 529d1ba..63e966b 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTest.java
@@ -46,7 +46,7 @@ import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -126,6 +126,7 @@ public class WorkerTest {
                 connectKeyValue,
                 new TestPositionManageServiceImpl(),
                 new JsonConverter(),
+                    new JsonConverter(),
                 producer,
                 new AtomicReference(WorkerState.STARTED),
                 connectStatsManager, connectStatsService,
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index f82fcb2..04c4934 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -58,7 +58,7 @@ import org.apache.rocketmq.connect.runtime.service.PositionManagementServiceImpl
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -213,13 +213,13 @@ public class RestHandlerTest {
         RetryWithToleranceOperator retryWithToleranceOperator = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
         retryWithToleranceOperator.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName1", connectKeyValue));
 
-        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask(new ConnectConfig(),new ConnectorTaskId("testConnectorName1", 1), sourceTask,null, connectKeyValue, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
+        WorkerSourceTask workerSourceTask1 = new WorkerSourceTask(new ConnectConfig(),new ConnectorTaskId("testConnectorName1", 1), sourceTask,null, connectKeyValue, positionManagementServiceImpl, converter, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator);
 
         // create retry operator
         RetryWithToleranceOperator retryWithToleranceOperator02 = ReporterManagerUtil.createRetryWithToleranceOperator(connectKeyValue);
         retryWithToleranceOperator02.reporters(ReporterManagerUtil.sourceTaskReporters("testConnectorName2", connectKeyValue));
 
-        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask(new ConnectConfig(), new ConnectorTaskId("testConnectorName2", 1), sourceTask,null, connectKeyValue1, positionManagementServiceImpl, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
+        WorkerSourceTask workerSourceTask2 = new WorkerSourceTask(new ConnectConfig(), new ConnectorTaskId("testConnectorName2", 1), sourceTask,null, connectKeyValue1, positionManagementServiceImpl, converter, converter, producer, workerState, connectStatsManager, connectStatsService, transformChain,retryWithToleranceOperator02);
         workerTasks = new HashSet<Runnable>() {
             {
                 add(workerSourceTask1);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
index 03b5e01..df974a4 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -33,7 +33,7 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.config.RuntimeConfigDefine;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
-import org.apache.rocketmq.connect.runtime.utils.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.TestUtils;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;