You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/08 01:51:11 UTC

[rocketmq-connect] branch master updated: [ISSUE #220] Add unit test (service module) (#263)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new f515c6d4 [ISSUE #220] Add unit test (service module) (#263)
f515c6d4 is described below

commit f515c6d481e700b90691f9af4fb18278b8fe8b59
Author: Oliver <wq...@163.com>
AuthorDate: Thu Sep 8 09:51:07 2022 +0800

    [ISSUE #220] Add unit test (service module) (#263)
    
    * Add unit test
    
    * remove unnecessary stub
    
    * fix failed unit test
    
    * fix failed unit test
---
 .../memory/FilePositionManagementServiceImpl.java  |   2 +-
 .../connectorwrapper/ServerResponseMocker.java     |   9 ++
 .../WorkerSinkTaskContextTest.java                 |   1 -
 .../connectorwrapper/testimpl/TestConnector.java   |   3 +-
 .../DistributedConnectControllerTest.java          | 101 +++++++++++++++++
 .../controller/isolation/TestFileSystem.java       |   4 +-
 .../StandaloneConnectControllerTest.java           |  94 ++++++++++++++++
 .../errors/DeadLetterQueueReporterTest.java        |   1 +
 .../connect/runtime/rest/RestHandlerTest.java      |  13 +++
 .../service/ClusterManagementServiceImplTest.java  | 110 +++++++++++++++++++
 .../service/ConfigManagementServiceImplTest.java   |  19 ++--
 .../service/DefaultConnectorContextTest.java       | 118 ++++++++++++++++++++
 .../service/PositionManagementServiceImplTest.java |   2 +
 .../FilePositionManagementServiceImplTest.java     |  89 +++++++++++++++
 .../MemoryConfigManagementServiceImplTest.java     | 120 +++++++++++++++++++++
 15 files changed, 674 insertions(+), 12 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
index 024728c7..cc93e602 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
@@ -145,7 +145,7 @@ public class FilePositionManagementServiceImpl implements PositionManagementServ
                 if (error != null) {
                     log.error("Failed to persist positions to storage: {}", error);
                 } else {
-                    log.trace("Successed to persist positions to storage: {} , {} ", partition, position);
+                    log.trace("Successes to persist positions to storage: {} , {} ", partition, position);
                 }
             }
         });
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
index 6e9b3f2b..e367000b 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
@@ -32,6 +32,7 @@ import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -40,6 +41,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.DataVersion;
@@ -147,12 +149,19 @@ public abstract class ServerResponseMocker {
                     response.setBody(JSON.toJSONBytes(wrapper));
                     break;
                 }
+
                 case RequestCode.GET_ROUTEINFO_BY_TOPIC: {
                     final TopicRouteData topicRouteData = buildTopicRouteData();
                     response.setBody(JSON.toJSONBytes(topicRouteData));
                     break;
                 }
 
+                case RequestCode.GET_CONSUMER_LIST_BY_GROUP: {
+                    GetConsumerListByGroupResponseBody getConsumerListByGroupResponseBody = new GetConsumerListByGroupResponseBody();
+                    getConsumerListByGroupResponseBody.setConsumerIdList(Collections.singletonList("mockConsumer1"));
+                    response.setBody(JSON.toJSONBytes(getConsumerListByGroupResponseBody));
+                    break;
+                }
                 default:
                     break;
             }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
