You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by st...@apache.org on 2017/01/20 09:44:58 UTC
[01/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests
for Validators and ThreadLocalIndex
Repository: incubator-rocketmq
Updated Branches:
refs/heads/master b4108d2d9 -> 63de56c7b
[ROCKETMQ-52] Add unit tests for Validators and ThreadLocalIndex
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/03b12166
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/03b12166
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/03b12166
Branch: refs/heads/master
Commit: 03b12166bd1e2cc011ebbd2e667fe794ff04cc16
Parents: 2a9769a
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 12:53:23 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 13:06:23 2017 +0800
----------------------------------------------------------------------
client/pom.xml | 4 ++
.../org/apache/rocketmq/client/Validators.java | 8 +--
.../client/common/ThreadLocalIndex.java | 4 --
.../client/impl/producer/TopicPublishInfo.java | 2 +-
.../latency/LatencyFaultToleranceImpl.java | 2 +-
.../apache/rocketmq/client/ValidatorsTest.java | 52 +++++++++++++++++++-
.../client/common/ThreadLocalIndexTest.java | 33 +++++++++++++
7 files changed, 94 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 713523d..3a0f6ae 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -38,5 +38,9 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 7ea1bf9..899efa6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -118,23 +118,23 @@ public class Validators {
*/
public static void checkTopic(String topic) throws MQClientException {
if (UtilAll.isBlank(topic)) {
- throw new MQClientException("the specified topic is blank", null);
+ throw new MQClientException("The specified topic is blank", null);
}
if (!regularExpressionMatcher(topic, PATTERN)) {
throw new MQClientException(String.format(
- "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+ "The specified topic[%s] contains illegal characters, allowing only %s", topic,
VALID_PATTERN_STR), null);
}
if (topic.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException("the specified topic is longer than topic max length 255.", null);
+ throw new MQClientException("The specified topic is longer than topic max length 255.", null);
}
//whether the same with system reserved keyword
if (topic.equals(MixAll.DEFAULT_TOPIC)) {
throw new MQClientException(
- String.format("the topic[%s] is conflict with default topic.", topic), null);
+ String.format("The topic[%s] is conflict with default topic.", topic), null);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
index a30b5da..ab223c3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -23,10 +23,6 @@ public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
- public ThreadLocalIndex(int value) {
-
- }
-
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 52c5cfb..deb02cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -27,7 +27,7 @@ public class TopicPublishInfo {
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
- private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
+ private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
public boolean isOrderTopic() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 5309bc9..72d4347 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.client.common.ThreadLocalIndex;
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
- private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0);
+ private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
index 6775404..2db648d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
@@ -17,17 +17,67 @@
package org.apache.rocketmq.client;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
import org.junit.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
public class ValidatorsTest {
@Test
- public void topicValidatorTest() throws MQClientException {
+ public void testCheckTopic_Success() throws MQClientException {
Validators.checkTopic("Hello");
Validators.checkTopic("%RETRY%Hello");
Validators.checkTopic("_%RETRY%Hello");
Validators.checkTopic("-%RETRY%Hello");
Validators.checkTopic("223-%RETRY%Hello");
}
+
+ @Test
+ public void testCheckTopic_HasIllegalCharacters() {
+ String illegalTopic = "TOPIC&*^";
+ try {
+ Validators.checkTopic(illegalTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith(String.format("The specified topic[%s] contains illegal characters, allowing only %s", illegalTopic, Validators.VALID_PATTERN_STR));
+ }
+ }
+
+ @Test
+ public void testCheckTopic_UseDefaultTopic() {
+ String defaultTopic = MixAll.DEFAULT_TOPIC;
+ try {
+ Validators.checkTopic(defaultTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith(String.format("The topic[%s] is conflict with default topic.", defaultTopic));
+ }
+ }
+
+ @Test
+ public void testCheckTopic_BlankTopic() {
+ String blankTopic = "";
+ try {
+ Validators.checkTopic(blankTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith("The specified topic is blank");
+ }
+ }
+
+ @Test
+ public void testCheckTopic_TooLongTopic() {
+ String tooLongTopic = StringUtils.rightPad("TooLongTopic", Validators.CHARACTER_MAX_LENGTH + 1, "_");
+ assertThat(tooLongTopic.length()).isGreaterThan(Validators.CHARACTER_MAX_LENGTH);
+ try {
+ Validators.checkTopic(tooLongTopic);
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageStartingWith("The specified topic is longer than topic max length 255.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
new file mode 100644
index 0000000..b937e45
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.common;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ThreadLocalIndexTest {
+
+ @Test
+ public void getAndIncrement() throws Exception {
+ ThreadLocalIndex localIndex = new ThreadLocalIndex();
+ int initialVal = localIndex.getAndIncrement();
+
+ assertThat(localIndex.getAndIncrement()).isEqualTo(initialVal + 1);
+ }
+
+}
\ No newline at end of file
[07/14] incubator-rocketmq git commit: [ROCKETMQ-52] Use
MockitoJUnitRunner with Mock
Posted by st...@apache.org.
[ROCKETMQ-52] Use MockitoJUnitRunner with Mock
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/4dd6c385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/4dd6c385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/4dd6c385
Branch: refs/heads/master
Commit: 4dd6c385a14061a104713f2b1062d953d2f5dede
Parents: 8c933f2
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 21:37:26 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 21:37:26 2017 +0800
----------------------------------------------------------------------
.../store/LocalFileOffsetStoreTest.java | 19 ++++++++--------
.../store/RemoteBrokerOffsetStoreTest.java | 23 ++++++++++----------
.../client/impl/MQClientAPIImplTest.java | 19 ++++++++--------
.../impl/factory/MQClientInstanceTest.java | 3 ---
4 files changed, 30 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index 0a54855..0b522e6 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -21,25 +21,26 @@ import java.util.HashSet;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
-import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public class LocalFileOffsetStoreTest {
@Mock
- private static MQClientInstance mQClientFactory;
- private static String group = "FooBarGroup";
- private static String topic = "FooBar";
- private static String brokerName = "DefaultBrokerName";
+ private MQClientInstance mQClientFactory;
+ private String group = "FooBarGroup";
+ private String topic = "FooBar";
+ private String brokerName = "DefaultBrokerName";
- @BeforeClass
- public static void init() {
+ @Before
+ public void init() {
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
- mQClientFactory = Mockito.mock(MQClientInstance.class);
String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
when(mQClientFactory.getClientId()).thenReturn(clientId);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 7ecb022..a13930f 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -27,10 +27,12 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -39,27 +41,24 @@ import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public class RemoteBrokerOffsetStoreTest {
@Mock
- private static MQClientInstance mQClientFactory;
+ private MQClientInstance mQClientFactory;
@Mock
- private static MQClientAPIImpl mqClientAPI;
- private static String group = "FooBarGroup";
- private static String topic = "FooBar";
- private static String brokerName = "DefaultBrokerName";
+ private MQClientAPIImpl mqClientAPI;
+ private String group = "FooBarGroup";
+ private String topic = "FooBar";
+ private String brokerName = "DefaultBrokerName";
- @BeforeClass
- public static void init() {
+ @Before
+ public void init() {
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
- mQClientFactory = mock(MQClientInstance.class);
String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
when(mQClientFactory.getClientId()).thenReturn(clientId);
when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
-
- mqClientAPI = mock(MQClientAPIImpl.class);
when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index cbcf560..3553738 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -35,13 +35,14 @@ import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
@@ -52,14 +53,14 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
+@RunWith(MockitoJUnitRunner.class)
public class MQClientAPIImplTest {
- private static MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
+ private MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
@Mock
- private static RemotingClient remotingClient;
+ private RemotingClient remotingClient;
@Mock
- private static DefaultMQProducerImpl defaultMQProducerImpl;
+ private DefaultMQProducerImpl defaultMQProducerImpl;
private String brokerAddr = "127.0.0.1";
private String brokerName = "DefaultBroker";
@@ -67,10 +68,8 @@ public class MQClientAPIImplTest {
private static String topic = "FooBar";
private Message msg = new Message("FooBar", new byte[] {});
- @BeforeClass
- public static void init() throws Exception {
- remotingClient = mock(NettyRemotingClient.class);
- defaultMQProducerImpl = mock(DefaultMQProducerImpl.class);
+ @Before
+ public void init() throws Exception {
Field field = MQClientAPIImpl.class.getDeclaredField("remotingClient");
field.setAccessible(true);
field.set(mqClientAPI, remotingClient);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index b7b07c6..c32cdab 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -32,7 +32,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -40,9 +39,7 @@ import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
- @InjectMocks
private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
-
private String topic = "FooBar";
private String group = "FooBarGroup";
[06/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests
for MQClientAPIImpl
Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for MQClientAPIImpl
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c933f2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c933f2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c933f2c
Branch: refs/heads/master
Commit: 8c933f2cc769b522bba4d5cc64e047f4a1a97e85
Parents: 8e26a40
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 20:46:16 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 20:46:16 2017 +0800
----------------------------------------------------------------------
.../client/common/ThreadLocalIndexTest.java | 1 -
.../store/LocalFileOffsetStoreTest.java | 1 -
.../store/RemoteBrokerOffsetStoreTest.java | 1 -
.../client/impl/MQClientAPIImplTest.java | 236 +++++++++++++++++++
.../impl/factory/MQClientInstanceTest.java | 7 +-
.../remoting/protocol/RemotingCommand.java | 4 +-
6 files changed, 238 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
index 350cde7..1be93ce 100644
--- a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -21,7 +21,6 @@ import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ThreadLocalIndexTest {
-
@Test
public void testGetAndIncrement() throws Exception {
ThreadLocalIndex localIndex = new ThreadLocalIndex();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index 58d99d6..0a54855 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -30,7 +30,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
public class LocalFileOffsetStoreTest {
-
@Mock
private static MQClientInstance mQClientFactory;
private static String group = "FooBarGroup";
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 9a6a4b5..7ecb022 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -43,7 +43,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RemoteBrokerOffsetStoreTest {
-
@Mock
private static MQClientInstance mQClientFactory;
@Mock
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
new file mode 100644
index 0000000..cbcf560
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class MQClientAPIImplTest {
+ private static MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
+ @Mock
+ private static RemotingClient remotingClient;
+ @Mock
+ private static DefaultMQProducerImpl defaultMQProducerImpl;
+
+ private String brokerAddr = "127.0.0.1";
+ private String brokerName = "DefaultBroker";
+ private static String group = "FooBarGroup";
+ private static String topic = "FooBar";
+ private Message msg = new Message("FooBar", new byte[] {});
+
+ @BeforeClass
+ public static void init() throws Exception {
+ remotingClient = mock(NettyRemotingClient.class);
+ defaultMQProducerImpl = mock(DefaultMQProducerImpl.class);
+ Field field = MQClientAPIImpl.class.getDeclaredField("remotingClient");
+ field.setAccessible(true);
+ field.set(mqClientAPI, remotingClient);
+ }
+
+ @Test
+ public void testSendMessageOneWay_Success() throws RemotingException, InterruptedException, MQBrokerException {
+ doNothing().when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong());
+ SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+ 3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl);
+ assertThat(sendResult).isNull();
+ }
+
+ @Test
+ public void testSendMessageOneWay_WithException() throws RemotingException, InterruptedException, MQBrokerException {
+ doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong());
+ try {
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+ 3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl);
+ failBecauseExceptionWasNotThrown(RemotingException.class);
+ } catch (RemotingException e) {
+ assertThat(e).hasMessage("Remoting Exception in Test");
+ }
+
+ doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong());
+ try {
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+ 3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl);
+ failBecauseExceptionWasNotThrown(InterruptedException.class);
+ } catch (InterruptedException e) {
+ assertThat(e).hasMessage("Interrupted Exception in Test");
+ }
+ }
+
+ @Test
+ public void testSendMessageSync_Success() throws InterruptedException, RemotingException, MQBrokerException {
+ doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ return createSuccessResponse(request);
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
+
+ SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, requestHeader,
+ 3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), defaultMQProducerImpl);
+
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+ assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+ }
+
+ @Test
+ public void testSendMessageSync_WithException() throws InterruptedException, RemotingException, MQBrokerException {
+ doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock mock) throws Throwable {
+ RemotingCommand request = mock.getArgument(1);
+ RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setOpaque(request.getOpaque());
+ response.setRemark("Broker is broken.");
+ return response;
+ }
+ }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+ SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
+
+ try {
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, requestHeader,
+ 3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), defaultMQProducerImpl);
+ failBecauseExceptionWasNotThrown(MQBrokerException.class);
+ } catch (MQBrokerException e) {
+ assertThat(e).hasMessageContaining("Broker is broken.");
+ }
+ }
+
+ @Test
+ public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException {
+ doNothing().when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+ SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+ 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
+ assertThat(sendResult).isNull();
+
+ doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
+ responseFuture.setResponseCommand(createSuccessResponse(request));
+ callback.operationComplete(responseFuture);
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+ SendMessageContext sendMessageContext = new SendMessageContext();
+ sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
+ new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+ assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+ }
+
+ @Override public void onException(Throwable e) {
+ }
+ },
+ null, null, 0, sendMessageContext, defaultMQProducerImpl);
+ }
+
+ @Test
+ public void testSendMessageAsync_WithException() throws RemotingException, InterruptedException, MQBrokerException {
+ doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient)
+ .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+ try {
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+ 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
+ failBecauseExceptionWasNotThrown(RemotingException.class);
+ } catch (RemotingException e) {
+ assertThat(e).hasMessage("Remoting Exception in Test");
+ }
+
+ doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient)
+ .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+ try {
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+ 3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
+ failBecauseExceptionWasNotThrown(InterruptedException.class);
+ } catch (InterruptedException e) {
+ assertThat(e).hasMessage("Interrupted Exception in Test");
+ }
+ }
+
+ private RemotingCommand createSuccessResponse(RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setOpaque(request.getOpaque());
+
+ SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+ responseHeader.setMsgId("123");
+ responseHeader.setQueueId(1);
+ responseHeader.setQueueOffset(123L);
+
+ response.addExtField(MessageConst.PROPERTY_MSG_REGION, "RegionHZ");
+ response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, "true");
+ response.addExtField("queueId", String.valueOf(responseHeader.getQueueId()));
+ response.addExtField("msgId", responseHeader.getMsgId());
+ response.addExtField("queueOffset", String.valueOf(responseHeader.getQueueOffset()));
+ return response;
+ }
+
+ private SendMessageRequestHeader createSendMessageRequestHeader() {
+ SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+ requestHeader.setBornTimestamp(System.currentTimeMillis());
+ requestHeader.setTopic(topic);
+ requestHeader.setProducerGroup(group);
+ requestHeader.setQueueId(1);
+ requestHeader.setMaxReconsumeTimes(10);
+ return requestHeader;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index 956cdd9..b7b07c6 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.admin.MQAdminExtInner;
import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
@@ -34,7 +33,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
-import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@@ -42,11 +40,8 @@ import static org.mockito.Mockito.mock;
@RunWith(MockitoJUnitRunner.class)
public class MQClientInstanceTest {
-
- @Mock
- private static MQClientAPIImpl mqClientAPI;
@InjectMocks
- private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
private String topic = "FooBar";
private String group = "FooBarGroup";
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 46ca8dd..99c13fb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -114,9 +114,7 @@ public class RemotingCommand {
}
public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) {
- RemotingCommand cmd = createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
-
- return cmd;
+ return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
}
/**
[11/14] incubator-rocketmq git commit: [ROCKETMQ-57] Polish unit
tests for rocketmq-tools
Posted by st...@apache.org.
[ROCKETMQ-57] Polish unit tests for rocketmq-tools
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/626315ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/626315ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/626315ab
Branch: refs/heads/master
Commit: 626315aba4abc31380a194fad660f4ca9a64dc03
Parents: 0f9f55f
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 11:22:48 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 13:29:50 2017 +0800
----------------------------------------------------------------------
.../tools/admin/DefaultMQAdminExtTest.java | 240 +++++++++++++++++++
.../rocketmq/tools/command/CommandUtilTest.java | 92 +++++++
2 files changed, 332 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/626315ab/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
new file mode 100644
index 0000000..ab6319f
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -0,0 +1,240 @@
+package org.apache.rocketmq.tools.admin;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQAdminExtTest {
+ private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+ private MQClientAPIImpl mQClientAPIImpl;
+ private Properties properties = new Properties();
+ private TopicList topicList = new TopicList();
+ private TopicRouteData topicRouteData = new TopicRouteData();
+ private KVTable kvTable = new KVTable();
+ private ClusterInfo clusterInfo = new ClusterInfo();
+
+ @Before
+ public void init() throws Exception {
+ mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+
+ properties.setProperty("maxMessageSize", "5000000");
+ properties.setProperty("flushDelayOffsetInterval", "15000");
+ properties.setProperty("serverSocketRcvBufSize", "655350");
+ when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
+
+ Set<String> topicSet = new HashSet<>();
+ topicSet.add("topic_one");
+ topicSet.add("topic_two");
+ topicList.setTopicList(topicSet);
+ when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
+
+
+ List<BrokerData> brokerDatas = new ArrayList<>();
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(1234l, "127.0.0.1:10911");
+ BrokerData brokerData = new BrokerData();
+ brokerData.setCluster("default-cluster");
+ brokerData.setBrokerName("default-broker");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerDatas.add(brokerData);
+ topicRouteData.setBrokerDatas(brokerDatas);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+
+ HashMap<String, String> result = new HashMap<>();
+ result.put("id", "1234");
+ result.put("brokerName", "default-broker");
+ kvTable.setTable(result);
+ when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
+
+ HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+ brokerAddrTable.put("default-broker", brokerData);
+ brokerAddrTable.put("broker-test", new BrokerData());
+ clusterInfo.setBrokerAddrTable(brokerAddrTable);
+ when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+ when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+
+ Set<String> clusterList = new HashSet<>();
+ clusterList.add("default-cluster-one");
+ clusterList.add("default-cluster-two");
+ when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList);
+
+ GroupList groupList = new GroupList();
+ HashSet<String> groups = new HashSet<>();
+ groups.add("consumer-group-one");
+ groups.add("consumer-group-two");
+ groupList.setGroupList(groups);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+ when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
+
+ SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
+ ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<>();
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ subscriptionGroupConfig.setBrokerId(1234);
+ subscriptionGroupConfig.setGroupName("Consumer-group-one");
+ subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
+ subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
+ when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper);
+
+ String topicListConfig = "topicListConfig";
+ when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig);
+
+ KVTable kvTable = new KVTable();
+ HashMap<String, String> kv = new HashMap<>();
+ kv.put("broker-name", "broker-one");
+ kv.put("cluster-name", "default-cluster");
+ kvTable.setTable(kv);
+ when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
+ }
+
+ @After
+ public void terminate() throws Exception {
+ if (defaultMQAdminExtImpl != null)
+ defaultMQAdminExtImpl.shutdown();
+ }
+
+ @Test
+ public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
+ Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
+ assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
+ assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
+ assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
+ }
+
+
+ @Test
+ public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+ TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
+ assertThat(topicList.getTopicList().size()).isEqualTo(2);
+ assertThat(topicList.getTopicList()).contains("topic_one");
+ }
+
+ @Test
+ public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
+ assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
+ assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
+ }
+
+ @Test
+ public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
+ HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
+ assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
+ assertThat(brokerList.containsKey("broker-test")).isTrue();
+
+ HashMap<String, Set<String>> clusterMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+ brokers.add("default-broker");
+ brokers.add("broker-test");
+ clusterMap.put("default-cluster", brokers);
+ ClusterInfo cInfo = mock(ClusterInfo.class);
+ when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
+ HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
+ assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
+ assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
+ TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
+ assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
+ assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
+ }
+
+ @Test
+ public void testGetNameServerAddressList() {
+ List<String> result = new ArrayList<>();
+ result.add("default-name-one");
+ result.add("default-name-two");
+ when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
+ List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
+ assertThat(nameList.get(0)).isEqualTo("default-name-one");
+ assertThat(nameList.get(1)).isEqualTo("default-name-two");
+ }
+
+ @Test
+ public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
+ String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
+ assertThat(topicConfig).isEqualTo("topicListConfig");
+ KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+ assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
+ assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
+ }
+
+
+ @Test
+ public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
+ assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
+ }
+
+ @Test
+ public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
+ assertThat(clean).isTrue();
+ }
+
+ @Test
+ public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
+ assertThat(clusterlist.contains("default-cluster-one")).isTrue();
+ assertThat(clusterlist.contains("default-cluster-two")).isTrue();
+ }
+
+ @Test
+ public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ ConsumeStatsList result = new ConsumeStatsList();
+ result.setBrokerAddr("127.0.0.1:10911");
+ when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
+ ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
+ assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
+ }
+
+ @Test
+ public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/626315ab/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
new file mode 100644
index 0000000..35c205e
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@ -0,0 +1,92 @@
+package org.apache.rocketmq.tools.command;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CommandUtilTest {
+ private DefaultMQAdminExt defaultMQAdminExt;
+ private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+ private MQClientAPIImpl mQClientAPIImpl;
+
+ @Before
+ public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ defaultMQAdminExt = mock(DefaultMQAdminExt.class);
+ MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+
+ ClusterInfo clusterInfo = new ClusterInfo();
+ HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+ HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(1234l, "127.0.0.1:10911");
+ BrokerData brokerData = new BrokerData();
+ brokerData.setBrokerName("default-broker");
+ brokerData.setCluster("default-cluster");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerAddrTable.put("default-broker", brokerData);
+ brokerAddrTable.put("broker-test", new BrokerData());
+ Set<String> brokerSet = new HashSet<>();
+ brokerSet.add("default-broker");
+ brokerSet.add("default-broker-one");
+ clusterAddrTable.put("default-cluster", brokerSet);
+ clusterInfo.setBrokerAddrTable(brokerAddrTable);
+ clusterInfo.setClusterAddrTable(clusterAddrTable);
+ when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+ when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ }
+
+ @Test
+ public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
+ }
+
+ @Test
+ public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ Set<String> result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testFetchBrokerNameByClusterName() throws Exception {
+ Set<String> result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.contains("default-broker")).isTrue();
+ assertThat(result.contains("default-broker-one")).isTrue();
+ assertThat(result.size()).isEqualTo(2);
+ }
+}
\ No newline at end of file
[09/14] incubator-rocketmq git commit: [ROCKETMQ-57] Polish unit
tests for rocketmq-tools
Posted by st...@apache.org.
[ROCKETMQ-57] Polish unit tests for rocketmq-tools
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b29ff37d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b29ff37d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b29ff37d
Branch: refs/heads/master
Commit: b29ff37db4f75d09419dffa092e38b7765814f04
Parents: 0f9f55f
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 11:41:32 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 11:41:32 2017 +0800
----------------------------------------------------------------------
.../tools/admin/DefaultMQAdminExtTest.java | 240 +++++++++++++++++++
.../rocketmq/tools/command/CommandUtilTest.java | 92 +++++++
2 files changed, 332 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b29ff37d/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
new file mode 100644
index 0000000..ab6319f
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -0,0 +1,240 @@
+package org.apache.rocketmq.tools.admin;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQAdminExtTest {
+ private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+ private MQClientAPIImpl mQClientAPIImpl;
+ private Properties properties = new Properties();
+ private TopicList topicList = new TopicList();
+ private TopicRouteData topicRouteData = new TopicRouteData();
+ private KVTable kvTable = new KVTable();
+ private ClusterInfo clusterInfo = new ClusterInfo();
+
+ @Before
+ public void init() throws Exception {
+ mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+
+ properties.setProperty("maxMessageSize", "5000000");
+ properties.setProperty("flushDelayOffsetInterval", "15000");
+ properties.setProperty("serverSocketRcvBufSize", "655350");
+ when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
+
+ Set<String> topicSet = new HashSet<>();
+ topicSet.add("topic_one");
+ topicSet.add("topic_two");
+ topicList.setTopicList(topicSet);
+ when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
+
+
+ List<BrokerData> brokerDatas = new ArrayList<>();
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(1234l, "127.0.0.1:10911");
+ BrokerData brokerData = new BrokerData();
+ brokerData.setCluster("default-cluster");
+ brokerData.setBrokerName("default-broker");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerDatas.add(brokerData);
+ topicRouteData.setBrokerDatas(brokerDatas);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+
+ HashMap<String, String> result = new HashMap<>();
+ result.put("id", "1234");
+ result.put("brokerName", "default-broker");
+ kvTable.setTable(result);
+ when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
+
+ HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+ brokerAddrTable.put("default-broker", brokerData);
+ brokerAddrTable.put("broker-test", new BrokerData());
+ clusterInfo.setBrokerAddrTable(brokerAddrTable);
+ when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+ when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+
+ Set<String> clusterList = new HashSet<>();
+ clusterList.add("default-cluster-one");
+ clusterList.add("default-cluster-two");
+ when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList);
+
+ GroupList groupList = new GroupList();
+ HashSet<String> groups = new HashSet<>();
+ groups.add("consumer-group-one");
+ groups.add("consumer-group-two");
+ groupList.setGroupList(groups);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+ when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
+
+ SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
+ ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<>();
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ subscriptionGroupConfig.setBrokerId(1234);
+ subscriptionGroupConfig.setGroupName("Consumer-group-one");
+ subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
+ subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
+ when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper);
+
+ String topicListConfig = "topicListConfig";
+ when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig);
+
+ KVTable kvTable = new KVTable();
+ HashMap<String, String> kv = new HashMap<>();
+ kv.put("broker-name", "broker-one");
+ kv.put("cluster-name", "default-cluster");
+ kvTable.setTable(kv);
+ when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
+ }
+
+ @After
+ public void terminate() throws Exception {
+ if (defaultMQAdminExtImpl != null)
+ defaultMQAdminExtImpl.shutdown();
+ }
+
+ @Test
+ public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
+ Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
+ assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
+ assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
+ assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
+ }
+
+
+ @Test
+ public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+ TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
+ assertThat(topicList.getTopicList().size()).isEqualTo(2);
+ assertThat(topicList.getTopicList()).contains("topic_one");
+ }
+
+ @Test
+ public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
+ assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
+ assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
+ }
+
+ @Test
+ public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
+ HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
+ assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
+ assertThat(brokerList.containsKey("broker-test")).isTrue();
+
+ HashMap<String, Set<String>> clusterMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+ brokers.add("default-broker");
+ brokers.add("broker-test");
+ clusterMap.put("default-cluster", brokers);
+ ClusterInfo cInfo = mock(ClusterInfo.class);
+ when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
+ HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
+ assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
+ assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
+ TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
+ assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
+ assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
+ }
+
+ @Test
+ public void testGetNameServerAddressList() {
+ List<String> result = new ArrayList<>();
+ result.add("default-name-one");
+ result.add("default-name-two");
+ when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
+ List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
+ assertThat(nameList.get(0)).isEqualTo("default-name-one");
+ assertThat(nameList.get(1)).isEqualTo("default-name-two");
+ }
+
+ @Test
+ public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
+ String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
+ assertThat(topicConfig).isEqualTo("topicListConfig");
+ KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+ assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
+ assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
+ }
+
+
+ @Test
+ public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
+ assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
+ }
+
+ @Test
+ public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
+ assertThat(clean).isTrue();
+ }
+
+ @Test
+ public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
+ assertThat(clusterlist.contains("default-cluster-one")).isTrue();
+ assertThat(clusterlist.contains("default-cluster-two")).isTrue();
+ }
+
+ @Test
+ public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ ConsumeStatsList result = new ConsumeStatsList();
+ result.setBrokerAddr("127.0.0.1:10911");
+ when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
+ ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
+ assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
+ }
+
+ @Test
+ public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b29ff37d/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
new file mode 100644
index 0000000..35c205e
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@ -0,0 +1,92 @@
+package org.apache.rocketmq.tools.command;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CommandUtilTest {
+ private DefaultMQAdminExt defaultMQAdminExt;
+ private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+ private MQClientAPIImpl mQClientAPIImpl;
+
+ @Before
+ public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ defaultMQAdminExt = mock(DefaultMQAdminExt.class);
+ MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+
+ ClusterInfo clusterInfo = new ClusterInfo();
+ HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+ HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(1234l, "127.0.0.1:10911");
+ BrokerData brokerData = new BrokerData();
+ brokerData.setBrokerName("default-broker");
+ brokerData.setCluster("default-cluster");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerAddrTable.put("default-broker", brokerData);
+ brokerAddrTable.put("broker-test", new BrokerData());
+ Set<String> brokerSet = new HashSet<>();
+ brokerSet.add("default-broker");
+ brokerSet.add("default-broker-one");
+ clusterAddrTable.put("default-cluster", brokerSet);
+ clusterInfo.setBrokerAddrTable(brokerAddrTable);
+ clusterInfo.setClusterAddrTable(clusterAddrTable);
+ when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+ when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ }
+
+ @Test
+ public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
+ }
+
+ @Test
+ public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ Set<String> result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testFetchBrokerNameByClusterName() throws Exception {
+ Set<String> result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.contains("default-broker")).isTrue();
+ assertThat(result.contains("default-broker-one")).isTrue();
+ assertThat(result.size()).isEqualTo(2);
+ }
+}
\ No newline at end of file
[12/14] incubator-rocketmq git commit: [ROCKETMQ-57] Add license
Posted by st...@apache.org.
[ROCKETMQ-57] Add license
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0de84e20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0de84e20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0de84e20
Branch: refs/heads/master
Commit: 0de84e20fccea6391e3f50fce051be795deeccd7
Parents: 8399389 4291348
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 21:00:54 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 21:00:54 2017 +0800
----------------------------------------------------------------------
.travis.yml | 45 +++-
README.md | 2 +-
.../rocketmq/broker/BrokerTestHarness.java | 69 ------
.../broker/api/BrokerFastFailureTest.java | 61 -----
.../rocketmq/broker/api/SendMessageTest.java | 79 ------
.../offset/ConsumerOffsetManagerTest.java | 61 -----
.../broker/topic/TopicConfigManagerTest.java | 60 -----
.../consumer/DefaultMQPullConsumerTest.java | 152 ++++++++++++
.../consumer/DefaultMQPushConsumerTest.java | 178 +++++++++++++
.../store/LocalFileOffsetStoreTest.java | 3 +-
.../impl/factory/MQClientInstanceTest.java | 6 +-
.../client/producer/DefaultMQProducerTest.java | 248 +++++++++++++++++++
pom.xml | 76 +++++-
.../tools/admin/DefaultMQAdminExtTest.java | 16 ++
.../rocketmq/tools/command/CommandUtilTest.java | 16 ++
15 files changed, 728 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0de84e20/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --cc tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index ab6319f,0000000..5964cd1
mode 100644,000000..100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@@ -1,240 -1,0 +1,256 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package org.apache.rocketmq.tools.admin;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQAdminExtTest {
+ private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+ private MQClientAPIImpl mQClientAPIImpl;
+ private Properties properties = new Properties();
+ private TopicList topicList = new TopicList();
+ private TopicRouteData topicRouteData = new TopicRouteData();
+ private KVTable kvTable = new KVTable();
+ private ClusterInfo clusterInfo = new ClusterInfo();
+
+ @Before
+ public void init() throws Exception {
+ mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+
+ properties.setProperty("maxMessageSize", "5000000");
+ properties.setProperty("flushDelayOffsetInterval", "15000");
+ properties.setProperty("serverSocketRcvBufSize", "655350");
+ when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
+
+ Set<String> topicSet = new HashSet<>();
+ topicSet.add("topic_one");
+ topicSet.add("topic_two");
+ topicList.setTopicList(topicSet);
+ when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
+
+
+ List<BrokerData> brokerDatas = new ArrayList<>();
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(1234l, "127.0.0.1:10911");
+ BrokerData brokerData = new BrokerData();
+ brokerData.setCluster("default-cluster");
+ brokerData.setBrokerName("default-broker");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerDatas.add(brokerData);
+ topicRouteData.setBrokerDatas(brokerDatas);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+
+ HashMap<String, String> result = new HashMap<>();
+ result.put("id", "1234");
+ result.put("brokerName", "default-broker");
+ kvTable.setTable(result);
+ when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
+
+ HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+ brokerAddrTable.put("default-broker", brokerData);
+ brokerAddrTable.put("broker-test", new BrokerData());
+ clusterInfo.setBrokerAddrTable(brokerAddrTable);
+ when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+ when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+
+ Set<String> clusterList = new HashSet<>();
+ clusterList.add("default-cluster-one");
+ clusterList.add("default-cluster-two");
+ when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList);
+
+ GroupList groupList = new GroupList();
+ HashSet<String> groups = new HashSet<>();
+ groups.add("consumer-group-one");
+ groups.add("consumer-group-two");
+ groupList.setGroupList(groups);
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+ when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
+
+ SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
+ ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<>();
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+ subscriptionGroupConfig.setBrokerId(1234);
+ subscriptionGroupConfig.setGroupName("Consumer-group-one");
+ subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
+ subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
+ when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper);
+
+ String topicListConfig = "topicListConfig";
+ when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig);
+
+ KVTable kvTable = new KVTable();
+ HashMap<String, String> kv = new HashMap<>();
+ kv.put("broker-name", "broker-one");
+ kv.put("cluster-name", "default-cluster");
+ kvTable.setTable(kv);
+ when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
+ }
+
+ @After
+ public void terminate() throws Exception {
+ if (defaultMQAdminExtImpl != null)
+ defaultMQAdminExtImpl.shutdown();
+ }
+
+ @Test
+ public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
+ Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
+ assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
+ assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
+ assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
+ }
+
+
+ @Test
+ public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+ TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
+ assertThat(topicList.getTopicList().size()).isEqualTo(2);
+ assertThat(topicList.getTopicList()).contains("topic_one");
+ }
+
+ @Test
+ public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
+ assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
+ assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
+ }
+
+ @Test
+ public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
+ HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
+ assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
+ assertThat(brokerList.containsKey("broker-test")).isTrue();
+
+ HashMap<String, Set<String>> clusterMap = new HashMap<>();
+ Set<String> brokers = new HashSet<>();
+ brokers.add("default-broker");
+ brokers.add("broker-test");
+ clusterMap.put("default-cluster", brokers);
+ ClusterInfo cInfo = mock(ClusterInfo.class);
+ when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
+ HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
+ assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
+ assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
+ TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
+ assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
+ assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
+ }
+
+ @Test
+ public void testGetNameServerAddressList() {
+ List<String> result = new ArrayList<>();
+ result.add("default-name-one");
+ result.add("default-name-two");
+ when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
+ List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
+ assertThat(nameList.get(0)).isEqualTo("default-name-one");
+ assertThat(nameList.get(1)).isEqualTo("default-name-two");
+ }
+
+ @Test
+ public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
+ String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
+ assertThat(topicConfig).isEqualTo("topicListConfig");
+ KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+ assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
+ assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
+ }
+
+
+ @Test
+ public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
+ assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
+ }
+
+ @Test
+ public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
+ assertThat(clean).isTrue();
+ }
+
+ @Test
+ public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
+ assertThat(clusterlist.contains("default-cluster-one")).isTrue();
+ assertThat(clusterlist.contains("default-cluster-two")).isTrue();
+ }
+
+ @Test
+ public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ ConsumeStatsList result = new ConsumeStatsList();
+ result.setBrokerAddr("127.0.0.1:10911");
+ when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
+ ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
+ assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
+ }
+
+ @Test
+ public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
+ assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0de84e20/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
----------------------------------------------------------------------
diff --cc tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
index 35c205e,0000000..ba58010
mode 100644,000000..100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@@ -1,92 -1,0 +1,108 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
+package org.apache.rocketmq.tools.command;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CommandUtilTest {
+ private DefaultMQAdminExt defaultMQAdminExt;
+ private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+ private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+ private MQClientAPIImpl mQClientAPIImpl;
+
+ @Before
+ public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ defaultMQAdminExt = mock(DefaultMQAdminExt.class);
+ MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class);
+ defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000);
+
+ Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+ field.setAccessible(true);
+ field.set(defaultMQAdminExtImpl, mqClientInstance);
+ field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+ field.setAccessible(true);
+ field.set(mqClientInstance, mQClientAPIImpl);
+
+ ClusterInfo clusterInfo = new ClusterInfo();
+ HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+ HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(1234l, "127.0.0.1:10911");
+ BrokerData brokerData = new BrokerData();
+ brokerData.setBrokerName("default-broker");
+ brokerData.setCluster("default-cluster");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerAddrTable.put("default-broker", brokerData);
+ brokerAddrTable.put("broker-test", new BrokerData());
+ Set<String> brokerSet = new HashSet<>();
+ brokerSet.add("default-broker");
+ brokerSet.add("default-broker-one");
+ clusterAddrTable.put("default-cluster", brokerSet);
+ clusterInfo.setBrokerAddrTable(brokerAddrTable);
+ clusterInfo.setClusterAddrTable(clusterAddrTable);
+ when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+ when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+ }
+
+ @After
+ public void shutdown() throws Exception {
+ }
+
+ @Test
+ public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
+ }
+
+ @Test
+ public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+ Set<String> result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testFetchBrokerNameByClusterName() throws Exception {
+ Set<String> result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster");
+ assertThat(result.contains("default-broker")).isTrue();
+ assertThat(result.contains("default-broker-one")).isTrue();
+ assertThat(result.size()).isEqualTo(2);
+ }
+}
[10/14] incubator-rocketmq git commit: Merge branch 'ROCKETMQ-57' of
https://git-wip-us.apache.org/repos/asf/incubator-rocketmq into ROCKETMQ-57
Posted by st...@apache.org.
Merge branch 'ROCKETMQ-57' of https://git-wip-us.apache.org/repos/asf/incubator-rocketmq into ROCKETMQ-57
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/83993896
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/83993896
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/83993896
Branch: refs/heads/master
Commit: 83993896e223f6ef08b0573f98f50177c5463b79
Parents: b29ff37 626315a
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 11:41:37 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 11:41:37 2017 +0800
----------------------------------------------------------------------
----------------------------------------------------------------------
[13/14] incubator-rocketmq git commit: [ROCKETMQ-57] Add unit test
for DefaultMQAdminExt
Posted by st...@apache.org.
[ROCKETMQ-57] Add unit test for DefaultMQAdminExt
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f8d881ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f8d881ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f8d881ba
Branch: refs/heads/master
Commit: f8d881babe3f8f8810f1303912b2fb0ef2badea0
Parents: 0de84e2
Author: stevenschew <st...@apache.org>
Authored: Fri Jan 20 14:53:43 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Fri Jan 20 14:53:43 2017 +0800
----------------------------------------------------------------------
.../tools/admin/DefaultMQAdminExtTest.java | 136 ++++++++++++++++++-
1 file changed, 130 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f8d881ba/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 5964cd1..da3bd8c 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -22,15 +22,21 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.admin.*;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -43,8 +49,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -93,6 +98,8 @@ public class DefaultMQAdminExtTest {
brokerData.setBrokerAddrs(brokerAddrs);
brokerDatas.add(brokerData);
topicRouteData.setBrokerDatas(brokerDatas);
+ topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+ topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
HashMap<String, String> result = new HashMap<>();
@@ -105,6 +112,7 @@ public class DefaultMQAdminExtTest {
brokerAddrTable.put("default-broker", brokerData);
brokerAddrTable.put("broker-test", new BrokerData());
clusterInfo.setBrokerAddrTable(brokerAddrTable);
+ clusterInfo.setClusterAddrTable(new HashMap<String, Set<String>>());
when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
@@ -140,6 +148,52 @@ public class DefaultMQAdminExtTest {
kv.put("cluster-name", "default-cluster");
kvTable.setTable(kv);
when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
+
+ ConsumeStats consumeStats = new ConsumeStats();
+ consumeStats.setConsumeTps(1234);
+ MessageQueue messageQueue = new MessageQueue();
+ OffsetWrapper offsetWrapper = new OffsetWrapper();
+ HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
+ stats.put(messageQueue, offsetWrapper);
+ consumeStats.setOffsetTable(stats);
+ when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats);
+
+ ConsumerConnection consumerConnection = new ConsumerConnection();
+ consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+ consumerConnection.setMessageModel(MessageModel.CLUSTERING);
+ HashSet<Connection> connections = new HashSet<>();
+ connections.add(new Connection());
+ consumerConnection.setConnectionSet(connections);
+ consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
+ consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
+
+ ProducerConnection producerConnection = new ProducerConnection();
+ Connection connection = new Connection();
+ connection.setClientAddr("127.0.0.1:9898");
+ connection.setClientId("PID_12345");
+ HashSet<Connection> connectionSet = new HashSet<Connection>();
+ connectionSet.add(connection);
+ producerConnection.setConnectionSet(connectionSet);
+ when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
+
+ when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
+
+ TopicStatsTable topicStatsTable = new TopicStatsTable();
+ topicStatsTable.setOffsetTable(new HashMap<MessageQueue, TopicOffset>());
+
+ Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
+ when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
+
+ List<QueueTimeSpan> queueTimeSpanList = new ArrayList<>();
+ when(mQClientAPIImpl.queryConsumeTimeSpan(anyString(), anyString(), anyString(), anyLong())).thenReturn(queueTimeSpanList);
+
+ ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
+ consumerRunningInfo.setJstack("test");
+ consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
+ consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
+ consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
+ when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
}
@After
@@ -191,6 +245,31 @@ public class DefaultMQAdminExtTest {
}
@Test
+ public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test");
+ assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
+ }
+
+ @Test
+ public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group");
+ assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY);
+ assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
+ }
+
+ @Test
+ public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test");
+ assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException {
+ int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
+ assertThat(result).isEqualTo(6);
+ }
+
+ @Test
public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
@@ -225,12 +304,57 @@ public class DefaultMQAdminExtTest {
}
@Test
+ public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ List<QueueTimeSpan> result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group");
+ assertThat(result.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster");
+ assertThat(result).isFalse();
+ }
+
+ @Test
public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
assertThat(clean).isTrue();
}
@Test
+ public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+ boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster");
+ assertThat(result).isFalse();
+ }
+
+ @Test
+ public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException {
+ ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false);
+ assertThat(consumerRunningInfo.getJstack()).isEqualTo("test");
+ }
+
+ @Test
+ public void testMessageTrackDetail() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ MessageExt messageExt = new MessageExt();
+ messageExt.setMsgId("msgId");
+ messageExt.setTopic("unit-test");
+ List<MessageTrack> messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt);
+ assertThat(messageTrackList.size()).isEqualTo(2);
+ }
+
+ @Test
+ public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911");
+ assertThat(result.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ Set<String> result = defaultMQAdminExtImpl.getTopicClusterList("unit-test");
+ assertThat(result.size()).isEqualTo(0);
+ }
+
+ @Test
public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
assertThat(clusterlist.contains("default-cluster-one")).isTrue();
[14/14] incubator-rocketmq git commit: Merge branch 'master' into
ROCKETMQ-57
Posted by st...@apache.org.
Merge branch 'master' into ROCKETMQ-57
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/63de56c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/63de56c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/63de56c7
Branch: refs/heads/master
Commit: 63de56c7b6cada18338bb4be5e84fb17a379e5ac
Parents: f8d881b b4108d2
Author: stevenschew <st...@apache.org>
Authored: Fri Jan 20 14:55:52 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Fri Jan 20 14:55:52 2017 +0800
----------------------------------------------------------------------
.travis.yml | 1 +
BUILDING | 2 +-
.../rocketmq/broker/out/BrokerOuterAPI.java | 1 -
.../subscription/SubscriptionGroupManager.java | 2 +-
.../rocketmq/client/impl/MQClientManager.java | 3 +-
.../processor/DefaultRequestProcessorTest.java | 246 +++++++++++++++
pom.xml | 73 +++--
.../remoting/protocol/RemotingSerializable.java | 2 +-
.../remoting/protocol/RocketMQSerializable.java | 17 +-
style/rmq_checkstyle.xml | 1 -
test/pom.xml | 52 ++++
.../test/client/mq/MQAsyncProducer.java | 85 ++++++
.../test/client/rmq/RMQAsyncSendProducer.java | 226 ++++++++++++++
.../test/client/rmq/RMQBroadCastConsumer.java | 37 +++
.../test/client/rmq/RMQNormalConsumer.java | 90 ++++++
.../test/client/rmq/RMQNormalProducer.java | 167 ++++++++++
.../clientinterface/AbstractMQConsumer.java | 112 +++++++
.../clientinterface/AbstractMQProducer.java | 151 +++++++++
.../test/clientinterface/MQCollector.java | 108 +++++++
.../test/clientinterface/MQConsumer.java | 26 ++
.../test/clientinterface/MQProducer.java | 30 ++
.../rocketmq/test/factory/ConsumerFactory.java | 45 +++
.../rocketmq/test/factory/MQMessageFactory.java | 128 ++++++++
.../rocketmq/test/factory/MessageFactory.java | 62 ++++
.../rocketmq/test/factory/ProducerFactory.java | 37 +++
.../test/factory/SendCallBackFactory.java | 35 +++
.../rocketmq/test/factory/TagMessage.java | 108 +++++++
.../test/listener/AbstractListener.java | 104 +++++++
.../rmq/concurrent/RMQDelayListner.java | 61 ++++
.../rmq/concurrent/RMQNormalListner.java | 70 +++++
.../listener/rmq/order/RMQOrderListener.java | 85 ++++++
.../rocketmq/test/message/MessageQueueMsg.java | 62 ++++
.../rocketmq/test/sendresult/SendResult.java | 62 ++++
.../apache/rocketmq/test/util/Condition.java | 23 ++
.../test/util/DuplicateMessageInfo.java | 137 +++++++++
.../org/apache/rocketmq/test/util/FileUtil.java | 108 +++++++
.../org/apache/rocketmq/test/util/MQAdmin.java | 164 ++++++++++
.../rocketmq/test/util/MQRandomUtils.java | 28 ++
.../org/apache/rocketmq/test/util/MQWait.java | 93 ++++++
.../apache/rocketmq/test/util/RandomUtil.java | 306 +++++++++++++++++++
.../apache/rocketmq/test/util/RandomUtils.java | 91 ++++++
.../org/apache/rocketmq/test/util/TestUtil.java | 122 ++++++++
.../apache/rocketmq/test/util/TestUtils.java | 50 +++
.../apache/rocketmq/test/util/VerifyUtils.java | 146 +++++++++
.../test/util/data/collect/DataCollector.java | 45 +++
.../util/data/collect/DataCollectorManager.java | 112 +++++++
.../test/util/data/collect/DataFilter.java | 22 ++
.../collect/impl/ListDataCollectorImpl.java | 95 ++++++
.../data/collect/impl/MapDataCollectorImpl.java | 111 +++++++
.../test/util/parallel/ParallelTask.java | 43 +++
.../util/parallel/ParallelTaskExecutor.java | 67 ++++
.../rocketmq/test/util/parallel/Task4Test.java | 30 ++
.../org/apache/rocketmq/test/base/BaseConf.java | 162 ++++++++++
.../rocketmq/test/base/IntegrationTestBase.java | 143 +++++++++
.../balance/NormalMsgDynamicBalanceIT.java | 111 +++++++
.../balance/NormalMsgStaticBalanceIT.java | 109 +++++++
.../consumer/broadcast/BaseBroadCastIT.java | 56 ++++
.../normal/BroadCastNormalMsgNotRecvIT.java | 73 +++++
.../normal/BroadCastNormalMsgRecvCrashIT.java | 90 ++++++
.../normal/BroadCastNormalMsgRecvFailIT.java | 72 +++++
.../BroadCastNormalMsgRecvStartLaterIT.java | 88 ++++++
.../BroadCastNormalMsgTwoDiffGroupRecvIT.java | 78 +++++
.../normal/NormalMsgTwoSameGroupConsumerIT.java | 78 +++++
.../broadcast/order/OrderMsgBroadCastIT.java | 75 +++++
.../tag/BroadCastTwoConsumerFilterIT.java | 78 +++++
.../tag/BroadCastTwoConsumerSubDiffTagIT.java | 75 +++++
.../tag/BroadCastTwoConsumerSubTagIT.java | 75 +++++
.../consumer/cluster/DynamicAddAndCrashIT.java | 103 +++++++
.../consumer/cluster/DynamicAddConsumerIT.java | 97 ++++++
.../cluster/DynamicCrashConsumerIT.java | 100 ++++++
.../test/client/consumer/tag/MulTagSubIT.java | 156 ++++++++++
.../consumer/tag/TagMessageWith1ConsumerIT.java | 197 ++++++++++++
.../tag/TagMessageWithMulConsumerIT.java | 196 ++++++++++++
.../tag/TagMessageWithSameGroupConsumerIT.java | 115 +++++++
.../consumer/topic/MulConsumerMulTopicIT.java | 108 +++++++
.../consumer/topic/OneConsumerMulTopicIT.java | 104 +++++++
.../producer/async/AsyncSendExceptionIT.java | 150 +++++++++
.../async/AsyncSendWithMessageQueueIT.java | 85 ++++++
.../AsyncSendWithMessageQueueSelectorIT.java | 106 +++++++
.../async/AsyncSendWithOnlySendCallBackIT.java | 64 ++++
.../producer/exception/msg/ChinaPropIT.java | 74 +++++
.../exception/msg/MessageExceptionIT.java | 129 ++++++++
.../exception/msg/MessageUserPropIT.java | 94 ++++++
.../ProducerGroupAndInstanceNameValidityIT.java | 69 +++++
.../producer/oneway/OneWaySendExceptionIT.java | 78 +++++
.../client/producer/oneway/OneWaySendIT.java | 64 ++++
.../producer/oneway/OneWaySendWithMQIT.java | 79 +++++
.../oneway/OneWaySendWithSelectorIT.java | 104 +++++++
.../order/OrderMsgDynamicRebalanceIT.java | 115 +++++++
.../test/client/producer/order/OrderMsgIT.java | 108 +++++++
.../producer/order/OrderMsgRebalanceIT.java | 133 ++++++++
.../producer/order/OrderMsgWithTagIT.java | 169 ++++++++++
.../querymsg/QueryMsgByIdExceptionIT.java | 81 +++++
.../producer/querymsg/QueryMsgByIdIT.java | 75 +++++
.../producer/querymsg/QueryMsgByKeyIT.java | 104 +++++++
.../apache/rocketmq/test/delay/DelayConf.java | 27 ++
.../rocketmq/test/delay/NormalMsgDelayIT.java | 116 +++++++
.../test/smoke/NormalMessageSendAndRecvIT.java | 62 ++++
test/src/test/resources/log4j.xml | 46 +++
test/src/test/resources/logback-test.xml | 33 ++
100 files changed, 8750 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
[03/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests
for LocalFileOffsetStore
Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for LocalFileOffsetStore
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6b5ab40c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6b5ab40c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6b5ab40c
Branch: refs/heads/master
Commit: 6b5ab40ced9f3f40f0acc18ba4eaf64b76686701
Parents: c0e7629
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 15:15:16 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 15:15:16 2017 +0800
----------------------------------------------------------------------
.../store/LocalFileOffsetStoreTest.java | 74 ++++++++++++++++++++
1 file changed, 74 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6b5ab40c/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
new file mode 100644
index 0000000..58d99d6
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+public class LocalFileOffsetStoreTest {
+
+ @Mock
+ private static MQClientInstance mQClientFactory;
+ private static String group = "FooBarGroup";
+ private static String topic = "FooBar";
+ private static String brokerName = "DefaultBrokerName";
+
+ @BeforeClass
+ public static void init() {
+ System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
+ mQClientFactory = Mockito.mock(MQClientInstance.class);
+ String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
+ when(mQClientFactory.getClientId()).thenReturn(clientId);
+ }
+
+ @Test
+ public void testUpdateOffset() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+ offsetStore.updateOffset(messageQueue, 1024, false);
+
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.updateOffset(messageQueue, 1023, false);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+
+ offsetStore.updateOffset(messageQueue, 1022, true);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+ }
+
+ @Test
+ public void testReadOffset_FromStore() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 2);
+
+ offsetStore.updateOffset(messageQueue, 1024, false);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+
+ offsetStore.persistAll(new HashSet<>(Collections.singletonList(messageQueue)));
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+ }
+}
\ No newline at end of file
[04/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests
for RemoteBrokerOffsetStore
Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for RemoteBrokerOffsetStore
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/2e226f1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/2e226f1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/2e226f1d
Branch: refs/heads/master
Commit: 2e226f1deee6968782159c31ca77323b86664482
Parents: 6b5ab40
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 16:15:11 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 16:15:11 2017 +0800
----------------------------------------------------------------------
.../store/RemoteBrokerOffsetStoreTest.java | 141 +++++++++++++++++++
1 file changed, 141 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e226f1d/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
new file mode 100644
index 0000000..9a6a4b5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteBrokerOffsetStoreTest {
+
+ @Mock
+ private static MQClientInstance mQClientFactory;
+ @Mock
+ private static MQClientAPIImpl mqClientAPI;
+ private static String group = "FooBarGroup";
+ private static String topic = "FooBar";
+ private static String brokerName = "DefaultBrokerName";
+
+ @BeforeClass
+ public static void init() {
+ System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
+ mQClientFactory = mock(MQClientInstance.class);
+ String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
+ when(mQClientFactory.getClientId()).thenReturn(clientId);
+ when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
+
+ mqClientAPI = mock(MQClientAPIImpl.class);
+ when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
+ }
+
+ @Test
+ public void testUpdateOffset() throws Exception {
+ OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+
+ offsetStore.updateOffset(messageQueue, 1024, false);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.updateOffset(messageQueue, 1023, false);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+
+ offsetStore.updateOffset(messageQueue, 1022, true);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+ }
+
+ @Test
+ public void testReadOffset_WithException() throws Exception {
+ OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 2);
+
+ offsetStore.updateOffset(messageQueue, 1024, false);
+
+ doThrow(new MQBrokerException(-1, ""))
+ .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+
+ doThrow(new RemotingException("", null))
+ .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2);
+ }
+
+ @Test
+ public void testReadOffset_Success() throws Exception {
+ OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+ final MessageQueue messageQueue = new MessageQueue(topic, brokerName, 3);
+
+ doAnswer(new Answer() {
+ @Override public Object answer(InvocationOnMock mock) throws Throwable {
+ UpdateConsumerOffsetRequestHeader updateRequestHeader = mock.getArgument(1);
+ when(mqClientAPI.queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong())).thenReturn(updateRequestHeader.getCommitOffset());
+ return null;
+ }
+ }).when(mqClientAPI).updateConsumerOffsetOneway(any(String.class), any(UpdateConsumerOffsetRequestHeader.class), any(Long.class));
+
+ offsetStore.updateOffset(messageQueue, 1024, false);
+ offsetStore.persist(messageQueue);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+ offsetStore.updateOffset(messageQueue, 1023, false);
+ offsetStore.persist(messageQueue);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1023);
+
+ offsetStore.updateOffset(messageQueue, 1022, true);
+ offsetStore.persist(messageQueue);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1023);
+
+ offsetStore.updateOffset(messageQueue, 1025, false);
+ offsetStore.persistAll(new HashSet<>(Collections.singletonList(messageQueue)));
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
+ }
+
+
+
+ @Test
+ public void testRemoveOffset() throws Exception {
+ OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+ final MessageQueue messageQueue = new MessageQueue(topic, brokerName, 4);
+
+ offsetStore.updateOffset(messageQueue, 1024, false);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.removeOffset(messageQueue);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+ }
+}
\ No newline at end of file
[05/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests
for MQClientInstance
Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for MQClientInstance
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8e26a401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8e26a401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8e26a401
Branch: refs/heads/master
Commit: 8e26a40109b1865ab701c10c769ae76af7ddafee
Parents: 2e226f1
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 17:22:15 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 17:22:15 2017 +0800
----------------------------------------------------------------------
.../impl/factory/MQClientInstanceTest.java | 123 +++++++++++++++++++
1 file changed, 123 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8e26a401/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
new file mode 100644
index 0000000..956cdd9
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.factory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.admin.MQAdminExtInner;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MQClientInstanceTest {
+
+ @Mock
+ private static MQClientAPIImpl mqClientAPI;
+ @InjectMocks
+ private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
+
+ private String topic = "FooBar";
+ private String group = "FooBarGroup";
+
+ @Test
+ public void testTopicRouteData2TopicPublishInfo() {
+ TopicRouteData topicRouteData = new TopicRouteData();
+
+ topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+ List<BrokerData> brokerDataList = new ArrayList<>();
+ BrokerData brokerData = new BrokerData();
+ brokerData.setBrokerName("BrokerA");
+ brokerData.setCluster("DefaultCluster");
+ HashMap<Long, String> brokerAddrs = new HashMap<>();
+ brokerAddrs.put(0L, "127.0.0.1");
+ brokerData.setBrokerAddrs(brokerAddrs);
+ brokerDataList.add(brokerData);
+ topicRouteData.setBrokerDatas(brokerDataList);
+
+ List<QueueData> queueDataList = new ArrayList<>();
+ QueueData queueData = new QueueData();
+ queueData.setBrokerName("BrokerA");
+ queueData.setPerm(2);
+ queueData.setReadQueueNums(3);
+ queueData.setWriteQueueNums(4);
+ queueData.setTopicSynFlag(0);
+ queueDataList.add(queueData);
+ topicRouteData.setQueueDatas(queueDataList);
+
+ TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+
+ assertThat(topicPublishInfo.isHaveTopicRouterInfo()).isFalse();
+ assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4);
+ }
+
+ @Test
+ public void testRegisterProducer() {
+ boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
+ assertThat(flag).isTrue();
+
+ flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
+ assertThat(flag).isFalse();
+
+ mqClientInstance.unregisterProducer(group);
+ flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
+ assertThat(flag).isTrue();
+ }
+
+ @Test
+ public void testRegisterConsumer() throws RemotingException, InterruptedException, MQBrokerException {
+ boolean flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
+ assertThat(flag).isTrue();
+
+ flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
+ assertThat(flag).isFalse();
+
+ mqClientInstance.unregisterConsumer(group);
+ flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
+ assertThat(flag).isTrue();
+ }
+
+ @Test
+ public void testRegisterAdminExt() {
+ boolean flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
+ assertThat(flag).isTrue();
+
+ flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
+ assertThat(flag).isFalse();
+
+ mqClientInstance.unregisterAdminExt(group);
+ flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
+ assertThat(flag).isTrue();
+ }
+}
\ No newline at end of file
[08/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests
for LatencyFaultTolerance
Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for LatencyFaultTolerance
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0f9f55f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0f9f55f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0f9f55f0
Branch: refs/heads/master
Commit: 0f9f55f0f39107b304ee26c603307e9bc5bf2c99
Parents: 4dd6c38
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 22:03:07 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 22:03:07 2017 +0800
----------------------------------------------------------------------
.../latency/LatencyFaultToleranceImplTest.java | 67 ++++++++++++++++++++
1 file changed, 67 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0f9f55f0/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
new file mode 100644
index 0000000..86690e4
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.latency;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class LatencyFaultToleranceImplTest {
+ private LatencyFaultTolerance<String> latencyFaultTolerance;
+ private String brokerName = "BrokerA";
+ private String anotherBrokerName = "BrokerB";
+
+ @Before
+ public void init() {
+ latencyFaultTolerance = new LatencyFaultToleranceImpl();
+ }
+
+ @Test
+ public void testUpdateFaultItem() throws Exception {
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
+ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
+ assertThat(latencyFaultTolerance.isAvailable(anotherBrokerName)).isTrue();
+ }
+
+ @Test
+ public void testIsAvailable() throws Exception {
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50);
+ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
+
+ TimeUnit.MILLISECONDS.sleep(70);
+ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
+ }
+
+ @Test
+ public void testRemove() throws Exception {
+ latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
+ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
+ latencyFaultTolerance.remove(brokerName);
+ assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
+ }
+
+ @Test
+ public void testPickOneAtLeast() throws Exception {
+ latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000);
+ assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
+
+ latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000);
+ assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
+ }
+}
\ No newline at end of file
[02/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests
for MessageQueueSelector
Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for MessageQueueSelector
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/c0e76299
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/c0e76299
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/c0e76299
Branch: refs/heads/master
Commit: c0e762995b1669b3febc3a5a2bac76577981696d
Parents: 03b1216
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 13:28:56 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 13:28:56 2017 +0800
----------------------------------------------------------------------
.../client/common/ThreadLocalIndexTest.java | 2 +-
.../selector/SelectMessageQueueByHashTest.java | 49 ++++++++++++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c0e76299/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
index b937e45..350cde7 100644
--- a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -23,7 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ThreadLocalIndexTest {
@Test
- public void getAndIncrement() throws Exception {
+ public void testGetAndIncrement() throws Exception {
ThreadLocalIndex localIndex = new ThreadLocalIndex();
int initialVal = localIndex.getAndIncrement();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c0e76299/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java
new file mode 100644
index 0000000..64ef21d
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class SelectMessageQueueByHashTest {
+
+ private String topic = "FooBar";
+
+ @Test
+ public void testSelect() throws Exception {
+ SelectMessageQueueByHash selector = new SelectMessageQueueByHash();
+
+ Message message = new Message(topic, new byte[] {});
+
+ List<MessageQueue> messageQueues = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ MessageQueue messageQueue = new MessageQueue(topic, "DefaultBroker", i);
+ messageQueues.add(messageQueue);
+ }
+
+ String orderId = "123";
+ String anotherOrderId = "234";
+ MessageQueue selected = selector.select(messageQueues, message, orderId);
+ assertThat(selector.select(messageQueues, message, anotherOrderId)).isNotEqualTo(selected);
+ }
+
+}
\ No newline at end of file