You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/09/08 01:51:11 UTC
[rocketmq-connect] branch master updated: [ISSUE #220] Add unit test (service module) (#263)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new f515c6d4 [ISSUE #220] Add unit test (service module) (#263)
f515c6d4 is described below
commit f515c6d481e700b90691f9af4fb18278b8fe8b59
Author: Oliver <wq...@163.com>
AuthorDate: Thu Sep 8 09:51:07 2022 +0800
[ISSUE #220] Add unit test (service module) (#263)
* Add unit test
* remove unnecessary stub
* fix failed unit test
* fix failed unit test
---
.../memory/FilePositionManagementServiceImpl.java | 2 +-
.../connectorwrapper/ServerResponseMocker.java | 9 ++
.../WorkerSinkTaskContextTest.java | 1 -
.../connectorwrapper/testimpl/TestConnector.java | 3 +-
.../DistributedConnectControllerTest.java | 101 +++++++++++++++++
.../controller/isolation/TestFileSystem.java | 4 +-
.../StandaloneConnectControllerTest.java | 94 ++++++++++++++++
.../errors/DeadLetterQueueReporterTest.java | 1 +
.../connect/runtime/rest/RestHandlerTest.java | 13 +++
.../service/ClusterManagementServiceImplTest.java | 110 +++++++++++++++++++
.../service/ConfigManagementServiceImplTest.java | 19 ++--
.../service/DefaultConnectorContextTest.java | 118 ++++++++++++++++++++
.../service/PositionManagementServiceImplTest.java | 2 +
.../FilePositionManagementServiceImplTest.java | 89 +++++++++++++++
.../MemoryConfigManagementServiceImplTest.java | 120 +++++++++++++++++++++
15 files changed, 674 insertions(+), 12 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
index 024728c7..cc93e602 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImpl.java
@@ -145,7 +145,7 @@ public class FilePositionManagementServiceImpl implements PositionManagementServ
if (error != null) {
log.error("Failed to persist positions to storage: {}", error);
} else {
- log.trace("Successed to persist positions to storage: {} , {} ", partition, position);
+ log.trace("Successes to persist positions to storage: {} , {} ", partition, position);
}
}
});
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
index 6e9b3f2b..e367000b 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/ServerResponseMocker.java
@@ -32,6 +32,7 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -40,6 +41,7 @@ import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.DataVersion;
@@ -147,12 +149,19 @@ public abstract class ServerResponseMocker {
response.setBody(JSON.toJSONBytes(wrapper));
break;
}
+
case RequestCode.GET_ROUTEINFO_BY_TOPIC: {
final TopicRouteData topicRouteData = buildTopicRouteData();
response.setBody(JSON.toJSONBytes(topicRouteData));
break;
}
+ case RequestCode.GET_CONSUMER_LIST_BY_GROUP: {
+ GetConsumerListByGroupResponseBody getConsumerListByGroupResponseBody = new GetConsumerListByGroupResponseBody();
+ getConsumerListByGroupResponseBody.setConsumerIdList(Collections.singletonList("mockConsumer1"));
+ response.setBody(JSON.toJSONBytes(getConsumerListByGroupResponseBody));
+ break;
+ }
default:
break;
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
index 060a81d7..f4798665 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTaskContextTest.java
@@ -45,7 +45,6 @@ public class WorkerSinkTaskContextTest {
private DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
-
private RecordPartition recordPartition;
private RecordOffset recordOffset;
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
index 5275f18d..f2e20cfe 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/connectorwrapper/testimpl/TestConnector.java
@@ -19,9 +19,9 @@
package org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl;
import io.openmessaging.KeyValue;
-import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
+import io.openmessaging.internal.DefaultKeyValue;
import java.util.ArrayList;
import java.util.List;
@@ -50,6 +50,7 @@ public class TestConnector extends SourceConnector {
@Override public List<KeyValue> taskConfigs(int maxTasks) {
List<KeyValue> configs = new ArrayList<>();
+ this.config = new DefaultKeyValue();
configs.add(this.config);
return configs;
}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
new file mode 100644
index 00000000..d7eddf87
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/distributed/DistributedConnectControllerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.distributed;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementServiceImpl;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Smoke test for {@link DistributedConnectController}: builds the controller against
+ * mock servers and verifies that {@code start()} completes without throwing.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class DistributedConnectControllerTest {
+
+    private DistributedConnectController distributedConnectController;
+
+    @Mock
+    private Plugin plugin;
+
+    private DistributedConfig distributedConfig = new DistributedConfig();
+
+    private ClusterManagementService clusterManagementService = new ClusterManagementServiceImpl();
+
+    private ConfigManagementService configManagementService = new TestConfigManagementService();
+
+    private PositionManagementService positionManagementService = new TestPositionManageServiceImpl();
+
+    private WorkerConfig connectConfig = new WorkerConfig();
+
+    private ServerResponseMocker nameServerMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    private StateManagementService stateManagementService;
+
+    private PluginClassLoader pluginClassLoader;
+
+    /** Context class loader of the test thread before {@link #before()} replaced it. */
+    private ClassLoader originalClassLoader;
+
+    /**
+     * Starts the mock servers, installs a {@link PluginClassLoader} as the thread context
+     * class loader and wires the controller under test.
+     */
+    @Before
+    public void before() throws InterruptedException, MalformedURLException {
+        // Captured first so that after() can always restore it, even if setup fails later.
+        originalClassLoader = Thread.currentThread().getContextClassLoader();
+
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+
+        // NOTE(review): with "file://src/..." java.net.URL treats "src" as the host, not as
+        // part of the path. Left unchanged because how the loader uses this URL is not
+        // visible here - confirm whether "file:src/..." was intended.
+        URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+        URL[] urls = new URL[]{};
+        pluginClassLoader = new PluginClassLoader(url, urls);
+        Thread.currentThread().setContextClassLoader(pluginClassLoader);
+        connectConfig.setNamesrvAddr("127.0.0.1:9876");
+        clusterManagementService.initialize(connectConfig);
+        stateManagementService = new StateManagementServiceImpl();
+        stateManagementService.initialize(connectConfig);
+        stateManagementService.start();
+        distributedConnectController = new DistributedConnectController(plugin, distributedConfig, clusterManagementService,
+            configManagementService, positionManagementService, stateManagementService);
+    }
+
+    /**
+     * Releases everything {@link #before()} acquired. JUnit 4 runs {@code @After} methods
+     * even when a {@code @Before} method threw, so every field is null-checked and the mock
+     * servers and the context class loader are released in a {@code finally} block.
+     */
+    @After
+    public void after() {
+        try {
+            if (distributedConnectController != null) {
+                distributedConnectController.shutdown();
+            }
+            if (stateManagementService != null) {
+                stateManagementService.stop();
+            }
+        } finally {
+            if (brokerMocker != null) {
+                brokerMocker.shutdown();
+            }
+            if (nameServerMocker != null) {
+                nameServerMocker.shutdown();
+            }
+            // Do not leak the plugin class loader into tests that run later on this thread.
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
+    /** Starting the controller must not throw. */
+    @Test
+    public void startTest() {
+        Assertions.assertThatCode(() -> distributedConnectController.start()).doesNotThrowAnyException();
+    }
+
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
index 88eaa028..b7baea83 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/isolation/TestFileSystem.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.runtime.controller.isolation;
+import com.sun.nio.zipfs.JarFileSystemProvider;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
@@ -30,8 +31,7 @@ import org.jetbrains.annotations.NotNull;
public class TestFileSystem extends FileSystem {
@Override public FileSystemProvider provider() {
-// return new MacOSXFileSystemProvider();
- return null;
+ return new JarFileSystemProvider();
}
@Override public void close() throws IOException {
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
new file mode 100644
index 00000000..efbe169a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/controller/standalone/StandaloneConnectControllerTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.controller.standalone;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestPositionManageServiceImpl;
+import org.apache.rocketmq.connect.runtime.controller.distributed.TestConfigManagementService;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementService;
+import org.apache.rocketmq.connect.runtime.service.ClusterManagementServiceImpl;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.service.StateManagementService;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Smoke test for {@link StandaloneConnectController}: builds the controller against
+ * mock servers and verifies that {@code start()} completes without throwing.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class StandaloneConnectControllerTest {
+
+    private StandaloneConnectController standaloneConnectController;
+
+    @Mock
+    private Plugin plugin;
+
+    private StandaloneConfig standaloneConfig = new StandaloneConfig();
+
+    private ClusterManagementService clusterManagementService = new ClusterManagementServiceImpl();
+
+    private ConfigManagementService configManagementService = new TestConfigManagementService();
+
+    private PositionManagementService positionManagementService = new TestPositionManageServiceImpl();
+
+    @Mock
+    private StateManagementService stateManagementService;
+
+    private PluginClassLoader pluginClassLoader;
+
+    private ServerResponseMocker nameServerMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    /** Context class loader of the test thread before {@link #before()} replaced it. */
+    private ClassLoader originalClassLoader;
+
+    /**
+     * Starts the mock servers, installs a {@link PluginClassLoader} as the thread context
+     * class loader and wires the controller under test.
+     */
+    @Before
+    public void before() throws MalformedURLException {
+        // Captured first so that after() can always restore it, even if setup fails later.
+        originalClassLoader = Thread.currentThread().getContextClassLoader();
+
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+
+        // NOTE(review): with "file://src/..." java.net.URL treats "src" as the host, not as
+        // part of the path. Left unchanged because how the loader uses this URL is not
+        // visible here - confirm whether "file:src/..." was intended.
+        URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+        URL[] urls = new URL[]{};
+        pluginClassLoader = new PluginClassLoader(url, urls);
+        Thread.currentThread().setContextClassLoader(pluginClassLoader);
+        standaloneConfig.setNamesrvAddr("127.0.0.1:9876");
+        standaloneConfig.setHttpPort(10001);
+        clusterManagementService.initialize(standaloneConfig);
+        standaloneConnectController = new StandaloneConnectController(plugin, standaloneConfig, clusterManagementService,
+            configManagementService, positionManagementService, stateManagementService);
+    }
+
+    /**
+     * Releases everything {@link #before()} acquired. JUnit 4 runs {@code @After} methods
+     * even when a {@code @Before} method threw, so every field is null-checked and the mock
+     * servers and the context class loader are released in a {@code finally} block.
+     */
+    @After
+    public void after() {
+        try {
+            if (standaloneConnectController != null) {
+                standaloneConnectController.shutdown();
+            }
+        } finally {
+            if (brokerMocker != null) {
+                brokerMocker.shutdown();
+            }
+            if (nameServerMocker != null) {
+                nameServerMocker.shutdown();
+            }
+            // Do not leak the plugin class loader into tests that run later on this thread.
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
+    /** Starting the controller must not throw. */
+    @Test
+    public void startTest() {
+        Assertions.assertThatCode(() -> standaloneConnectController.start()).doesNotThrowAnyException();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
index ac072c95..1b159469 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/errors/DeadLetterQueueReporterTest.java
@@ -86,6 +86,7 @@ public class DeadLetterQueueReporterTest {
ConnectKeyValue sinkConfig = new ConnectKeyValue();
Map<String, String> properties = new HashMap<>();
properties.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "DEAD_LETTER_TOPIC");
+
sinkConfig.setProperties(properties);
WorkerConfig workerConfig = new WorkerConfig();
final DeadLetterQueueReporter deadLetterQueueReporter = DeadLetterQueueReporter.build("fileSinkConnector", sinkConfig, workerConfig);
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
index ec662b69..986badc3 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/rest/RestHandlerTest.java
@@ -25,6 +25,7 @@ import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.internal.DefaultKeyValue;
import java.net.URI;
+import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -50,6 +51,7 @@ import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask;
import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerState;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
import org.apache.rocketmq.connect.runtime.errors.ReporterManagerUtil;
import org.apache.rocketmq.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.rocketmq.connect.runtime.rest.entities.PluginInfo;
@@ -59,12 +61,15 @@ import org.apache.rocketmq.connect.runtime.stats.ConnectStatsManager;
import org.apache.rocketmq.connect.runtime.stats.ConnectStatsService;
import org.apache.rocketmq.connect.runtime.utils.ConnectorTaskId;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.eclipse.jetty.webapp.WebAppClassLoader;
+import org.eclipse.jetty.webapp.WebAppContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import sun.applet.AppletClassLoader;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
@@ -148,6 +153,8 @@ public class RestHandlerTest {
@Mock
private ConnectStatsService connectStatsService;
+ private PluginClassLoader pluginClassLoader;
+
@Before
public void init() throws Exception {
workerState = new AtomicReference<>(WorkerState.STARTED);
@@ -229,7 +236,13 @@ public class RestHandlerTest {
List<String> pluginPaths = new ArrayList<>();
pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
Plugin plugin = new Plugin(pluginPaths);
+ plugin.initLoaders();
when(connectController.plugin()).thenReturn(plugin);
+
+ URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+ URL[] urls = new URL[]{};
+ pluginClassLoader = new PluginClassLoader(url, urls);
+ Thread.currentThread().setContextClassLoader(pluginClassLoader);
restHandler = new RestHandler(connectController);
httpClient = HttpClientBuilder.create().build();
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java
new file mode 100644
index 00000000..26908208
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImplTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+import com.alibaba.fastjson.JSON;
+import io.netty.channel.ChannelHandlerContext;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+/**
+ * Tests {@link ClusterManagementServiceImpl} against mock servers started on local ports
+ * 9876 and 10911.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ClusterManagementServiceImplTest {
+
+    private ClusterManagementServiceImpl clusterManagementService = new ClusterManagementServiceImpl();
+
+    private ServerResponseMocker nameServerMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    private WorkerConfig workerConfig;
+
+    @Mock
+    private RemotingClient remotingClient;
+
+    private ClusterManagementServiceImpl.WorkerChangeListener workerChangeListener;
+
+    @Mock
+    private ChannelHandlerContext channelHandlerContext;
+
+    /** Starts the mock servers, then initializes and starts the service under test. */
+    @Before
+    public void before() {
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+        workerConfig = new WorkerConfig();
+        workerConfig.setNamesrvAddr("localhost:9876");
+        clusterManagementService.initialize(workerConfig);
+        clusterManagementService.start();
+        workerChangeListener = clusterManagementService.new WorkerChangeListener();
+    }
+
+    /**
+     * Stops the service and shuts the mock servers down. JUnit 4 runs {@code @After} methods
+     * even when a {@code @Before} method threw, so the mocks are null-checked and released in
+     * a {@code finally} block; otherwise a failure in {@code stop()} would leave them running.
+     */
+    @After
+    public void after() {
+        try {
+            clusterManagementService.stop();
+        } finally {
+            if (brokerMocker != null) {
+                brokerMocker.shutdown();
+            }
+            if (nameServerMocker != null) {
+                nameServerMocker.shutdown();
+            }
+        }
+    }
+
+    /** The service reports that the cluster store topic exists. */
+    @Test
+    public void hasClusterStoreTopicTest() {
+        final boolean flag = clusterManagementService.hasClusterStoreTopic();
+        Assert.assertTrue(flag);
+    }
+
+    /** Exactly one alive worker, "mockConsumer1", is expected from the mocked setup. */
+    @Test
+    public void getAllAliveWorkersTest() {
+        final List<String> workers = clusterManagementService.getAllAliveWorkers();
+        Assert.assertEquals(1, workers.size());
+        Assert.assertEquals("mockConsumer1", workers.get(0));
+    }
+
+    /** The current worker identifier is never {@code null} once the service is started. */
+    @Test
+    public void getCurrentWorkerTest() {
+        final String worker = clusterManagementService.getCurrentWorker();
+        Assert.assertNotNull(worker);
+    }
+
+    /**
+     * Feeds a NOTIFY_CONSUMER_IDS_CHANGED request to the worker-change listener and expects
+     * a {@code null} response command.
+     */
+    @Test
+    public void notifyConsumerIdChangedTest() throws Exception {
+        // A no-op listener: this test only checks the value returned by processRequest.
+        ClusterManagementService.WorkerStatusListener workerStatusListener = new ClusterManagementService.WorkerStatusListener() {
+            @Override public void onWorkerChange() {
+
+            }
+        };
+        clusterManagementService.registerListener(workerStatusListener);
+        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+        requestHeader.setConsumerGroup("mockConsumerGroup");
+        RemotingCommand remotingCommand = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+        remotingCommand.setBody(JSON.toJSONBytes(requestHeader));
+        final RemotingCommand result = workerChangeListener.processRequest(channelHandlerContext, remotingCommand);
+        Assert.assertNull(result);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
index b3bbaa6b..43fe6c6c 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
import org.apache.rocketmq.connect.runtime.config.ConnectorConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.controller.isolation.DelegatingClassLoader;
import org.apache.rocketmq.connect.runtime.store.KeyValueStore;
import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
import org.apache.rocketmq.connect.runtime.utils.TestUtils;
@@ -80,6 +81,9 @@ public class ConfigManagementServiceImplTest {
private Plugin plugin;
+ @Mock
+ private DelegatingClassLoader delegatingClassLoader;
+
private ServerResponseMocker nameServerMocker;
private ServerResponseMocker brokerMocker;
@@ -88,6 +92,7 @@ public class ConfigManagementServiceImplTest {
public void init() throws Exception {
nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+
String consumerGroup = UUID.randomUUID().toString();
String producerGroup = UUID.randomUUID().toString();
@@ -138,12 +143,13 @@ public class ConfigManagementServiceImplTest {
plugin = new Plugin(pluginPaths);
configManagementService.initialize(connectConfig, plugin);
configManagementService.start();
- //final Field connectorKeyValueStoreField = ConfigManagementServiceImpl.class.getDeclaredField("connectorKeyValueStore");
- //connectorKeyValueStoreField.setAccessible(true);
- //connectorKeyValueStore = (KeyValueStore<String, ConnectKeyValue>) connectorKeyValueStoreField.get(configManagementService);
-// final Field taskKeyValueStoreField = ConfigManagementServiceImpl.class.getDeclaredField("taskKeyValueStore");
-// taskKeyValueStoreField.setAccessible(true);
-// taskKeyValueStore = (KeyValueStore<String, List<ConnectKeyValue>>) taskKeyValueStoreField.get(configManagementService);
+
+ final Field connectorKeyValueStoreField = ConfigManagementServiceImpl.class.getSuperclass().getDeclaredField("connectorKeyValueStore");
+ connectorKeyValueStoreField.setAccessible(true);
+ connectorKeyValueStore = (KeyValueStore<String, ConnectKeyValue>) connectorKeyValueStoreField.get(configManagementService);
+ final Field taskKeyValueStoreField = ConfigManagementServiceImpl.class.getSuperclass().getDeclaredField("taskKeyValueStore");
+ taskKeyValueStoreField.setAccessible(true);
+ taskKeyValueStore = (KeyValueStore<String, List<ConnectKeyValue>>) taskKeyValueStoreField.get(configManagementService);
final Field dataSynchronizerField = ConfigManagementServiceImpl.class.getDeclaredField("dataSynchronizer");
dataSynchronizerField.setAccessible(true);
@@ -164,7 +170,6 @@ public class ConfigManagementServiceImplTest {
TestUtils.deleteFile(new File(System.getProperty("user.home") + File.separator + "testConnectorStore"));
brokerMocker.shutdown();
nameServerMocker.shutdown();
-
}
@Test
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContextTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContextTest.java
new file mode 100644
index 00000000..aef48a9a
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/DefaultConnectorContextTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service;
+
+import io.openmessaging.connector.api.component.connector.ConnectorContext;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.NameServerMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.ServerResponseMocker;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.ConnectorStatus;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.status.WrapperStatusListener;
+import org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConfig;
+import org.apache.rocketmq.connect.runtime.controller.standalone.StandaloneConnectController;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@link DefaultConnectorContext} against a {@link StandaloneConnectController} whose
+ * name server and broker are replaced by local mock servers.
+ */
+public class DefaultConnectorContextTest {
+
+    private static final String CONNECTOR_NAME = "testConnector";
+
+    private DefaultConnectorContext defaultConnectorContext;
+
+    private StandaloneConnectController standaloneConnectController;
+
+    private Plugin plugin;
+    private StandaloneConfig standaloneConfig = new StandaloneConfig();
+
+    private ClusterManagementService clusterManagementService = new ClusterManagementServiceImpl();
+
+    private ConfigManagementService configManagementService;
+
+    private PositionManagementService positionManagementService = new PositionManagementServiceImpl();
+
+    private WorkerConfig workerConfig;
+
+    private ServerResponseMocker nameServerMocker;
+
+    private ServerResponseMocker brokerMocker;
+
+    private StateManagementService stateManagementService;
+
+    private ConnectorStatus.Listener statusListener;
+
+    /**
+     * Starts the mock servers, initializes the management services, registers one
+     * {@link TestConnector} as a working connector and starts the standalone controller.
+     */
+    @Before
+    public void before() {
+        // The worker config below points at the mocked name server on port 9876.
+        nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
+        brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
+        List<String> pluginPaths = new ArrayList<>();
+        pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
+        plugin = new Plugin(pluginPaths);
+        workerConfig = new WorkerConfig();
+        workerConfig.setNamesrvAddr("localhost:9876");
+
+        configManagementService = new ConfigManagementServiceImpl();
+        configManagementService.initialize(workerConfig, plugin);
+        configManagementService.start();
+
+        clusterManagementService.initialize(workerConfig);
+        positionManagementService.initialize(workerConfig);
+        positionManagementService.start();
+        stateManagementService = new StateManagementServiceImpl();
+        stateManagementService.initialize(workerConfig);
+
+        standaloneConfig.setHttpPort(8083);
+        standaloneConnectController = new StandaloneConnectController(plugin, standaloneConfig, clusterManagementService,
+            configManagementService, positionManagementService, stateManagementService);
+        Set<WorkerConnector> workerConnectors = new HashSet<>();
+        ConnectorContext connectorContext = new DefaultConnectorContext(CONNECTOR_NAME, standaloneConnectController);
+        statusListener = new WrapperStatusListener(stateManagementService, "worker1");
+        workerConnectors.add(new WorkerConnector(CONNECTOR_NAME, new TestConnector(), new ConnectKeyValue(), connectorContext, statusListener, this.getClass().getClassLoader()));
+        standaloneConnectController.getWorker().setWorkingConnectors(workerConnectors);
+        standaloneConnectController.start();
+        defaultConnectorContext = new DefaultConnectorContext(CONNECTOR_NAME, standaloneConnectController);
+    }
+
+    /**
+     * Shuts down the controller and services, then always releases the mock servers.
+     *
+     * <p>JUnit 4 runs {@code @After} even when {@code @Before} or the test threw, so fields
+     * assigned late in {@link #before()} may still be {@code null}. The mock servers listen on
+     * fixed ports; the {@code finally} block keeps a failing shutdown step from leaving them bound.
+     */
+    @After
+    public void after() {
+        try {
+            if (standaloneConnectController != null) {
+                standaloneConnectController.shutdown();
+            }
+            positionManagementService.stop();
+            if (configManagementService != null) {
+                configManagementService.stop();
+            }
+        } finally {
+            if (brokerMocker != null) {
+                brokerMocker.shutdown();
+            }
+            if (nameServerMocker != null) {
+                nameServerMocker.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Stores a connector config for the registered connector and checks that
+     * {@code requestTaskReconfiguration()} completes without throwing.
+     *
+     * @throws Exception declared by the original signature; nothing in this body throws a checked exception
+     */
+    @Test
+    public void requestTaskReconfigurationTest() throws Exception {
+        // NOTE(review): whether getConnectorConfigs() returns a live view is not visible here,
+        // so this put may not reach the service's own store - confirm it is still needed.
+        configManagementService.getConnectorConfigs().put(CONNECTOR_NAME, new ConnectKeyValue());
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("connector.class", "org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector");
+        connectKeyValue.put("connect.topicname", "testTopic");
+        configManagementService.putConnectorConfig(CONNECTOR_NAME, connectKeyValue);
+
+        Assertions.assertThatCode(() -> defaultConnectorContext.requestTaskReconfiguration()).doesNotThrowAnyException();
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
index 65c5ec68..43d9e6ab 100644
--- a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -92,11 +92,13 @@ public class PositionManagementServiceImplTest {
private ServerResponseMocker brokerMocker;
private final String namespace = "namespace";
+
@Before
public void init() throws Exception {
nameServerMocker = NameServerMocker.startByDefaultConf(9876, 10911);
brokerMocker = ServerResponseMocker.startServer(10911, "Hello World".getBytes(StandardCharsets.UTF_8));
connectConfig = new WorkerConfig();
+
connectConfig.setHttpPort(8081);
connectConfig.setNamesrvAddr("localhost:9876");
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImplTest.java
new file mode 100644
index 00000000..59765574
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/FilePositionManagementServiceImplTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import io.openmessaging.connector.api.data.RecordOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.service.PositionManagementService;
+import org.apache.rocketmq.connect.runtime.store.ExtendRecordPartition;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Smoke and round-trip tests for {@link FilePositionManagementServiceImpl}, driven through the
+ * {@link PositionManagementService} interface with a default {@link WorkerConfig}.
+ */
+public class FilePositionManagementServiceImplTest {
+
+    private PositionManagementService filePositionManagementService = new FilePositionManagementServiceImpl();
+
+    private WorkerConfig workerConfig = new WorkerConfig();
+
+    /** Initializes the service with the worker config and starts it. */
+    @Before
+    public void before() {
+        filePositionManagementService.initialize(workerConfig);
+        filePositionManagementService.start();
+    }
+
+    /** Stops the service started in {@link #before()}. */
+    @After
+    public void after() {
+        filePositionManagementService.stop();
+    }
+
+    /** {@code persist()} must complete without throwing. */
+    @Test
+    public void persistTest() {
+        Assertions.assertThatCode(filePositionManagementService::persist).doesNotThrowAnyException();
+    }
+
+    /** {@code load()} must complete without throwing. */
+    @Test
+    public void loadTest() {
+        Assertions.assertThatCode(filePositionManagementService::load).doesNotThrowAnyException();
+    }
+
+    /**
+     * Writes a position through both {@code putPosition} overloads, reads it back after each
+     * write, and finally checks that removing the partition does not throw.
+     */
+    @Test
+    public void putPositionTest() {
+        final ExtendRecordPartition recordPartition = new ExtendRecordPartition("testConnector", queuePartition());
+
+        // Single-entry overload.
+        filePositionManagementService.putPosition(recordPartition, offsetAt(123L));
+        final RecordOffset afterSinglePut = filePositionManagementService.getPosition(recordPartition);
+        Assert.assertEquals(123L, afterSinglePut.getOffset().get("queueOffset"));
+
+        // Bulk overload replaces the offset stored for the same partition.
+        final Map<ExtendRecordPartition, RecordOffset> bulk = new HashMap<>();
+        bulk.put(recordPartition, offsetAt(124L));
+        filePositionManagementService.putPosition(bulk);
+        final RecordOffset afterBulkPut = filePositionManagementService.getPosition(recordPartition);
+        Assert.assertEquals(124L, afterBulkPut.getOffset().get("queueOffset"));
+
+        final List<ExtendRecordPartition> toRemove = new ArrayList<>();
+        toRemove.add(recordPartition);
+        Assertions.assertThatCode(() -> filePositionManagementService.removePosition(toRemove)).doesNotThrowAnyException();
+    }
+
+    /** Builds the partition attributes (topic, broker name, queue id) used by the test. */
+    private static Map<String, String> queuePartition() {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("topic", "testTopic");
+        attributes.put("brokerName", "mockBroker");
+        attributes.put("queueId", "0");
+        return attributes;
+    }
+
+    /** Wraps {@code queueOffset} in a {@link RecordOffset} under the {@code "queueOffset"} key. */
+    private static RecordOffset offsetAt(long queueOffset) {
+        final Map<String, Long> values = new HashMap<>();
+        values.put("queueOffset", queueOffset);
+        return new RecordOffset(values);
+    }
+}
diff --git a/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImplTest.java b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImplTest.java
new file mode 100644
index 00000000..651aaacd
--- /dev/null
+++ b/rocketmq-connect-runtime/src/test/java/org/apache/rocketmq/connect/runtime/service/memory/MemoryConfigManagementServiceImplTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.connect.runtime.service.memory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
+import org.apache.rocketmq.connect.runtime.config.WorkerConfig;
+import org.apache.rocketmq.connect.runtime.controller.isolation.DelegatingClassLoader;
+import org.apache.rocketmq.connect.runtime.controller.isolation.Plugin;
+import org.apache.rocketmq.connect.runtime.controller.isolation.PluginClassLoader;
+import org.apache.rocketmq.connect.runtime.service.ConfigManagementService;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link MemoryConfigManagementServiceImpl}, accessed through the
+ * {@link ConfigManagementService} interface.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class MemoryConfigManagementServiceImplTest {
+
+    private static final String CONNECTOR_NAME = "testConnector";
+
+    private static final String TEST_CONNECTOR_CLASS = "org.apache.rocketmq.connect.runtime.connectorwrapper.testimpl.TestConnector";
+
+    private ConfigManagementService memoryConfigManagementService = new MemoryConfigManagementServiceImpl();
+
+    private WorkerConfig workerConfig = new WorkerConfig();
+
+    private Plugin plugin;
+
+    // NOTE(review): this mock is never referenced by any test in this class - confirm it is needed.
+    @Mock
+    private DelegatingClassLoader delegatingClassLoader;
+
+    // NOTE(review): assigned in before() but never read afterwards - confirm it is needed.
+    private PluginClassLoader pluginClassLoader;
+
+    /**
+     * Builds the plugin from the test source path, then initializes and starts the service.
+     *
+     * @throws ClassNotFoundException declared by the original signature
+     * @throws MalformedURLException if the plugin URL literal cannot be parsed
+     */
+    @Before
+    public void before() throws ClassNotFoundException, MalformedURLException {
+        List<String> pluginPaths = new ArrayList<>();
+        pluginPaths.add("src/test/java/org/apache/rocketmq/connect/runtime");
+        plugin = new Plugin(pluginPaths);
+
+        URL url = new URL("file://src/test/java/org/apache/rocketmq/connect/runtime");
+        URL[] urls = new URL[]{};
+        pluginClassLoader = new PluginClassLoader(url, urls);
+        memoryConfigManagementService.initialize(workerConfig, plugin);
+        memoryConfigManagementService.start();
+    }
+
+    /** Stops the service started in {@link #before()}. */
+    @After
+    public void after() {
+        memoryConfigManagementService.stop();
+    }
+
+    /** Storing a connector config must return the empty string. */
+    @Test
+    public void putConnectorConfigTest() {
+        final String result = memoryConfigManagementService.putConnectorConfig(CONNECTOR_NAME, newConnectorConfig());
+        // assertThat(...) on its own verifies nothing; a terminal assertion is required.
+        // NOTE(review): "" as the expected value is taken from the earlier "".equals(result)
+        // expression - confirm it is the value putConnectorConfig returns on success.
+        Assertions.assertThat(result).isEqualTo("");
+    }
+
+    /** A stored connector config must be readable back with the same topic and class. */
+    @Test
+    public void getConnectorConfigsTest() {
+        memoryConfigManagementService.putConnectorConfig(CONNECTOR_NAME, newConnectorConfig());
+
+        final Map<String, ConnectKeyValue> configs = memoryConfigManagementService.getConnectorConfigs();
+        final ConnectKeyValue resultKeyValue = configs.get(CONNECTOR_NAME);
+        Assert.assertEquals("testTopic", resultKeyValue.getString("connect.topicname"));
+        Assert.assertEquals(TEST_CONNECTOR_CLASS, resultKeyValue.getString("connector.class"));
+    }
+
+    /**
+     * A config stored with {@code config-deleted} set to 1 must still be returned by
+     * {@code getConnectorConfigs()}.
+     */
+    @Test
+    public void getConnectorConfigsIncludeDeletedTest() {
+        // NOTE(review): the test name suggests an "include deleted" lookup, but the call below is
+        // the same getConnectorConfigs() used by getConnectorConfigsTest - confirm this is intended.
+        ConnectKeyValue connectKeyValue = newConnectorConfig();
+        connectKeyValue.put("config-deleted", 1);
+        memoryConfigManagementService.putConnectorConfig(CONNECTOR_NAME, connectKeyValue);
+
+        final Map<String, ConnectKeyValue> allData = memoryConfigManagementService.getConnectorConfigs();
+        final ConnectKeyValue resultKeyValue = allData.get(CONNECTOR_NAME);
+        Assert.assertEquals("testTopic", resultKeyValue.getString("connect.topicname"));
+        Assert.assertEquals(TEST_CONNECTOR_CLASS, resultKeyValue.getString("connector.class"));
+    }
+
+    /** Deleting a previously stored connector config must not throw. */
+    @Test
+    public void removeConnectorConfigTest() {
+        memoryConfigManagementService.putConnectorConfig(CONNECTOR_NAME, newConnectorConfig());
+        Assertions.assertThatCode(() -> memoryConfigManagementService.deleteConnectorConfig(CONNECTOR_NAME)).doesNotThrowAnyException();
+    }
+
+    /** Builds the connector config (topic name and connector class) shared by all tests. */
+    private static ConnectKeyValue newConnectorConfig() {
+        ConnectKeyValue connectKeyValue = new ConnectKeyValue();
+        connectKeyValue.put("connect.topicname", "testTopic");
+        connectKeyValue.put("connector.class", TEST_CONNECTOR_CLASS);
+        return connectKeyValue;
+    }
+
+}