index 060a81d7..f4798665 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
@@ -45,7 +45,6 @@ public class WorkerSinkTaskContextTest {
 
     private DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
 
-
     private RecordPartition recordPartition;
 
     private RecordOffset recordOffset;
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
index 5275f18d..f2e20cfe 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
@@ -19,9 +19,9 @@
 package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
 
 import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.connector.Connector;
 import io.openmessaging.connector.api.component.task.Task;
 import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -50,6 +50,7 @@ public class TestConnector extends SourceConnector {
 
     @Override public List<KeyValue> taskConfigs(int maxTasks) {
         List<KeyValue> configs = new ArrayList<>();
+        this.config = new DefaultKeyValue();
         configs.add(this.config);
         return configs;
     }
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
new file mode 100644
index 00000000..d7eddf87
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.distributed;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementServiceImpl;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DistributedConnectControllerTest {
+
+    private DistributedConnectController distributedConnectController;
+
+    @Mock
+    private Plugin plugin;
+
+    private DistributedConfig distributedConfig = new DistributedConfig();
+
+    private ClusterManagementService clusterManagementService = new ClusterManagementServiceImpl();
+
+    private ConfigManagementService configManagementService = new TestConfigManagementService();
+
+    private PositionManagementService positionManagementService = new TestPositionManageServiceImpl();
+
+    private WorkerConfig connectConfig = new WorkerConfig();
+
+    private ServerResponseMocker nameServerMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    private StateManagementService stateManagementService;
+
+    private PluginClassLoader pluginClassLoader;
+
+    @Before
+    public void before() throws InterruptedException, MalformedURLException {
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+
+        URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+        URL[] urls = new URL[]{};
+        pluginClassLoader = new PluginClassLoader(url, urls);
+        Thread.currentThread().setContextClassLoader(pluginClassLoader);
+        connectConfig.setNamesrvAddr("127.0.0.1:9876");
+        clusterManagementService.initialize(connectConfig);
+        stateManagementService = new StateManagementServiceImpl();
+        stateManagementService.initialize(connectConfig);
+        stateManagementService.start();
+        distributedConnectController = new DistributedConnectController(plugin, distributedConfig, clusterManagementService,
+            configManagementService, positionManagementService, stateManagementService);
+    }
+
+    @After
+    public void after() {
+        distributedConnectController.shutdown();
+        stateManagementService.stop();
+        brokerMocker.shutdown();
+        nameServerMocker.shutdown();
+    }
+
+    @Test
+    public void startTest() {
+        Assertions.assertThatCode(() -> distributedConnectController.start()).doesNotThrowAnyException();
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
index 88eaa028..b7baea83 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.connect.runtime.controller.isolation;
 
+import com.sun.nio.zipfs.JarFileSystemProvider;
 import java.io.IOException;
 import java.nio.file.FileStore;
 import java.nio.file.FileSystem;
@@ -30,8 +31,7 @@ import org.jetbrains.annotations.NotNull;
 
 public class TestFileSystem extends FileSystem {
     @Override public FileSystemProvider provider() {
-//        return new MacOSXFileSystemProvider();
-        return null;
+        return new JarFileSystemProvider();
     }
 
     @Override public void close() throws IOException {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
new file mode 100644
index 00000000..efbe169a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.standalone;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
+import org.apache.rocketmq.connect.runtime.controller.distributed.TestConfigManagementService;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class StandaloneConnectControllerTest {
+
+    private StandaloneConnectController standaloneConnectController;
+
+    @Mock
+    private Plugin plugin;
+
+    private StandaloneConfig standaloneConfig = new StandaloneConfig();
+
+    private ClusterManagementService clusterManagementService = new ClusterManagementServiceImpl();
+
+    private ConfigManagementService configManagementService = new TestConfigManagementService();
+
+    private PositionManagementService positionManagementService = new TestPositionManageServiceImpl();
+
+    @Mock
+    private StateManagementService stateManagementService;
+
+    private PluginClassLoader pluginClassLoader;
+
+    private ServerResponseMocker  nameServerMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    @Before
+    public void before() throws MalformedURLException {
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+
+        URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+        URL[] urls = new URL[]{};
+        pluginClassLoader = new PluginClassLoader(url, urls);
+        Thread.currentThread().setContextClassLoader(pluginClassLoader);
+        standaloneConfig.setNamesrvAddr("127.0.0.1:9876");
+        standaloneConfig.setHttpPort(10001);
+        clusterManagementService.initialize(standaloneConfig);
+        standaloneConnectController = new StandaloneConnectController(plugin, standaloneConfig, clusterManagementService,
+            configManagementService, positionManagementService, stateManagementService);
+    }
+
+    @After
+    public void after() {
+        standaloneConnectController.shutdown();
+        brokerMocker.shutdown();
+        nameServerMocker.shutdown();
+    }
+
+    @Test
+    public void startTest() {
+        Assertions.assertThatCode(() -> standaloneConnectController.start()).doesNotThrowAnyException();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
index ac072c95..1b159469 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
@@ -86,6 +86,7 @@ public class DeadLetterQueueReporterTest {
         ConnectKeyValue sinkConfig = new ConnectKeyValue();
         Map<String, String> properties = new HashMap<>();
         properties.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "DEAD_LETTER_TOPIC");
+
         sinkConfig.setProperties(properties);
         WorkerConfig workerConfig = new WorkerConfig();
         final DeadLetterQueueReporter deadLetterQueueReporter = DeadLetterQueueReporter.build("fileSinkConnector", sinkConfig, workerConfig);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index ec662b69..986badc3 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -25,6 +25,7 @@ import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.RecordConverter;
 import io.openmessaging.internal.DefaultKeyValue;
 import java.net.URI;
+import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -50,6 +51,7 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerState;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
 import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
 import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.rocketmq.connect.runtime.rest.entities.PluginInfo;
@@ -59,12 +61,15 @@ import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
 import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
 import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.eclipse.jetty.webapp.WebAppClassLoader;
+import org.eclipse.jetty.webapp.WebAppContext;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+import sun.applet.AppletClassLoader;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.when;
@@ -148,6 +153,8 @@ public class RestHandlerTest {
     @Mock
     private ConnectStatsService connectStatsService;
 
+    private PluginClassLoader pluginClassLoader;
+
     @Before
     public void init() throws Exception {
         workerState = new AtomicReference<>(WorkerState.STARTED);
@@ -229,7 +236,13 @@ public class RestHandlerTest {
         List<String> pluginPaths = new ArrayList<>();
         pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
         Plugin plugin = new Plugin(pluginPaths);
+        plugin.initLoaders();
         when(connectController.plugin()).thenReturn(plugin);
+
+        URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+        URL[] urls = new URL[]{};
+        pluginClassLoader = new PluginClassLoader(url, urls);
+        Thread.currentThread().setContextClassLoader(pluginClassLoader);
         restHandler = new RestHandler(connectController);
 
         httpClient = HttpClientBuilder.create().build();
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java
new file mode 100644
index 00000000..26908208
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ClusterManagementServiceImplTest {
+
+    private ClusterManagementServiceImpl clusterManagementService = new ClusterManagementServiceImpl();
+
+    private ServerResponseMocker nameServerMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    private WorkerConfig workerConfig;
+
+    @Mock
+    private RemotingClient  remotingClient;
+
+    private ClusterManagementServiceImpl.WorkerChangeListener workerChangeListener;
+
+    @Mock
+    private ChannelHandlerContext channelHandlerContext;
+
+    @Before
+    public void before() {
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+        workerConfig = new WorkerConfig();
+        workerConfig.setNamesrvAddr("localhost:9876");
+        clusterManagementService.initialize(workerConfig);
+        clusterManagementService.start();
+        workerChangeListener = clusterManagementService.new WorkerChangeListener();
+    }
+
+    @After
+    public void after() {
+        clusterManagementService.stop();
+        brokerMocker.shutdown();
+        nameServerMocker.shutdown();
+    }
+
+    @Test
+    public void hasClusterStoreTopicTest() {
+        final boolean flag = clusterManagementService.hasClusterStoreTopic();
+        Assert.assertTrue(flag);
+    }
+
+    @Test
+    public void getAllAliveWorkersTest() {
+        final List<String> workers = clusterManagementService.getAllAliveWorkers();
+        Assert.assertEquals(1, workers.size());
+        Assert.assertEquals("mockConsumer1", workers.get(0));
+    }
+
+    @Test
+    public void getCurrentWorkerTest() {
+        final String worker = clusterManagementService.getCurrentWorker();
+        Assert.assertNotNull(worker);
+    }
+
+    @Test
+    public void notifyConsumerIdChangedTest() throws Exception {
+        ClusterManagementService.WorkerStatusListener workerStatusListener = new ClusterManagementService.WorkerStatusListener() {
+            @Override public void onWorkerChange() {
+
+            }
+        };
+        clusterManagementService.registerListener(workerStatusListener);
+        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+        requestHeader.setConsumerGroup("mockConsumerGroup");
+        RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+        remotingCommand.setBody(JSON.toJSONBytes(requestHeader));
+        final RemotingCommand result = workerChangeListener.processRequest(channelHandlerContext, remotingCommand);
+        Assert.assertNull(result);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
index b3bbaa6b..43fe6c6c 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
 import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.controller.isolation.DelegatingClassLoader;
 import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
 import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
 import org.apache.rocketmq.connect.runtime.utils.TestUtils;
@@ -80,6 +81,9 @@ public class ConfigManagementServiceImplTest {
 
     private Plugin plugin;
 
+    @Mock
+    private DelegatingClassLoader delegatingClassLoader;
+
     private ServerResponseMocker nameServerMocker;
 
     private ServerResponseMocker brokerMocker;
@@ -88,6 +92,7 @@ public class ConfigManagementServiceImplTest {
     public void init() throws Exception {
         nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
         brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+
         String consumerGroup = UUID.randomUUID().toString();
         String producerGroup = UUID.randomUUID().toString();
 
@@ -138,12 +143,13 @@ public class ConfigManagementServiceImplTest {
         plugin = new Plugin(pluginPaths);
         configManagementService.initialize(connectConfig, plugin);
         configManagementService.start();
-        //final Field connectorKeyValueStoreField = ConfigManagementServiceImpl.class.getDeclaredField("connectorKeyValueStore");
-        //connectorKeyValueStoreField.setAccessible(true);
-        //connectorKeyValueStore = (KeyValueStore<String, ConnectKeyValue>) connectorKeyValueStoreField.get(configManagementService);
-//        final Field taskKeyValueStoreField = ConfigManagementServiceImpl.class.getDeclaredField("taskKeyValueStore");
-//        taskKeyValueStoreField.setAccessible(true);
-//        taskKeyValueStore = (KeyValueStore<String, List<ConnectKeyValue>>) taskKeyValueStoreField.get(configManagementService);
+
+        final Field connectorKeyValueStoreField = ConfigManagementServiceImpl.class.getSuperclass().getDeclaredField("connectorKeyValueStore");
+        connectorKeyValueStoreField.setAccessible(true);
+        connectorKeyValueStore = (KeyValueStore<String, ConnectKeyValue>) connectorKeyValueStoreField.get(configManagementService);
+        final Field taskKeyValueStoreField = ConfigManagementServiceImpl.class.getSuperclass().getDeclaredField("taskKeyValueStore");
+        taskKeyValueStoreField.setAccessible(true);
+        taskKeyValueStore = (KeyValueStore<String, List<ConnectKeyValue>>) taskKeyValueStoreField.get(configManagementService);
 
         final Field dataSynchronizerField = ConfigManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
         dataSynchronizerField.setAccessible(true);
@@ -164,7 +170,6 @@ public class ConfigManagementServiceImplTest {
         TestUtils.deleteFile(new File(System.getProperty("user.home") + File.separator + "testConnectorStore"));
         brokerMocker.shutdown();
         nameServerMocker.shutdown();
-
     }
 
     @Test
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContextTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContextTest.java
new file mode 100644
index 00000000..aef48a9a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContextTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.WrapperStatusListener;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
+import org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConnectController;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Tests {@link DefaultConnectorContext} on a {@link StandaloneConnectController} wired to mock name-server and broker endpoints. */
+public class DefaultConnectorContextTest {
+
+    private ServerResponseMocker nameServerMocker;
+    private ServerResponseMocker brokerMocker;
+
+    private Plugin plugin;
+    private WorkerConfig workerConfig;
+    private StandaloneConfig standaloneConfig = new StandaloneConfig();
+    private ClusterManagementService clusterManagementService = new ClusterManagementServiceImpl();
+    private ConfigManagementService configManagementService;
+    private PositionManagementService positionManagementService = new PositionManagementServiceImpl();
+    private StateManagementService stateManagementService;
+    private ConnectorStatus.Listener statusListener;
+    private StandaloneConnectController standaloneConnectController;
+    private DefaultConnectorContext defaultConnectorContext;
+
+    @Before
+    public void before() {
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+        List<String> pluginPaths = new ArrayList<>();
+        pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
+        plugin = new Plugin(pluginPaths);
+        workerConfig = new WorkerConfig();
+        workerConfig.setNamesrvAddr("localhost:9876");
+
+        configManagementService = new ConfigManagementServiceImpl();
+        configManagementService.initialize(workerConfig, plugin);
+        configManagementService.start();
+
+        clusterManagementService.initialize(workerConfig);
+        positionManagementService.initialize(workerConfig);
+        positionManagementService.start();
+        stateManagementService = new StateManagementServiceImpl();
+        stateManagementService.initialize(workerConfig);
+        // Build the controller, register one TestConnector-backed worker connector on it, then start it.
+        standaloneConfig.setHttpPort(8083);
+        standaloneConnectController = new StandaloneConnectController(plugin, standaloneConfig, clusterManagementService,
+            configManagementService, positionManagementService, stateManagementService);
+        Set<WorkerConnector> workerConnectors = new HashSet<>();
+        ConnectorContext connectorContext = new DefaultConnectorContext("testConnector", standaloneConnectController);
+        statusListener = new WrapperStatusListener(stateManagementService, "worker1");
+        workerConnectors.add(new WorkerConnector("testConnector", new TestConnector(), new ConnectKeyValue(), connectorContext, statusListener, this.getClass().getClassLoader()));
+        standaloneConnectController.getWorker().setWorkingConnectors(workerConnectors);
+        standaloneConnectController.start();
+        defaultConnectorContext = new DefaultConnectorContext("testConnector", standaloneConnectController);
+    }
+
+    @After
+    public void after() {
+        try {
+            standaloneConnectController.shutdown();
+            positionManagementService.stop();
+            configManagementService.stop();
+        } finally {
+            // Other tests start mocks on the same ports (9876/10911), so release them even if a stop call throws.
+            if (brokerMocker != null) {
+                brokerMocker.shutdown();
+            }
+            if (nameServerMocker != null) {
+                nameServerMocker.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void requestTaskReconfigurationTest() throws Exception {
+        configManagementService.getConnectorConfigs().put("testConnector", new ConnectKeyValue());
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("connector.class", "org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector");
+        connectKeyValue.put("connect.topicname", "testTopic");
+        configManagementService.putConnectorConfig("testConnector", connectKeyValue);
+        // Only checks that a reconfiguration request for the stored connector completes without throwing.
+        Assertions.assertThatCode(() -> defaultConnectorContext.requestTaskReconfiguration()).doesNotThrowAnyException();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
index 65c5ec68..43d9e6ab 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -92,11 +92,13 @@ public class PositionManagementServiceImplTest {
     private ServerResponseMocker brokerMocker;
 
     private final String namespace = "namespace";
+
     @Before
     public void init() throws Exception {
         nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
         brokerMocker =  ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
         connectConfig = new WorkerConfig();
+
         connectConfig.setHttpPort(8081);
         connectConfig.setNamesrvAddr("localhost:9876");
         connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImplTest.java
new file mode 100644
index 00000000..59765574
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImplTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Exercises persist, load and the put/get/remove position operations of {@link FilePositionManagementServiceImpl}. */
+public class FilePositionManagementServiceImplTest {
+
+    private PositionManagementService filePositionManagementService = new FilePositionManagementServiceImpl();
+    private WorkerConfig workerConfig = new WorkerConfig();
+
+    @Before
+    public void before() {
+        filePositionManagementService.initialize(workerConfig);
+        filePositionManagementService.start();
+    }
+
+    @After
+    public void after() {
+        filePositionManagementService.stop();
+    }
+
+    @Test
+    public void persistTest() {
+        Assertions.assertThatCode(filePositionManagementService::persist).doesNotThrowAnyException();
+    }
+
+    @Test
+    public void loadTest() {
+        Assertions.assertThatCode(filePositionManagementService::load).doesNotThrowAnyException();
+    }
+
+    /** Round-trips an offset through both putPosition overloads and then removes the partition. */
+    @Test
+    public void putPositionTest() {
+        Map<String, String> queue = new HashMap<>();
+        queue.put("topic", "testTopic");
+        queue.put("brokerName", "mockBroker");
+        queue.put("queueId", "0");
+        ExtendRecordPartition key = new ExtendRecordPartition("testConnector", queue);
+
+        // Single-entry overload: the stored offset is readable back through getPosition.
+        Map<String, Long> firstOffset = new HashMap<>();
+        firstOffset.put("queueOffset", 123L);
+        filePositionManagementService.putPosition(key, new RecordOffset(firstOffset));
+        Assert.assertEquals(123L, filePositionManagementService.getPosition(key).getOffset().get("queueOffset"));
+
+        // Map overload: writing the same partition again replaces the stored offset.
+        Map<String, Long> secondOffset = new HashMap<>();
+        secondOffset.put("queueOffset", 124L);
+        Map<ExtendRecordPartition, RecordOffset> batch = new HashMap<>();
+        batch.put(key, new RecordOffset(secondOffset));
+        filePositionManagementService.putPosition(batch);
+        Assert.assertEquals(124L, filePositionManagementService.getPosition(key).getOffset().get("queueOffset"));
+
+        // Removal is only checked for not throwing.
+        List<ExtendRecordPartition> toRemove = new ArrayList<>();
+        toRemove.add(key);
+        Assertions.assertThatCode(() -> filePositionManagementService.removePosition(toRemove)).doesNotThrowAnyException();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImplTest.java
new file mode 100644
index 00000000..651aaacd
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImplTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.controller.isolation.DelegatingClassLoader;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/** Tests connector-config put, read and delete on {@link MemoryConfigManagementServiceImpl}. */
+@RunWith(MockitoJUnitRunner.class)
+public class MemoryConfigManagementServiceImplTest {
+
+    private ConfigManagementService memoryConfigManagementService = new MemoryConfigManagementServiceImpl();
+    private WorkerConfig workerConfig = new WorkerConfig();
+    private Plugin plugin;
+    private PluginClassLoader pluginClassLoader;
+
+    @Mock
+    private DelegatingClassLoader delegatingClassLoader;
+
+    /** Builds the plugin from the test source path, then initializes and starts the service under test. */
+    @Before
+    public void before() throws ClassNotFoundException, MalformedURLException {
+        List<String> pluginPaths = new ArrayList<>();
+        pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
+        plugin = new Plugin(pluginPaths);
+        // NOTE(review): pluginClassLoader is built here but no test in this class reads it.
+        URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+        URL[] urls = new URL[]{};
+        pluginClassLoader = new PluginClassLoader(url, urls);
+
+        memoryConfigManagementService.initialize(workerConfig, plugin);
+        memoryConfigManagementService.start();
+    }
+
+    @After
+    public void after() {
+        memoryConfigManagementService.stop();
+    }
+
+    @Test
+    public void putConnectorConfigTest() {
+        final String result = memoryConfigManagementService.putConnectorConfig("testConnector", newConnectorConfig());
+        // assertThat(boolean) with no chained check verifies nothing, so compare the returned value directly.
+        Assert.assertEquals("", result);
+    }
+
+    /** A stored config is returned by getConnectorConfigs() with the values that were put. */
+    @Test
+    public void getConnectorConfigsTest() {
+        memoryConfigManagementService.putConnectorConfig("testConnector", newConnectorConfig());
+
+        final Map<String, ConnectKeyValue> configs = memoryConfigManagementService.getConnectorConfigs();
+        final ConnectKeyValue resultKeyValue = configs.get("testConnector");
+        Assert.assertEquals("testTopic", resultKeyValue.getString("connect.topicname"));
+        Assert.assertEquals("org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector", resultKeyValue.getString("connector.class"));
+    }
+
+    @Test
+    public void getConnectorConfigsIncludeDeletedTest() {
+        ConnectKeyValue connectKeyValue = newConnectorConfig();
+        connectKeyValue.put("config-deleted", 1);
+        memoryConfigManagementService.putConnectorConfig("testConnector", connectKeyValue);
+
+        // NOTE(review): despite the test name this reads through getConnectorConfigs(); confirm that is intended.
+        final Map<String, ConnectKeyValue> allData = memoryConfigManagementService.getConnectorConfigs();
+        final ConnectKeyValue resultKeyValue = allData.get("testConnector");
+        Assert.assertEquals("testTopic", resultKeyValue.getString("connect.topicname"));
+        Assert.assertEquals("org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector", resultKeyValue.getString("connector.class"));
+    }
+
+    /** Deleting a previously stored connector config must not throw. */
+    @Test
+    public void removeConnectorConfigTest() {
+        memoryConfigManagementService.putConnectorConfig("testConnector", newConnectorConfig());
+        Assertions.assertThatCode(() -> memoryConfigManagementService.deleteConnectorConfig("testConnector")).doesNotThrowAnyException();
+    }
+
+    /** Builds the connector config shared by the tests: a topic name plus the TestConnector class. */
+    private static ConnectKeyValue newConnectorConfig() {
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("connect.topicname", "testTopic");
+        connectKeyValue.put("connector.class", "org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector");
+        return connectKeyValue;
+    }
+}