You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/07/13 11:23:25 UTC
[rocketmq-connect] branch master updated: Fix debezium decimal type conversion problem #190 (#193)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 27412d0 Fix debezium decimal type conversion problem #190 (#193)
27412d0 is described below
commit 27412d09d4d44f8f764ae08e1cbdb9929fce96ba
Author: xiaoyi <su...@163.com>
AuthorDate: Wed Jul 13 19:23:21 2022 +0800
Fix debezium decimal type conversion problem #190 (#193)
---
.../adaptor/schema/KafkaSinkSchemaConverter.java | 60 ++++++++++++++++----
.../schema/RocketMQSourceSchemaConverter.java | 64 +++++++++++++++++-----
.../adaptor/transforms/TransformationWrapper.java | 1 -
.../debezium/SchemaRenameTransformation.java | 29 ----------
connectors/rocketmq-connect-jdbc/pom.xml | 5 ++
.../jdbc/dialect/impl/GenericDatabaseDialect.java | 13 +++++
distribution/conf/connect-standalone.conf | 2 +-
.../runtime/connectorwrapper/WorkerDirectTask.java | 1 -
.../connect/runtime/service/RebalanceImpl.java | 5 +-
.../memory/MemoryConfigManagementServiceImpl.java | 11 +---
10 files changed, 124 insertions(+), 67 deletions(-)
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
index 9e8a188..4f718b0 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/KafkaSinkSchemaConverter.java
@@ -60,6 +60,8 @@ public class KafkaSinkSchemaConverter {
*/
private SchemaBuilder convertKafkaSchema(io.openmessaging.connector.api.data.Schema originalSchema) {
String schemaName = convertSchemaName(originalSchema.getName());
+ Map<String, String> parameters = originalSchema.getParameters() == null ? new HashMap<>() : originalSchema.getParameters();
+
switch (originalSchema.getFieldType()) {
case INT8:
return SchemaBuilder
@@ -67,70 +69,90 @@ public class KafkaSinkSchemaConverter {
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case INT16:
return SchemaBuilder
.int16()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case INT32:
return SchemaBuilder
.int32()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case INT64:
return SchemaBuilder
.int64()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case FLOAT32:
return SchemaBuilder
.float32()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case FLOAT64:
return SchemaBuilder
.float64()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case BOOLEAN:
return SchemaBuilder
.bool()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case STRING:
return SchemaBuilder.
string()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case BYTES:
return SchemaBuilder
.bytes()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case STRUCT:
SchemaBuilder schemaBuilder = SchemaBuilder
.struct()
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
convertStructSchema(schemaBuilder, originalSchema);
return schemaBuilder;
case ARRAY:
@@ -138,7 +160,9 @@ public class KafkaSinkSchemaConverter {
.optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
case MAP:
return SchemaBuilder.map(
convertKafkaSchema(originalSchema.getKeySchema()).build(),
@@ -146,7 +170,9 @@ public class KafkaSinkSchemaConverter {
).optional()
.name(schemaName)
.doc(originalSchema.getDoc())
- .defaultValue(originalSchema.getDefaultValue());
+ .defaultValue(originalSchema.getDefaultValue())
+ .parameters(parameters)
+ ;
default:
throw new RuntimeException(" Type not supported: {}" + originalSchema.getFieldType());
@@ -172,6 +198,7 @@ public class KafkaSinkSchemaConverter {
String fieldName = field.getName();
FieldType type = schema.getFieldType();
+ Map<String, String> parameters = schema.getParameters() == null ? new HashMap<>() : schema.getParameters();
switch (type) {
case INT8:
@@ -182,6 +209,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -194,6 +222,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -206,6 +235,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -218,6 +248,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -230,6 +261,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -242,6 +274,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -254,6 +287,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -266,6 +300,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -278,6 +313,7 @@ public class KafkaSinkSchemaConverter {
.name(schemaName)
.doc(schema.getDoc())
.defaultValue(schema.getDefaultValue())
+ .parameters(parameters)
.optional()
.build()
);
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
index b357fd7..5ca408e 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/schema/RocketMQSourceSchemaConverter.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.kafka.connect.adaptor.schema;
+import com.beust.jcommander.internal.Maps;
import io.openmessaging.connector.api.data.SchemaBuilder;
import io.openmessaging.connector.api.data.logical.Timestamp;
import io.openmessaging.connector.api.errors.ConnectException;
@@ -25,7 +26,6 @@ import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
-import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +58,7 @@ public class RocketMQSourceSchemaConverter {
private SchemaBuilder convertKafkaSchema(org.apache.kafka.connect.data.Schema originalSchema) {
String schemaName = convertSchemaName(originalSchema.name());
+ Map<String, String> parameters = originalSchema.parameters() == null ? new HashMap<>() : originalSchema.parameters();
switch (originalSchema.type()) {
case INT8:
return SchemaBuilder
@@ -65,70 +66,90 @@ public class RocketMQSourceSchemaConverter {
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case INT16:
return SchemaBuilder
.int16()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case INT32:
return SchemaBuilder
.int32()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case INT64:
return SchemaBuilder
.int64()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case FLOAT32:
return SchemaBuilder
.float32()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case FLOAT64:
return SchemaBuilder
.float64()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case BOOLEAN:
return SchemaBuilder
.bool()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case STRING:
return SchemaBuilder.
string()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case BYTES:
return SchemaBuilder
.bytes()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case STRUCT:
SchemaBuilder schemaBuilder = SchemaBuilder
.struct()
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
convertStructSchema(schemaBuilder, originalSchema);
return schemaBuilder;
case ARRAY:
@@ -136,7 +157,9 @@ public class RocketMQSourceSchemaConverter {
.optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
case MAP:
return SchemaBuilder.map(
convertKafkaSchema(originalSchema.keySchema()).build(),
@@ -144,7 +167,9 @@ public class RocketMQSourceSchemaConverter {
).optional()
.name(schemaName)
.doc(originalSchema.doc())
- .defaultValue(originalSchema.defaultValue());
+ .defaultValue(originalSchema.defaultValue())
+ .parameters(parameters)
+ ;
default:
throw new RuntimeException(" Type not supported: {}" + originalSchema.type());
@@ -163,7 +188,8 @@ public class RocketMQSourceSchemaConverter {
try {
Schema schema = field.schema();
org.apache.kafka.connect.data.Schema.Type type = schema.type();
- String schemaName = convertSchemaName(field.schema().name());
+ String schemaName = convertSchemaName(schema.name());
+ Map<String, String> parameters = schema.parameters() == null ? new HashMap<>() : schema.parameters();
switch (type) {
case INT8:
schemaBuilder.field(
@@ -173,6 +199,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -185,6 +212,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -197,9 +225,11 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
+
break;
case INT64:
schemaBuilder.field(
@@ -209,6 +239,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -221,6 +252,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -233,6 +265,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -245,6 +278,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -257,6 +291,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
@@ -269,6 +304,7 @@ public class RocketMQSourceSchemaConverter {
.name(schemaName)
.doc(schema.doc())
.defaultValue(schema.defaultValue())
+ .parameters(parameters)
.optional()
.build()
);
diff --git a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
index c0700a6..4b43474 100644
--- a/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
+++ b/connectors/rocketmq-connect-debezium/kafka-connect-adaptor/src/main/java/org/apache/rocketmq/connect/kafka/connect/adaptor/transforms/TransformationWrapper.java
@@ -18,7 +18,6 @@
package org.apache.rocketmq.connect.kafka.connect.adaptor.transforms;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
diff --git a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java b/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
deleted file mode 100644
index 9d5b796..0000000
--- a/connectors/rocketmq-connect-debezium/rocketmq-connect-debezium-core/src/main/java/org/apache/rocketmq/connect/debezium/SchemaRenameTransformation.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.rocketmq.connect.debezium;
-
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.transforms.Transformation;
-
-import java.util.Map;
-
-public class SchemaRenameTransformation implements Transformation {
- @Override
- public ConnectRecord apply(ConnectRecord connectRecord) {
- return null;
- }
-
- @Override
- public ConfigDef config() {
- return null;
- }
-
- @Override
- public void close() {
-
- }
-
- @Override
- public void configure(Map<String, ?> map) {
-
- }
-}
diff --git a/connectors/rocketmq-connect-jdbc/pom.xml b/connectors/rocketmq-connect-jdbc/pom.xml
index 4113d1a..cc810b4 100644
--- a/connectors/rocketmq-connect-jdbc/pom.xml
+++ b/connectors/rocketmq-connect-jdbc/pom.xml
@@ -147,6 +147,11 @@
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
+ <artifactId>debezium-core</artifactId>
+ <version>1.7.2.Final</version>
+ </dependency>
<dependency>
<groupId>mysql</groupId>
diff --git a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
index 81cec14..0c26be3 100644
--- a/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
+++ b/connectors/rocketmq-connect-jdbc/src/main/java/org/apache/rocketmq/connect/jdbc/dialect/impl/GenericDatabaseDialect.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.connect.jdbc.dialect.impl;
+import io.debezium.time.ZonedTimestamp;
import io.openmessaging.connector.api.data.FieldType;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SchemaBuilder;
@@ -69,7 +70,9 @@ import java.sql.SQLXML;
import java.sql.Statement;
import java.sql.Timestamp;
import java.sql.Types;
+import java.time.LocalDateTime;
import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -1625,6 +1628,16 @@ public class GenericDatabaseDialect implements DatabaseDialect {
DateTimeUtils.getTimeZoneCalendar(timeZone)
);
return true;
+ case ZonedTimestamp.SCHEMA_NAME:
+ DateTimeFormatter formatter = ZonedTimestamp.FORMATTER;
+ LocalDateTime localDateTime=LocalDateTime.parse(value.toString(),formatter);
+ Long format = localDateTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
+ statement.setTimestamp(index,
+ new java.sql.Timestamp(format),
+ DateTimeUtils.getTimeZoneCalendar(timeZone)
+ );
+ return true;
+
default:
return false;
}
diff --git a/distribution/conf/connect-standalone.conf b/distribution/conf/connect-standalone.conf
index 88134cf..06c16c8 100644
--- a/distribution/conf/connect-standalone.conf
+++ b/distribution/conf/connect-standalone.conf
@@ -14,7 +14,7 @@
# limitations under the License.
workerId=standalone-worker
-storePathRootDir=/tmp/storeRoot
+storePathRootDir=/tmp/standalone/storeRoot
## Http port for user to access REST API
httpPort=8082
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
index 9c2b7d1..3a56d72 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerDirectTask.java
@@ -178,7 +178,6 @@ public class WorkerDirectTask implements WorkerTask {
public KeyValue configs() {
return taskConfig;
}
-
@Override
public void resetOffset(RecordPartition recordPartition, RecordOffset recordOffset) {
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 7d8cbad..a041a58 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -63,7 +63,10 @@ public class RebalanceImpl {
private final AbstractConnectController connectController;
public RebalanceImpl(Worker worker,
- ConfigManagementService configManagementService, ClusterManagementService clusterManagementService, AllocateConnAndTaskStrategy strategy, AbstractConnectController connectController) {
+ ConfigManagementService configManagementService,
+ ClusterManagementService clusterManagementService,
+ AllocateConnAndTaskStrategy strategy,
+ AbstractConnectController connectController) {
this.worker = worker;
this.configManagementService = configManagementService;
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
index 5f763ab..d4574ba 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImpl.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
import org.apache.rocketmq.connect.runtime.service.StagingMode;
import org.apache.rocketmq.connect.runtime.store.FileBaseKeyValueStore;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
+import org.apache.rocketmq.connect.runtime.store.MemoryBasedKeyValueStore;
import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
import org.apache.rocketmq.connect.runtime.utils.Plugin;
import org.slf4j.Logger;
@@ -68,14 +69,8 @@ public class MemoryConfigManagementServiceImpl implements ConfigManagementServic
}
@Override public void initialize(ConnectConfig connectConfig, Plugin plugin) {
- this.connectorKeyValueStore = new FileBaseKeyValueStore<>(
- FilePathConfigUtil.getConnectorConfigPath(connectConfig.getStorePathRootDir()),
- new JsonConverter(),
- new JsonConverter(ConnectKeyValue.class));
- this.taskKeyValueStore = new FileBaseKeyValueStore<>(
- FilePathConfigUtil.getTaskConfigPath(connectConfig.getStorePathRootDir()),
- new JsonConverter(),
- new ListConverter(ConnectKeyValue.class));
+ this.connectorKeyValueStore = new MemoryBasedKeyValueStore<>();
+ this.taskKeyValueStore = new MemoryBasedKeyValueStore<>();
this.plugin = plugin;
}