You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2021/12/28 01:02:34 UTC
[rocketmq] branch develop updated: [ISSUE #3674] Improve the test cases of the tools module (#3672)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8540da4 [ISSUE #3674] Improve the test cases of the tools module (#3672)
8540da4 is described below
commit 8540da4449fdffd65a85e305ad8d0428703216e0
Author: xijiu <42...@qq.com>
AuthorDate: Tue Dec 28 09:02:16 2021 +0800
[ISSUE #3674] Improve the test cases of the tools module (#3672)
* move the filtering logic to client
* fix error test
* fix all junit test except QueryMsgTraceByIdSubCommand
* fix some junit test exception
* Run through all test cases
* Code optimization
* extract the startNameServer, eliminate duplicate code
---
tools/pom.xml | 4 -
.../command/broker/BrokerStatusSubCommandTest.java | 68 +++------
.../broker/CleanExpiredCQSubCommandTest.java | 56 ++------
.../broker/CleanUnusedTopicCommandTest.java | 56 ++------
.../command/broker/GetBrokerConfigCommandTest.java | 73 ++++------
.../broker/UpdateBrokerConfigSubCommandTest.java | 52 ++-----
.../ConsumerConnectionSubCommandTest.java | 87 +++++-------
.../ProducerConnectionSubCommandTest.java | 80 +++++------
.../consumer/ConsumerProgressSubCommandTest.java | 103 +++++---------
.../consumer/ConsumerStatusSubCommandTest.java | 113 ++++-----------
.../consumer/GetConsumerConfigSubCommandTest.java | 104 ++++++++------
.../command/message/ConsumeMessageCommandTest.java | 43 ++++--
.../message/QueryMsgByUniqueKeySubCommandTest.java | 6 +-
.../message/QueryMsgTraceByIdSubCommandTest.java | 126 +++++++++++------
.../namesrv/AddWritePermSubCommandTest.java | 38 +++++
.../namesrv/GetNamesrvConfigCommandTest.java | 77 ++++-------
.../command/namesrv/UpdateKvConfigCommandTest.java | 56 +++-----
.../namesrv/WipeWritePermSubCommandTest.java | 82 +++++------
.../offset/GetConsumerStatusCommandTest.java | 70 ++++------
.../offset/ResetOffsetByTimeCommandTest.java | 88 ++++--------
.../tools/command/server/NameServerMocker.java | 67 +++++++++
.../tools/command/server/ServerResponseMocker.java | 153 +++++++++++++++++++++
22 files changed, 771 insertions(+), 831 deletions(-)
diff --git a/tools/pom.xml b/tools/pom.xml
index 3eda852..2bd2a8b 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -27,10 +27,6 @@
<artifactId>rocketmq-tools</artifactId>
<name>rocketmq-tools ${project.version}</name>
- <properties>
- <maven.test.skip>true</maven.test.skip>
- </properties>
-
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java
index c850d71..ad2a76b 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/BrokerStatusSubCommandTest.java
@@ -16,75 +16,43 @@
*/
package org.apache.rocketmq.tools.command.broker;
-import java.lang.reflect.Field;
-import java.util.HashMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.protocol.body.KVTable;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+public class BrokerStatusSubCommandTest extends ServerResponseMocker {
-public class BrokerStatusSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
+ private static final int PORT = 45678;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
-
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
-
- KVTable kvTable = new KVTable();
- kvTable.setTable(new HashMap<String, String>());
- when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
+ @Override
+ protected int getPort() {
+ return PORT;
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @Override
+ protected byte[] getBody() {
+ BrokerStatsData brokerStatsData = new BrokerStatsData();
+ BrokerStatsItem item = new BrokerStatsItem();
+ brokerStatsData.setStatsDay(item);
+ return brokerStatsData.encode();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
BrokerStatusSubCommand cmd = new BrokerStatusSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+ String[] subargs = new String[] {"-b 127.0.0.1:" + PORT, "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+
cmd.execute(commandLine, options, null);
}
+
+
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java
index 241ae88..a5c070b 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanExpiredCQSubCommandTest.java
@@ -16,69 +16,33 @@
*/
package org.apache.rocketmq.tools.command.broker;
-import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+public class CleanExpiredCQSubCommandTest extends ServerResponseMocker {
-public class CleanExpiredCQSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
+ private static final int PORT = 45678;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
-
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
-
- when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+ @Override
+ protected int getPort() {
+ return PORT;
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @Override
+ protected byte[] getBody() {
+ return null;
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
CleanExpiredCQSubCommand cmd = new CleanExpiredCQSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+ String[] subargs = new String[] {"-b 127.0.0.1:" + PORT, "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java
index 759f783..b893a76 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/CleanUnusedTopicCommandTest.java
@@ -16,69 +16,33 @@
*/
package org.apache.rocketmq.tools.command.broker;
-import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+public class CleanUnusedTopicCommandTest extends ServerResponseMocker {
-public class CleanUnusedTopicCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
+ private static final int PORT = 45678;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
-
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
-
- when(mQClientAPIImpl.cleanUnusedTopicByAddr(anyString(), anyLong())).thenReturn(true);
+ @Override
+ protected int getPort() {
+ return PORT;
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @Override
+ protected byte[] getBody() {
+ return null;
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
CleanUnusedTopicCommand cmd = new CleanUnusedTopicCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+ String[] subargs = new String[] {"-b 127.0.0.1:" + PORT, "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java
index 8bb4079..7673a78 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommandTest.java
@@ -16,75 +16,48 @@
*/
package org.apache.rocketmq.tools.command.broker;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
-public class GetBrokerConfigCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+public class GetBrokerConfigCommandTest extends ServerResponseMocker {
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private static final int PORT = 45678;
- Properties properties = new Properties();
- properties.setProperty("maxMessageSize", "5000000");
- properties.setProperty("flushDelayOffsetInterval", "15000");
- properties.setProperty("serverSocketRcvBufSize", "655350");
- when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
+ @Override
+ protected int getPort() {
+ return PORT;
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @Override
+ protected byte[] getBody() {
+ StringBuilder sb = new StringBuilder();
+ Properties properties = new Properties();
+ properties.setProperty("stat", "123");
+ properties.setProperty("ip", "192.168.1.1");
+ properties.setProperty("broker_name", "broker_101");
+ sb.append(MixAll.properties2String(properties));
+ try {
+ return sb.toString().getBytes(MixAll.DEFAULT_CHARSET);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
GetBrokerConfigCommand cmd = new GetBrokerConfigCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster"};
+ String[] subargs = new String[] {"-b 127.0.0.1:" + PORT, "-c default-cluster"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java
index c74107e..120ab9e 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/broker/UpdateBrokerConfigSubCommandTest.java
@@ -16,65 +16,33 @@
*/
package org.apache.rocketmq.tools.command.broker;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
import org.junit.Test;
-import static org.mockito.Mockito.mock;
+public class UpdateBrokerConfigSubCommandTest extends ServerResponseMocker {
-public class UpdateBrokerConfigSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
+ private static final int PORT = 45678;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
-
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ @Override
+ protected int getPort() {
+ return PORT;
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @Override
+ protected byte[] getBody() {
+ return null;
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
UpdateBrokerConfigSubCommand cmd = new UpdateBrokerConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-b 127.0.0.1:10911", "-c default-cluster", "-k topicname", "-v unit_test"};
+ String[] subargs = new String[] {"-b 127.0.0.1:" + PORT, "-c default-cluster", "-k topicname", "-v unit_test"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java
index 584943c..6ad311a 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ConsumerConnectionSubCommandTest.java
@@ -16,80 +16,45 @@
*/
package org.apache.rocketmq.tools.command.connection;
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import java.util.HashSet;
+
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ConsumerConnectionSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private static final int BROKER_PORT = 45676;
- ConsumerConnection consumerConnection = new ConsumerConnection();
- consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
- consumerConnection.setMessageModel(MessageModel.CLUSTERING);
- HashSet<Connection> connections = new HashSet<>();
- connections.add(new Connection());
- consumerConnection.setConnectionSet(connections);
- consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
- consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
+ private ServerResponseMocker brokerMocker;
+
+ private ServerResponseMocker nameServerMocker;
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
ConsumerConnectionSubCommand cmd = new ConsumerConnectionSubCommand();
@@ -99,4 +64,16 @@ public class ConsumerConnectionSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ ConsumerConnection consumerConnection = new ConsumerConnection();
+ HashSet<Connection> connectionSet = new HashSet<>();
+ Connection connection = mock(Connection.class);
+ connectionSet.add(connection);
+ consumerConnection.setConnectionSet(connectionSet);
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, consumerConnection.encode());
+ }
+
+
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java
index 060ba93..4cab7cc 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/connection/ProducerConnectionSubCommandTest.java
@@ -16,74 +16,45 @@
*/
package org.apache.rocketmq.tools.command.connection;
-import java.lang.reflect.Field;
-import java.util.HashSet;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.protocol.body.Connection;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import java.util.HashSet;
+
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ProducerConnectionSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private static final int BROKER_PORT = 45676;
- ProducerConnection producerConnection = new ProducerConnection();
- Connection connection = new Connection();
- connection.setClientAddr("127.0.0.1:9898");
- connection.setClientId("PID_12345");
- HashSet<Connection> connectionSet = new HashSet<>();
- connectionSet.add(connection);
- producerConnection.setConnectionSet(connectionSet);
- when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
+ private ServerResponseMocker brokerMocker;
+
+ private ServerResponseMocker nameServerMocker;
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
ProducerConnectionSubCommand cmd = new ProducerConnectionSubCommand();
@@ -93,4 +64,15 @@ public class ProducerConnectionSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ ProducerConnection producerConnection = new ProducerConnection();
+ HashSet<Connection> connectionSet = new HashSet<>();
+ Connection connection = mock(Connection.class);
+ connectionSet.add(connection);
+ producerConnection.setConnectionSet(connectionSet);
+
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, producerConnection.encode());
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java
index 19d903c..b234ef2 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommandTest.java
@@ -16,92 +16,44 @@
*/
package org.apache.rocketmq.tools.command.consumer;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.OffsetWrapper;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.util.HashMap;
public class ConsumerProgressSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private static final int BROKER_PORT = 45676;
- TopicRouteData topicRouteData = new TopicRouteData();
- List<BrokerData> brokerDatas = new ArrayList<>();
- HashMap<Long, String> brokerAddrs = new HashMap<>();
- brokerAddrs.put(1234l, "127.0.0.1:10911");
- BrokerData brokerData = new BrokerData();
- brokerData.setCluster("default-cluster");
- brokerData.setBrokerName("default-broker");
- brokerData.setBrokerAddrs(brokerAddrs);
- brokerDatas.add(brokerData);
- topicRouteData.setBrokerDatas(brokerDatas);
- topicRouteData.setQueueDatas(new ArrayList<QueueData>());
- topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+ private ServerResponseMocker brokerMocker;
- ConsumeStats consumeStats = new ConsumeStats();
- consumeStats.setConsumeTps(1234);
- MessageQueue messageQueue = new MessageQueue();
- OffsetWrapper offsetWrapper = new OffsetWrapper();
- HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
- stats.put(messageQueue, offsetWrapper);
- consumeStats.setOffsetTable(stats);
- when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats);
+ private ServerResponseMocker nameServerMocker;
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
ConsumerProgressSubCommand cmd = new ConsumerProgressSubCommand();
@@ -111,4 +63,23 @@ public class ConsumerProgressSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ ConsumeStats consumeStats = new ConsumeStats();
+ HashMap<MessageQueue, OffsetWrapper> offsetTable = new HashMap<>();
+ MessageQueue messageQueue = new MessageQueue();
+ messageQueue.setBrokerName("mockBrokerName");
+ messageQueue.setQueueId(1);
+ messageQueue.setBrokerName("mockTopicName");
+
+ OffsetWrapper offsetWrapper = new OffsetWrapper();
+ offsetWrapper.setBrokerOffset(1);
+ offsetWrapper.setConsumerOffset(1);
+ offsetWrapper.setLastTimestamp(System.currentTimeMillis());
+
+ offsetTable.put(messageQueue, offsetWrapper);
+ consumeStats.setOffsetTable(offsetTable);
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, consumeStats.encode());
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java
index 7f44af8..82baaeb 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/ConsumerStatusSubCommandTest.java
@@ -16,112 +16,45 @@
*/
package org.apache.rocketmq.tools.command.consumer;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.Connection;
-import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
-import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
-import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import java.util.HashSet;
+
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class ConsumerStatusSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private static final int BROKER_PORT = 45676;
- TopicRouteData topicRouteData = new TopicRouteData();
- List<BrokerData> brokerDatas = new ArrayList<>();
- HashMap<Long, String> brokerAddrs = new HashMap<>();
- brokerAddrs.put(1234l, "127.0.0.1:10911");
- BrokerData brokerData = new BrokerData();
- brokerData.setCluster("default-cluster");
- brokerData.setBrokerName("default-broker");
- brokerData.setBrokerAddrs(brokerAddrs);
- brokerDatas.add(brokerData);
- topicRouteData.setBrokerDatas(brokerDatas);
- topicRouteData.setQueueDatas(new ArrayList<QueueData>());
- topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+ private ServerResponseMocker brokerMocker;
- ConsumerConnection consumerConnection = new ConsumerConnection();
- consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
- consumerConnection.setMessageModel(MessageModel.CLUSTERING);
- HashSet<Connection> connections = new HashSet<>();
- connections.add(new Connection());
- consumerConnection.setConnectionSet(connections);
- consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
- consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
+ private ServerResponseMocker nameServerMocker;
- ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
- consumerRunningInfo.setJstack("test");
- consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
- consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
- consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
- when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
ConsumerStatusSubCommand cmd = new ConsumerStatusSubCommand();
@@ -131,4 +64,14 @@ public class ConsumerStatusSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ ConsumerConnection consumerConnection = new ConsumerConnection();
+ HashSet<Connection> connectionSet = new HashSet<>();
+ Connection connection = mock(Connection.class);
+ connectionSet.add(connection);
+ consumerConnection.setConnectionSet(connectionSet);
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, consumerConnection.encode());
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
index 1ec68ff..c21df3d 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/consumer/GetConsumerConfigSubCommandTest.java
@@ -16,63 +16,51 @@
*/
package org.apache.rocketmq.tools.command.consumer;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
import static org.mockito.Mockito.mock;
public class GetConsumerConfigSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
-
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
-
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+ private static final int NAME_SERVER_PORT = 45677;
+
+ private static final int BROKER_PORT = 45676;
+
+ private ServerResponseMocker brokerMocker;
+
+ private ServerResponseMocker nameServerMocker;
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = startNameServer();
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
- System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
GetConsumerConfigSubCommand cmd = new GetConsumerConfigSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-g group_test"};
@@ -80,4 +68,38 @@ public class GetConsumerConfigSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startNameServer() {
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:" + NAME_SERVER_PORT);
+ ClusterInfo clusterInfo = new ClusterInfo();
+
+ HashMap<String, BrokerData> brokerAddressTable = new HashMap<>();
+ BrokerData brokerData = new BrokerData();
+ brokerData.setBrokerName("mockBrokerName");
+ HashMap<Long, String> brokerAddress = new HashMap<>();
+ brokerAddress.put(1L, "127.0.0.1:" + BROKER_PORT);
+ brokerData.setBrokerAddrs(brokerAddress);
+ brokerData.setCluster("mockCluster");
+ brokerAddressTable.put("mockBrokerName", brokerData);
+ clusterInfo.setBrokerAddrTable(brokerAddressTable);
+
+ HashMap<String, Set<String>> clusterAddressTable = new HashMap<>();
+ Set<String> brokerNames = new HashSet<>();
+ brokerNames.add("mockBrokerName");
+ clusterAddressTable.put("mockCluster", brokerNames);
+ clusterInfo.setClusterAddrTable(clusterAddressTable);
+
+ // start name server
+ return ServerResponseMocker.startServer(NAME_SERVER_PORT, clusterInfo.encode());
+ }
+
+ private ServerResponseMocker startOneBroker() {
+ ConsumerConnection consumerConnection = new ConsumerConnection();
+ HashSet<Connection> connectionSet = new HashSet<>();
+ Connection connection = mock(Connection.class);
+ connectionSet.add(connection);
+ consumerConnection.setConnectionSet(connectionSet);
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, consumerConnection.encode());
+ }
}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 1154395..98621e6 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -51,18 +51,24 @@ import static org.mockito.Mockito.when;
public class ConsumeMessageCommandTest {
private static ConsumeMessageCommand consumeMessageCommand;
+ private static PullResult PULL_RESULT = mockPullResult();
+
+ private static PullResult mockPullResult() {
+ MessageExt msg = new MessageExt();
+ msg.setBody(new byte[] {'a'});
+ List<MessageExt> msgFoundList = new ArrayList<>();
+ msgFoundList.add(msg);
+ return new PullResult(PullStatus.FOUND, 2, 0, 1, msgFoundList);
+ }
+
+
@BeforeClass
public static void init() throws MQClientException, RemotingException, MQBrokerException, InterruptedException,
NoSuchFieldException, IllegalAccessException {
consumeMessageCommand = new ConsumeMessageCommand();
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
- MessageExt msg = new MessageExt();
- msg.setBody(new byte[] {'a'});
- List<MessageExt> msgFoundList = new ArrayList<>();
- msgFoundList.add(msg);
- final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 1, msgFoundList);
- when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenReturn(pullResult);
+ assignPullResult(defaultMQPullConsumer);
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
when(defaultMQPullConsumer.maxOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(1));
@@ -79,6 +85,25 @@ public class ConsumeMessageCommandTest {
public static void terminate() {
}
+ private static void assignPullResult() {
+ assignPullResult(null);
+ }
+
+ private static void assignPullResult(DefaultMQPullConsumer defaultMQPullConsumer) {
+ try {
+ if (defaultMQPullConsumer == null) {
+ Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+ producerField.setAccessible(true);
+ defaultMQPullConsumer = (DefaultMQPullConsumer) producerField.get(consumeMessageCommand);
+ }
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt()))
+ .thenReturn(PULL_RESULT);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+
@Test
public void testExecuteDefault() throws SubCommandException {
PrintStream out = System.out;
@@ -86,6 +111,7 @@ public class ConsumeMessageCommandTest {
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {"-t mytopic", "-n localhost:9876"};
+ assignPullResult();
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(),
subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
@@ -104,6 +130,7 @@ public class ConsumeMessageCommandTest {
String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + consumeMessageCommand.commandName(), subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+ assignPullResult();
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
@@ -113,7 +140,7 @@ public class ConsumeMessageCommandTest {
@Test
public void testExecuteDefaultWhenPullMessageByQueueGotException() throws SubCommandException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, IllegalAccessException {
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
- when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class);
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
producerField.set(consumeMessageCommand, defaultMQPullConsumer);
@@ -135,7 +162,7 @@ public class ConsumeMessageCommandTest {
@Test
public void testExecuteByConditionWhenPullMessageByQueueGotException() throws IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException {
DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
- when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(Exception.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt())).thenThrow(MQClientException.class);
Field producerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
producerField.set(consumeMessageCommand, defaultMQPullConsumer);
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
index 504b465..e21a66f 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommandTest.java
@@ -20,6 +20,7 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
@@ -44,6 +45,7 @@ import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
+import org.assertj.core.util.Lists;
import org.junit.Before;
import org.junit.Test;
@@ -114,6 +116,9 @@ public class QueryMsgByUniqueKeySubCommandTest {
when(mQAdminImpl.queryMessageByUniqKey(anyString(), anyString())).thenReturn(retMsgExt);
+ QueryResult queryResult = new QueryResult(0, Lists.newArrayList(retMsgExt));
+ when(defaultMQAdminExtImpl.queryMessageByUniqKey(anyString(), anyString(), anyInt(), anyLong(), anyLong())).thenReturn(queryResult);
+
TopicRouteData topicRouteData = new TopicRouteData();
List<BrokerData> brokerDataList = new ArrayList<BrokerData>();
BrokerData brokerData = new BrokerData();
@@ -231,7 +236,6 @@ public class QueryMsgByUniqueKeySubCommandTest {
System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
- QueryMsgByUniqueKeySubCommand cmd = new QueryMsgByUniqueKeySubCommand();
String[] args = new String[]{"-t myTopicTest", "-i 0A3A54F7BF7D18B4AAC28A3FA2CF0000"};
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin ", args, cmd.buildCommandlineOptions(options), new PosixParser());
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommandTest.java
index f61c71d..f22acde 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/message/QueryMsgTraceByIdSubCommandTest.java
@@ -19,66 +19,104 @@ package org.apache.rocketmq.tools.command.message;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-
-import static org.mockito.Mockito.mock;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
public class QueryMsgTraceByIdSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
-
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
-
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+
+ private static final int NAME_SERVER_PORT = 45677;
+
+ private static final int BROKER_PORT = 45676;
+
+ private ServerResponseMocker brokerMocker;
+
+ private ServerResponseMocker nameServerMocker;
+
+ private static final String MSG_ID = "AC1FF54E81C418B4AAC24F92E1E00000";
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = startNameServer();
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
- System.setProperty("rocketmq.namesrv.addr", "127.0.0.1:9876");
QueryMsgTraceByIdSubCommand cmd = new QueryMsgTraceByIdSubCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-i AC1FF54E81C418B4AAC24F92E1E00000"};
+ String[] subargs = new String[] {"-i " + MSG_ID};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startNameServer() {
+ int nameServerPort = NAME_SERVER_PORT;
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:" + nameServerPort);
+ TopicRouteData topicRouteData = new TopicRouteData();
+ List<BrokerData> dataList = new ArrayList<>();
+ HashMap<Long, String> brokerAddress = new HashMap<>();
+ brokerAddress.put(1L, "127.0.0.1:" + BROKER_PORT);
+ BrokerData brokerData = new BrokerData("mockCluster", "mockBrokerName", brokerAddress);
+ brokerData.setBrokerName("mockBrokerName");
+ dataList.add(brokerData);
+ topicRouteData.setBrokerDatas(dataList);
+
+ List<QueueData> queueDatas = new ArrayList<>();
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName("mockBrokerName");
+ queueData.setPerm(1);
+ queueData.setReadQueueNums(1);
+ queueData.setTopicSysFlag(1);
+ queueData.setWriteQueueNums(1);
+ queueDatas.add(queueData);
+ topicRouteData.setQueueDatas(queueDatas);
+
+ return ServerResponseMocker.startServer(nameServerPort, topicRouteData.encode());
+ }
+
+ private ServerResponseMocker startOneBroker() {
+ try {
+ MessageExt messageExt = new MessageExt();
+ messageExt.setTopic(TopicValidator.RMQ_SYS_TRACE_TOPIC);
+ messageExt.setBody(new byte[100]);
+ // topic RMQ_SYS_TRACE_TOPIC which built-in rocketMQ, set msg id as msg key
+ messageExt.setKeys(MSG_ID);
+ messageExt.setBornHost(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
+ messageExt.setStoreHost(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0));
+ byte[] body = MessageDecoder.encode(messageExt, false);
+
+ HashMap<String, String> extMap = new HashMap<>();
+ extMap.put("indexLastUpdateTimestamp", String.valueOf(System.currentTimeMillis()));
+ extMap.put("indexLastUpdatePhyoffset", String.valueOf(System.currentTimeMillis()));
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, body, extMap);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
}
\ No newline at end of file
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java
index 901b8bb..c65a6c3 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/AddWritePermSubCommandTest.java
@@ -21,10 +21,36 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.command.SubCommandException;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import java.util.HashMap;
+
public class AddWritePermSubCommandTest {
+ private static final int NAME_SERVER_PORT = 45677;
+
+ private static final int BROKER_PORT = 45676;
+
+ private ServerResponseMocker brokerMocker;
+
+ private ServerResponseMocker nameServerMocker;
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = startNameServer();
+ }
+
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
+ }
+
@Test
public void testExecute() throws SubCommandException {
AddWritePermSubCommand cmd = new AddWritePermSubCommand();
@@ -34,4 +60,16 @@ public class AddWritePermSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startNameServer() {
+ HashMap<String, String> extMap = new HashMap<>();
+ extMap.put("addTopicCount", "1");
+ // start name server
+ return NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT, extMap);
+ }
+
+ private ServerResponseMocker startOneBroker() {
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, null);
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java
index dde80eb..e94aba9 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommandTest.java
@@ -16,78 +16,51 @@
*/
package org.apache.rocketmq.tools.command.namesrv;
-import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
-import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import org.mockito.ArgumentMatchers;
-
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class GetNamesrvConfigCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
+
+ private static final int BROKER_PORT = 45676;
+
+ private ServerResponseMocker brokerMocker;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private ServerResponseMocker nameServerMocker;
- Map<String, Properties> propertiesMap = new HashMap<>();
- List<String> nameServers = new ArrayList<>();
- when(mQClientAPIImpl.getNameServerConfig(ArgumentMatchers.<String>anyList(), anyLong())).thenReturn(propertiesMap);
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- // @Ignore
@Test
- public void testExecute() throws SubCommandException {
+ public void testExecute() throws Exception {
GetNamesrvConfigCommand cmd = new GetNamesrvConfigCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[] {};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
+
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, null);
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java
index c4edcaf..c7480d1 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/UpdateKvConfigCommandTest.java
@@ -19,52 +19,35 @@ package org.apache.rocketmq.tools.command.namesrv;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import java.lang.reflect.Field;
+public class UpdateKvConfigCommandTest {
+ private static final int NAME_SERVER_PORT = 45677;
-import static org.mockito.Mockito.mock;
+ private static final int BROKER_PORT = 45676;
-public class UpdateKvConfigCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
+ private ServerResponseMocker brokerMocker;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private ServerResponseMocker nameServerMocker;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
UpdateKvConfigCommand cmd = new UpdateKvConfigCommand();
@@ -74,4 +57,9 @@ public class UpdateKvConfigCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName() + cmd.commandDesc(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, null);
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java
index 9befdf8..b065c31 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/namesrv/WipeWritePermSubCommandTest.java
@@ -16,71 +16,41 @@
*/
package org.apache.rocketmq.tools.command.namesrv;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import java.util.HashMap;
public class WipeWritePermSubCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, RemotingCommandException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private static final int BROKER_PORT = 45676;
- List<String> result = new ArrayList<>();
- result.add("default-name-one");
- result.add("default-name-two");
- when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
- when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
+ private ServerResponseMocker brokerMocker;
+
+ private ServerResponseMocker nameServerMocker;
+
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = startNameServer();
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
WipeWritePermSubCommand cmd = new WipeWritePermSubCommand();
@@ -90,4 +60,18 @@ public class WipeWritePermSubCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startNameServer() {
+ HashMap<String, String> extMap = new HashMap<>();
+ extMap.put("wipeTopicCount", "1");
+ // start name server
+ return NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT, extMap);
+ }
+
+ private ServerResponseMocker startOneBroker() {
+ // start broker
+ HashMap<String, String> extMap = new HashMap<>();
+ extMap.put("wipeTopicCount", "1");
+ return ServerResponseMocker.startServer(BROKER_PORT, new byte[0], extMap);
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java
index a01bf81..aece90f 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/GetConsumerStatusCommandTest.java
@@ -16,66 +16,40 @@
*/
package org.apache.rocketmq.tools.command.offset;
-import java.lang.reflect.Field;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class GetConsumerStatusCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
+
+ private static final int BROKER_PORT = 45676;
+
+ private ServerResponseMocker brokerMocker;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private ServerResponseMocker nameServerMocker;
- Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
- when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
GetConsumerStatusCommand cmd = new GetConsumerStatusCommand();
@@ -85,4 +59,10 @@ public class GetConsumerStatusCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ GetConsumerStatusBody getConsumerStatusBody = new GetConsumerStatusBody();
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, getConsumerStatusBody.encode());
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java
index d73a996..03e8943 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommandTest.java
@@ -16,86 +16,40 @@
*/
package org.apache.rocketmq.tools.command.offset;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
-import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
-import org.apache.rocketmq.client.impl.MQClientManager;
-import org.apache.rocketmq.client.impl.factory.MQClientInstance;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.route.BrokerData;
-import org.apache.rocketmq.common.protocol.route.QueueData;
-import org.apache.rocketmq.common.protocol.route.TopicRouteData;
-import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
import org.apache.rocketmq.srvutil.ServerUtil;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
+import org.apache.rocketmq.tools.command.server.NameServerMocker;
+import org.apache.rocketmq.tools.command.server.ServerResponseMocker;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class ResetOffsetByTimeCommandTest {
- private static DefaultMQAdminExt defaultMQAdminExt;
- private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig());
- private static MQClientAPIImpl mQClientAPIImpl;
- @BeforeClass
- public static void init() throws NoSuchFieldException, IllegalAccessException, InterruptedException, RemotingException, MQClientException, MQBrokerException {
- mQClientAPIImpl = mock(MQClientAPIImpl.class);
- defaultMQAdminExt = new DefaultMQAdminExt();
- defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+ private static final int NAME_SERVER_PORT = 45677;
+
+ private static final int BROKER_PORT = 45676;
- Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
- field.setAccessible(true);
- field.set(defaultMQAdminExtImpl, mqClientInstance);
- field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
- field.setAccessible(true);
- field.set(mqClientInstance, mQClientAPIImpl);
- field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
- field.setAccessible(true);
- field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
+ private ServerResponseMocker brokerMocker;
- TopicRouteData topicRouteData = new TopicRouteData();
- List<BrokerData> brokerDatas = new ArrayList<>();
- HashMap<Long, String> brokerAddrs = new HashMap<>();
- brokerAddrs.put(1234l, "127.0.0.1:10911");
- BrokerData brokerData = new BrokerData();
- brokerData.setCluster("default-cluster");
- brokerData.setBrokerName("default-broker");
- brokerData.setBrokerAddrs(brokerAddrs);
- brokerDatas.add(brokerData);
- topicRouteData.setBrokerDatas(brokerDatas);
- topicRouteData.setQueueDatas(new ArrayList<QueueData>());
- topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
- when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+ private ServerResponseMocker nameServerMocker;
- Map<MessageQueue, Long> messageQueueLongMap = new HashMap<>();
- when(mQClientAPIImpl.invokeBrokerToResetOffset(anyString(), anyString(), anyString(), anyLong(), anyBoolean(), anyLong())).thenReturn(messageQueueLongMap);
+ @Before
+ public void before() {
+ brokerMocker = startOneBroker();
+ nameServerMocker = NameServerMocker.startByDefaultConf(NAME_SERVER_PORT, BROKER_PORT);
}
- @AfterClass
- public static void terminate() {
- defaultMQAdminExt.shutdown();
+ @After
+ public void after() {
+ brokerMocker.shutdown();
+ nameServerMocker.shutdown();
}
- @Ignore
@Test
public void testExecute() throws SubCommandException {
ResetOffsetByTimeCommand cmd = new ResetOffsetByTimeCommand();
@@ -105,4 +59,10 @@ public class ResetOffsetByTimeCommandTest {
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
+
+ private ServerResponseMocker startOneBroker() {
+ ResetOffsetBody resetOffsetBody = new ResetOffsetBody();
+ // start broker
+ return ServerResponseMocker.startServer(BROKER_PORT, resetOffsetBody.encode());
+ }
}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/server/NameServerMocker.java b/tools/src/test/java/org/apache/rocketmq/tools/command/server/NameServerMocker.java
new file mode 100644
index 0000000..32718f1
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/server/NameServerMocker.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.server;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * tools class
+ */
+public class NameServerMocker {
+
+ /**
+ * use the specified port to start the nameserver
+ *
+ * @param nameServerPort nameServer port
+ * @param brokerPort broker port
+ * @return ServerResponseMocker
+ */
+ public static ServerResponseMocker startByDefaultConf(int nameServerPort, int brokerPort) {
+ return startByDefaultConf(nameServerPort, brokerPort, null);
+ }
+
+ /**
+ * use the specified port to start the nameserver
+ *
+ * @param nameServerPort nameServer port
+ * @param brokerPort broker port
+ * @param extMap extend config
+ * @return ServerResponseMocker
+ */
+ public static ServerResponseMocker startByDefaultConf(int nameServerPort, int brokerPort,
+ HashMap<String, String> extMap) {
+
+ System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:" + nameServerPort);
+ TopicRouteData topicRouteData = new TopicRouteData();
+ List<BrokerData> dataList = new ArrayList<>();
+ HashMap<Long, String> brokerAddress = new HashMap<>();
+ brokerAddress.put(1L, "127.0.0.1:" + brokerPort);
+ BrokerData brokerData = new BrokerData("mockCluster", "mockBrokerName", brokerAddress);
+ brokerData.setBrokerName("mockBrokerName");
+ dataList.add(brokerData);
+ topicRouteData.setBrokerDatas(dataList);
+ // start name server
+ return ServerResponseMocker.startServer(nameServerPort, topicRouteData.encode(), extMap);
+ }
+
+}
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/server/ServerResponseMocker.java b/tools/src/test/java/org/apache/rocketmq/tools/command/server/ServerResponseMocker.java
new file mode 100644
index 0000000..20ddf62
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/server/ServerResponseMocker.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.server;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.concurrent.Future;
+import org.apache.rocketmq.remoting.netty.NettyDecoder;
+import org.apache.rocketmq.remoting.netty.NettyEncoder;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.junit.After;
+import org.junit.Before;
+
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * mock server response for command
+ */
+public abstract class ServerResponseMocker {
+
+ private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
+
+ @Before
+ public void before() {
+ start();
+ }
+
+ @After
+ public void shutdown() {
+ if (eventLoopGroup.isShutdown()) {
+ return;
+ }
+ Future<?> future = eventLoopGroup.shutdownGracefully();
+ try {
+ future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ e.printStackTrace();
+ }
+ }
+
+ protected abstract int getPort();
+
+ protected abstract byte[] getBody();
+
+ public void start() {
+ start(null);
+ }
+
+ public void start(HashMap<String, String> extMap) {
+ ServerBootstrap serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(eventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_BACKLOG, 1024)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_KEEPALIVE, false)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.SO_SNDBUF, 65535)
+ .childOption(ChannelOption.SO_RCVBUF, 65535)
+ .localAddress(new InetSocketAddress(getPort()))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline()
+ .addLast(eventLoopGroup,
+ new NettyEncoder(),
+ new NettyDecoder(),
+ new IdleStateHandler(0, 0, 120),
+ new ChannelDuplexHandler(),
+ new NettyServerHandler(extMap)
+ );
+ }
+ });
+ try {
+ ChannelFuture sync = serverBootstrap.bind().sync();
+ InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
+ } catch (InterruptedException e1) {
+ throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
+ }
+ }
+
+ @ChannelHandler.Sharable
+ private class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
+ private HashMap<String, String> extMap;
+
+ public NettyServerHandler(HashMap<String, String> extMap) {
+ this.extMap = extMap;
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
+ String remark = "mock data";
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(RemotingSysResponseCode.SUCCESS, remark);
+ response.setOpaque(msg.getOpaque());
+ response.setBody(getBody());
+
+ if (extMap != null && extMap.size() > 0) {
+ response.setExtFields(extMap);
+ }
+ ctx.writeAndFlush(response);
+ }
+ }
+
+ public static ServerResponseMocker startServer(int port, byte[] body) {
+ return startServer(port, body, null);
+ }
+
+
+ public static ServerResponseMocker startServer(int port, byte[] body, HashMap<String, String> extMap) {
+ ServerResponseMocker mocker = new ServerResponseMocker() {
+ @Override
+ protected int getPort() {
+ return port;
+ }
+
+ @Override
+ protected byte[] getBody() {
+ return body;
+ }
+ };
+ mocker.start(extMap);
+ // add jvm hook, close connection when jvm down
+ Runtime.getRuntime().addShutdownHook(new Thread(mocker::shutdown));
+ return mocker;
+ }
+}