Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/13 11:23:25 UTC

[rocketmq-connect] branch master updated: Fix debezium decimal type conversion problem #190 (#193)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 27412d0  Fix debezium decimal type conversion problem #190 (#193)
27412d0 is described below

commit 27412d09d4d44f8f764ae08e1cbdb9929fce96ba
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jul 13 19:23:21 2022 +0800

    Fix debezium decimal type conversion problem #190 (#193)
---
 .../adaptor/schema/KafkaSinkSchemaConverter.java   | 60 ++++++++++++++++----
 .../schema/RocketMQSourceSchemaConverter.java      | 64 +++++++++++++++++-----
 .../adaptor/transforms/TransformationWrapper.java  |  1 -
 .../debezium/SchemaRenameTransformation.java       | 29 ----------
 connectors/rocketmq-connect-jdbc/pom.xml           |  5 ++
 .../jdbc/dialect/impl/GenericDatabaseDialect.java  | 13 +++++
 distribution/conf/connect-standalone.conf          |  2 +-
 .../runtime/connectorwrapper/WorkerDirectTask.java |  1 -
 .../connect/runtime/service/RebalanceImpl.java     |  5 +-
 .../memory/MemoryConfigManagementServiceImpl.java  | 11 +---
 10 files changed, 124 insertions(+), 67 deletions(-)
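
For context on the fix itself: Kafka Connect models decimals as the logical type org.apache.kafka.connect.data.Decimal, which encodes the unscaled value as bytes and keeps the scale in the schema's parameters map (key "scale"). Both converters previously dropped that map when rebuilding schemas, which appears to be what broke Debezium decimal values. A minimal standalone sketch of why the parameters matter (illustration only, not code from this patch):

    import java.math.BigDecimal;
    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;

    public class DecimalScaleDemo {
        public static void main(String[] args) {
            // The Decimal logical type stores the scale in schema parameters;
            // the serialized bytes alone only carry the unscaled value.
            Schema decimalSchema = Decimal.schema(2);
            System.out.println(decimalSchema.parameters()); // {scale=2}

            byte[] encoded = Decimal.fromLogical(decimalSchema, new BigDecimal("123.45"));

            // Decoding needs the same parameters; if a converter rebuilds the
            // schema without them, the original value cannot be restored.
            BigDecimal decoded = Decimal.toLogical(decimalSchema, encoded);
            System.out.println(decoded); // 123.45
        }
    }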

diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
index 9e8a188..4f718b0 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
@@ -60,6 +60,8 @@ public class KafkaSinkSchemaConverter {
      */
     private SchemaBuilder convertKafkaSchema(io.openmessaging.connector.api.data.Schema originalSchema) {
         String schemaName = convertSchemaName(originalSchema.getName());
+        Map<String, String> parameters = originalSchema.getParameters() == null ? new HashMap<>() : originalSchema.getParameters();
+
         switch (originalSchema.getFieldType()) {
             case INT8:
                 return SchemaBuilder
@@ -67,70 +69,90 @@ public class KafkaSinkSchemaConverter {
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case INT16:
                 return SchemaBuilder
                         .int16()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case INT32:
                 return SchemaBuilder
                         .int32()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case INT64:
                 return SchemaBuilder
                         .int64()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case FLOAT32:
                 return SchemaBuilder
                         .float32()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case FLOAT64:
                 return SchemaBuilder
                         .float64()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case BOOLEAN:
                 return SchemaBuilder
                         .bool()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case STRING:
                 return SchemaBuilder.
                         string()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case BYTES:
                 return SchemaBuilder
                         .bytes()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case STRUCT:
                 SchemaBuilder schemaBuilder = SchemaBuilder
                         .struct()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
                 convertStructSchema(schemaBuilder, originalSchema);
                 return schemaBuilder;
             case ARRAY:
@@ -138,7 +160,9 @@ public class KafkaSinkSchemaConverter {
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             case MAP:
                 return SchemaBuilder.map(
                         convertKafkaSchema(originalSchema.getKeySchema()).build(),
@@ -146,7 +170,9 @@ public class KafkaSinkSchemaConverter {
                 ).optional()
                         .name(schemaName)
                         .doc(originalSchema.getDoc())
-                        .defaultValue(originalSchema.getDefaultValue());
+                        .defaultValue(originalSchema.getDefaultValue())
+                        .parameters(parameters)
+                        ;
             default:
                 throw new RuntimeException(" Type not supported: {}" + originalSchema.getFieldType());
 
@@ -172,6 +198,7 @@ public class KafkaSinkSchemaConverter {
                 String fieldName =  field.getName();
                 FieldType type = schema.getFieldType();
 
+                Map<String, String> parameters = schema.getParameters() == null ? new HashMap<>() : schema.getParameters();
 
                 switch (type) {
                     case INT8:
@@ -182,6 +209,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -194,6 +222,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -206,6 +235,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -218,6 +248,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -230,6 +261,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -242,6 +274,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -254,6 +287,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -266,6 +300,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -278,6 +313,7 @@ public class KafkaSinkSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.getDoc())
                                         .defaultValue(schema.getDefaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
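
The change above follows one pattern throughout the sink converter: every SchemaBuilder chain now forwards the original schema's parameters (defaulting to an empty map when absent) instead of discarding them. A self-contained sketch of that pattern on the Kafka side (names and values are illustrative, not taken from the patch):

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class ParameterForwardingDemo {
        public static void main(String[] args) {
            // Parameters carried by the incoming schema, e.g. Decimal's "scale".
            Map<String, String> parameters = Collections.singletonMap("scale", "2");

            // Rebuilding the schema without .parameters(...) would silently drop
            // the scale; the converter now forwards the map unchanged.
            Schema rebuilt = SchemaBuilder.bytes()
                    .optional()
                    .name("org.apache.kafka.connect.data.Decimal")
                    .parameters(parameters)
                    .build();

            System.out.println(rebuilt.parameters()); // {scale=2}
        }
    }
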
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
index b357fd7..5ca408e 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
 
+import com.beust.jcommander.internal.Maps;
 import io.openmessaging.connector.api.data.SchemaBuilder;
 import io.openmessaging.connector.api.data.logical.Timestamp;
 import io.openmessaging.connector.api.errors.ConnectException;
@@ -25,7 +26,6 @@ import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Time;
-import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +58,7 @@ public class RocketMQSourceSchemaConverter {
 
     private SchemaBuilder convertKafkaSchema(org.apache.kafka.connect.data.Schema originalSchema) {
         String schemaName = convertSchemaName(originalSchema.name());
+        Map<String, String> parameters = originalSchema.parameters() == null ? new HashMap<>() : originalSchema.parameters();
         switch (originalSchema.type()) {
             case INT8:
                 return SchemaBuilder
@@ -65,70 +66,90 @@ public class RocketMQSourceSchemaConverter {
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case INT16:
                 return SchemaBuilder
                         .int16()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case INT32:
                 return SchemaBuilder
                         .int32()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case INT64:
                 return SchemaBuilder
                         .int64()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case FLOAT32:
                 return SchemaBuilder
                         .float32()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case FLOAT64:
                 return SchemaBuilder
                         .float64()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case BOOLEAN:
                 return SchemaBuilder
                         .bool()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case STRING:
                 return SchemaBuilder.
                         string()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case BYTES:
                 return SchemaBuilder
                         .bytes()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case STRUCT:
                 SchemaBuilder schemaBuilder = SchemaBuilder
                         .struct()
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
                 convertStructSchema(schemaBuilder, originalSchema);
                 return schemaBuilder;
             case ARRAY:
@@ -136,7 +157,9 @@ public class RocketMQSourceSchemaConverter {
                         .optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             case MAP:
                 return SchemaBuilder.map(
                         convertKafkaSchema(originalSchema.keySchema()).build(),
@@ -144,7 +167,9 @@ public class RocketMQSourceSchemaConverter {
                 ).optional()
                         .name(schemaName)
                         .doc(originalSchema.doc())
-                        .defaultValue(originalSchema.defaultValue());
+                        .defaultValue(originalSchema.defaultValue())
+                        .parameters(parameters)
+                        ;
             default:
                 throw new RuntimeException(" Type not supported: {}" + originalSchema.type());
 
@@ -163,7 +188,8 @@ public class RocketMQSourceSchemaConverter {
             try {
                 Schema schema = field.schema();
                 org.apache.kafka.connect.data.Schema.Type type = schema.type();
-                String schemaName =  convertSchemaName(field.schema().name());
+                String schemaName = convertSchemaName(schema.name());
+                Map<String, String> parameters = schema.parameters() == null ? new HashMap<>() : schema.parameters();
                 switch (type) {
                     case INT8:
                         schemaBuilder.field(
@@ -173,6 +199,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -185,6 +212,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -197,9 +225,11 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
+
                         break;
                     case INT64:
                         schemaBuilder.field(
@@ -209,6 +239,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -221,6 +252,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -233,6 +265,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -245,6 +278,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -257,6 +291,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
@@ -269,6 +304,7 @@ public class RocketMQSourceSchemaConverter {
                                         .name(schemaName)
                                         .doc(schema.doc())
                                         .defaultValue(schema.defaultValue())
+                                        .parameters(parameters)
                                         .optional()
                                         .build()
                         );
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
index c0700a6..4b43474 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
@@ -18,7 +18,6 @@
 package org.apache.rocketmq.connect.kafka.connect.adaptor.transforms;
 
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.transforms.Transformation;
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
deleted file mode 100644
index 9d5b796..0000000
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.rocketmq.connect.debezium;
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.transforms.Transformation;
-
-import java.util.Map;
-
-public class SchemaRenameTransformation implements Transformation {
-    @Override
-    public ConnectRecord apply(ConnectRecord connectRecord) {
-        return null;
-    }
-
-    @Override
-    public ConfigDef config() {
-        return null;
-    }
-
-    @Override
-    public void close() {
-
-    }
-
-    @Override
-    public void configure(Map<String, ?> map) {
-
-    }
-}
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index 4113d1a..cc810b4 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -147,6 +147,11 @@
             <artifactId>commons-cli</artifactId>
             <version>1.2</version>
         </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>1.7.2.Final</version>
+        </dependency>
 
         <dependency>
             <groupId>mysql</groupId>
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
index 81cec14..0c26be3 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.connect.jdbc.dialect.impl;
 
+import io.debezium.time.ZonedTimestamp;
 import io.openmessaging.connector.api.data.FieldType;
 import io.openmessaging.connector.api.data.Schema;
 import io.openmessaging.connector.api.data.SchemaBuilder;
@@ -69,7 +70,9 @@ import java.sql.SQLXML;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.sql.Types;
+import java.time.LocalDateTime;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
@@ -1625,6 +1628,16 @@ public class GenericDatabaseDialect implements DatabaseDialect {
                             DateTimeUtils.getTimeZoneCalendar(timeZone)
                     );
                     return true;
+                case ZonedTimestamp.SCHEMA_NAME:
+                    DateTimeFormatter formatter = ZonedTimestamp.FORMATTER;
+                    LocalDateTime localDateTime=LocalDateTime.parse(value.toString(),formatter);
+                    Long format = localDateTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
+                    statement.setTimestamp(index,
+                            new java.sql.Timestamp(format),
+                            DateTimeUtils.getTimeZoneCalendar(timeZone)
+                    );
+                    return true;
+
                 default:
                     return false;
             }
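
The new ZonedTimestamp branch above handles the ISO-8601 strings (with offset) that Debezium emits for timezone-aware timestamp columns, converting them to java.sql.Timestamp before binding. A runnable sketch of the same conversion outside the dialect; the sample value is made up, and DateTimeFormatter.ISO_OFFSET_DATE_TIME stands in here for io.debezium.time.ZonedTimestamp.FORMATTER so the sketch has no Debezium dependency:

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    public class ZonedTimestampBindingDemo {
        public static void main(String[] args) {
            // Example Debezium ZonedTimestamp value (ISO-8601 with offset).
            String value = "2022-07-13T19:23:21+08:00";

            // Mirrors the new branch: parse the local date-time part, then turn
            // it into epoch millis. Note the patch fixes the offset to UTC+8
            // rather than using the offset carried in the string itself.
            DateTimeFormatter formatter = DateTimeFormatter.ISO_OFFSET_DATE_TIME;
            LocalDateTime localDateTime = LocalDateTime.parse(value, formatter);
            long epochMillis = localDateTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();

            java.sql.Timestamp timestamp = new java.sql.Timestamp(epochMillis);
            System.out.println(timestamp);
        }
    }
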
diff --git a/distribution/conf/connect-standalone.conf b/distribution/conf/connect-standalone.conf
index 88134cf..06c16c8 100644
--- a/distribution/conf/connect-standalone.conf
+++ b/distribution/conf/connect-standalone.conf
@@ -14,7 +14,7 @@
 #  limitations under the License.
 
 workerId=standalone-worker
-storePathRootDir=/tmp/storeRoot
+storePathRootDir=/tmp/standalone/storeRoot
 
 ## Http port for user to access REST API
 httpPort=8082
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 9c2b7d1..3a56d72 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -178,7 +178,6 @@ public class WorkerDirectTask implements WorkerTask {
             public KeyValue configs() {
                 return taskConfig;
             }
-
             @Override
             public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
 
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 7d8cbad..a041a58 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -63,7 +63,10 @@ public class RebalanceImpl {
     private final AbstractConnectController connectController;
 
     public RebalanceImpl(Worker worker,
-                         ConfigManagementService configManagementService, ClusterManagementService clusterManagementService, AllocateConnAndTaskStrategy strategy, AbstractConnectController connectController) {
+                         ConfigManagementService configManagementService,
+                         ClusterManagementService clusterManagementService,
+                         AllocateConnAndTaskStrategy strategy,
+                         AbstractConnectController connectController) {
 
         this.worker = worker;
         this.configManagementService = configManagementService;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 5f763ab..d4574ba 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
 import org.apache.rocketmq.connect.runtime.service.StagingMode;
 import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
 import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
 import org.apache.rocketmq.connect.runtime.utils.Plugin;
 import org.slf4j.Logger;
@@ -68,14 +69,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
     }
 
     @Override public void initialize(ConnectConfig connectConfig, Plugin plugin) {
-        this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
-            FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
-            new JsonConverter(),
-            new JsonConverter(ConnectKeyValue.class));
-        this.taskKeyValueStore = new FileBaseKeyValueStore<>(
-            FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
-            new JsonConverter(),
-            new ListConverter(ConnectKeyValue.class));
+        this.connectorKeyValueStore = new MemoryBasedKeyValueStore<>();
+        this.taskKeyValueStore = new MemoryBasedKeyValueStore<>();
         this.plugin = plugin;
     }