You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by st...@apache.org on 2017/01/20 09:44:58 UTC

[01/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests for Validators and ThreadLocalIndex

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master b4108d2d9 -> 63de56c7b


[ROCKETMQ-52] Add unit tests for Validators and ThreadLocalIndex


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/03b12166
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/03b12166
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/03b12166

Branch: refs/heads/master
Commit: 03b12166bd1e2cc011ebbd2e667fe794ff04cc16
Parents: 2a9769a
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 12:53:23 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 13:06:23 2017 +0800

----------------------------------------------------------------------
 client/pom.xml                                  |  4 ++
 .../org/apache/rocketmq/client/Validators.java  |  8 +--
 .../client/common/ThreadLocalIndex.java         |  4 --
 .../client/impl/producer/TopicPublishInfo.java  |  2 +-
 .../latency/LatencyFaultToleranceImpl.java      |  2 +-
 .../apache/rocketmq/client/ValidatorsTest.java  | 52 +++++++++++++++++++-
 .../client/common/ThreadLocalIndexTest.java     | 33 +++++++++++++
 7 files changed, 94 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 713523d..3a0f6ae 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -38,5 +38,9 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 7ea1bf9..899efa6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -118,23 +118,23 @@ public class Validators {
      */
     public static void checkTopic(String topic) throws MQClientException {
         if (UtilAll.isBlank(topic)) {
-            throw new MQClientException("the specified topic is blank", null);
+            throw new MQClientException("The specified topic is blank", null);
         }
 
         if (!regularExpressionMatcher(topic, PATTERN)) {
             throw new MQClientException(String.format(
-                "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+                "The specified topic[%s] contains illegal characters, allowing only %s", topic,
                 VALID_PATTERN_STR), null);
         }
 
         if (topic.length() > CHARACTER_MAX_LENGTH) {
-            throw new MQClientException("the specified topic is longer than topic max length 255.", null);
+            throw new MQClientException("The specified topic is longer than topic max length 255.", null);
         }
 
         //whether the same with system reserved keyword
         if (topic.equals(MixAll.DEFAULT_TOPIC)) {
             throw new MQClientException(
-                String.format("the topic[%s] is conflict with default topic.", topic), null);
+                String.format("The topic[%s] is conflict with default topic.", topic), null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
index a30b5da..ab223c3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -23,10 +23,6 @@ public class ThreadLocalIndex {
     private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
     private final Random random = new Random();
 
-    public ThreadLocalIndex(int value) {
-
-    }
-
     public int getAndIncrement() {
         Integer index = this.threadLocalIndex.get();
         if (null == index) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
index 52c5cfb..deb02cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/TopicPublishInfo.java
@@ -27,7 +27,7 @@ public class TopicPublishInfo {
     private boolean orderTopic = false;
     private boolean haveTopicRouterInfo = false;
     private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
-    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
+    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private TopicRouteData topicRouteData;
 
     public boolean isOrderTopic() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 5309bc9..72d4347 100644
--- a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -27,7 +27,7 @@ import org.apache.rocketmq.client.common.ThreadLocalIndex;
 public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
     private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
 
-    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(0);
+    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
 
     @Override
     public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
index 6775404..2db648d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java
@@ -17,17 +17,67 @@
 
 package org.apache.rocketmq.client;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
 import org.junit.Test;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
 public class ValidatorsTest {
 
     @Test
-    public void topicValidatorTest() throws MQClientException {
+    public void testCheckTopic_Success() throws MQClientException {
         Validators.checkTopic("Hello");
         Validators.checkTopic("%RETRY%Hello");
         Validators.checkTopic("_%RETRY%Hello");
         Validators.checkTopic("-%RETRY%Hello");
         Validators.checkTopic("223-%RETRY%Hello");
     }
+
+    @Test
+    public void testCheckTopic_HasIllegalCharacters() {
+        String illegalTopic = "TOPIC&*^";
+        try {
+            Validators.checkTopic(illegalTopic);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageStartingWith(String.format("The specified topic[%s] contains illegal characters, allowing only %s", illegalTopic, Validators.VALID_PATTERN_STR));
+        }
+    }
+
+    @Test
+    public void testCheckTopic_UseDefaultTopic() {
+        String defaultTopic = MixAll.DEFAULT_TOPIC;
+        try {
+            Validators.checkTopic(defaultTopic);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageStartingWith(String.format("The topic[%s] is conflict with default topic.", defaultTopic));
+        }
+    }
+
+    @Test
+    public void testCheckTopic_BlankTopic() {
+        String blankTopic = "";
+        try {
+            Validators.checkTopic(blankTopic);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageStartingWith("The specified topic is blank");
+        }
+    }
+
+    @Test
+    public void testCheckTopic_TooLongTopic() {
+        String tooLongTopic = StringUtils.rightPad("TooLongTopic", Validators.CHARACTER_MAX_LENGTH + 1, "_");
+        assertThat(tooLongTopic.length()).isGreaterThan(Validators.CHARACTER_MAX_LENGTH);
+        try {
+            Validators.checkTopic(tooLongTopic);
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageStartingWith("The specified topic is longer than topic max length 255.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/03b12166/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
new file mode 100644
index 0000000..b937e45
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.common;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ThreadLocalIndexTest {
+
+    @Test
+    public void getAndIncrement() throws Exception {
+        ThreadLocalIndex localIndex = new ThreadLocalIndex();
+        int initialVal = localIndex.getAndIncrement();
+
+        assertThat(localIndex.getAndIncrement()).isEqualTo(initialVal + 1);
+    }
+
+}
\ No newline at end of file


[07/14] incubator-rocketmq git commit: [ROCKETMQ-52] Use MockitoJUnitRunner with Mock

Posted by st...@apache.org.
[ROCKETMQ-52] Use MockitoJUnitRunner with Mock


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/4dd6c385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/4dd6c385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/4dd6c385

Branch: refs/heads/master
Commit: 4dd6c385a14061a104713f2b1062d953d2f5dede
Parents: 8c933f2
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 21:37:26 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 21:37:26 2017 +0800

----------------------------------------------------------------------
 .../store/LocalFileOffsetStoreTest.java         | 19 ++++++++--------
 .../store/RemoteBrokerOffsetStoreTest.java      | 23 ++++++++++----------
 .../client/impl/MQClientAPIImplTest.java        | 19 ++++++++--------
 .../impl/factory/MQClientInstanceTest.java      |  3 ---
 4 files changed, 30 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index 0a54855..0b522e6 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -21,25 +21,26 @@ import java.util.HashSet;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public class LocalFileOffsetStoreTest {
     @Mock
-    private static MQClientInstance mQClientFactory;
-    private static String group = "FooBarGroup";
-    private static String topic = "FooBar";
-    private static String brokerName = "DefaultBrokerName";
+    private MQClientInstance mQClientFactory;
+    private String group = "FooBarGroup";
+    private String topic = "FooBar";
+    private String brokerName = "DefaultBrokerName";
 
-    @BeforeClass
-    public static void init() {
+    @Before
+    public void init() {
         System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
-        mQClientFactory = Mockito.mock(MQClientInstance.class);
         String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
         when(mQClientFactory.getClientId()).thenReturn(clientId);
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 7ecb022..a13930f 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -27,10 +27,12 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -39,27 +41,24 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+@RunWith(MockitoJUnitRunner.class)
 public class RemoteBrokerOffsetStoreTest {
     @Mock
-    private static MQClientInstance mQClientFactory;
+    private MQClientInstance mQClientFactory;
     @Mock
-    private static MQClientAPIImpl mqClientAPI;
-    private static String group = "FooBarGroup";
-    private static String topic = "FooBar";
-    private static String brokerName = "DefaultBrokerName";
+    private MQClientAPIImpl mqClientAPI;
+    private String group = "FooBarGroup";
+    private String topic = "FooBar";
+    private String brokerName = "DefaultBrokerName";
 
-    @BeforeClass
-    public static void init() {
+    @Before
+    public void init() {
         System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
-        mQClientFactory = mock(MQClientInstance.class);
         String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
         when(mQClientFactory.getClientId()).thenReturn(clientId);
         when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
-
-        mqClientAPI = mock(MQClientAPIImpl.class);
         when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index cbcf560..3553738 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -35,13 +35,14 @@ import org.apache.rocketmq.remoting.RemotingClient;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -52,14 +53,14 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
 
+@RunWith(MockitoJUnitRunner.class)
 public class MQClientAPIImplTest {
-    private static MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
+    private MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
     @Mock
-    private static RemotingClient remotingClient;
+    private RemotingClient remotingClient;
     @Mock
-    private static DefaultMQProducerImpl defaultMQProducerImpl;
+    private DefaultMQProducerImpl defaultMQProducerImpl;
 
     private String brokerAddr = "127.0.0.1";
     private String brokerName = "DefaultBroker";
@@ -67,10 +68,8 @@ public class MQClientAPIImplTest {
     private static String topic = "FooBar";
     private Message msg = new Message("FooBar", new byte[] {});
 
-    @BeforeClass
-    public static void init() throws Exception {
-        remotingClient = mock(NettyRemotingClient.class);
-        defaultMQProducerImpl = mock(DefaultMQProducerImpl.class);
+    @Before
+    public void init() throws Exception {
         Field field = MQClientAPIImpl.class.getDeclaredField("remotingClient");
         field.setAccessible(true);
         field.set(mqClientAPI, remotingClient);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/4dd6c385/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index b7b07c6..c32cdab 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -32,7 +32,6 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -40,9 +39,7 @@ import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientInstanceTest {
-    @InjectMocks
     private MQClientInstance mqClientInstance =  MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
-
     private String topic = "FooBar";
     private String group = "FooBarGroup";
 


[06/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests for MQClientAPIImpl

Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for MQClientAPIImpl


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8c933f2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8c933f2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8c933f2c

Branch: refs/heads/master
Commit: 8c933f2cc769b522bba4d5cc64e047f4a1a97e85
Parents: 8e26a40
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 20:46:16 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 20:46:16 2017 +0800

----------------------------------------------------------------------
 .../client/common/ThreadLocalIndexTest.java     |   1 -
 .../store/LocalFileOffsetStoreTest.java         |   1 -
 .../store/RemoteBrokerOffsetStoreTest.java      |   1 -
 .../client/impl/MQClientAPIImplTest.java        | 236 +++++++++++++++++++
 .../impl/factory/MQClientInstanceTest.java      |   7 +-
 .../remoting/protocol/RemotingCommand.java      |   4 +-
 6 files changed, 238 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
index 350cde7..1be93ce 100644
--- a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -21,7 +21,6 @@ import org.junit.Test;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class ThreadLocalIndexTest {
-
     @Test
     public void testGetAndIncrement() throws Exception {
         ThreadLocalIndex localIndex = new ThreadLocalIndex();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index 58d99d6..0a54855 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -30,7 +30,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.when;
 
 public class LocalFileOffsetStoreTest {
-
     @Mock
     private static MQClientInstance mQClientFactory;
     private static String group = "FooBarGroup";

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 9a6a4b5..7ecb022 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -43,7 +43,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class RemoteBrokerOffsetStoreTest {
-
     @Mock
     private static MQClientInstance mQClientFactory;
     @Mock

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
new file mode 100644
index 0000000..cbcf560
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl;
+
+import java.lang.reflect.Field;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.netty.ResponseFuture;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class MQClientAPIImplTest {
+    private static MQClientAPIImpl mqClientAPI = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
+    @Mock
+    private static RemotingClient remotingClient;
+    @Mock
+    private static DefaultMQProducerImpl defaultMQProducerImpl;
+
+    private String brokerAddr = "127.0.0.1";
+    private String brokerName = "DefaultBroker";
+    private static String group = "FooBarGroup";
+    private static String topic = "FooBar";
+    private Message msg = new Message("FooBar", new byte[] {});
+
+    @BeforeClass
+    public static void init() throws Exception {
+        remotingClient = mock(NettyRemotingClient.class);
+        defaultMQProducerImpl = mock(DefaultMQProducerImpl.class);
+        Field field = MQClientAPIImpl.class.getDeclaredField("remotingClient");
+        field.setAccessible(true);
+        field.set(mqClientAPI, remotingClient);
+    }
+
+    @Test
+    public void testSendMessageOneWay_Success() throws RemotingException, InterruptedException, MQBrokerException {
+        doNothing().when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong());
+        SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+            3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl);
+        assertThat(sendResult).isNull();
+    }
+
+    @Test
+    public void testSendMessageOneWay_WithException() throws RemotingException, InterruptedException, MQBrokerException {
+        doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong());
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(RemotingException.class);
+        } catch (RemotingException e) {
+            assertThat(e).hasMessage("Remoting Exception in Test");
+        }
+
+        doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient).invokeOneway(anyString(), any(RemotingCommand.class), anyLong());
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ONEWAY, new SendMessageContext(), defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(InterruptedException.class);
+        } catch (InterruptedException e) {
+            assertThat(e).hasMessage("Interrupted Exception in Test");
+        }
+    }
+
+    @Test
+    public void testSendMessageSync_Success() throws InterruptedException, RemotingException, MQBrokerException {
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                return createSuccessResponse(request);
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
+
+        SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, requestHeader,
+            3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), defaultMQProducerImpl);
+
+        assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+        assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+        assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+        assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+    }
+
+    @Test
+    public void testSendMessageSync_WithException() throws InterruptedException, RemotingException, MQBrokerException {
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                RemotingCommand request = mock.getArgument(1);
+                RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setOpaque(request.getOpaque());
+                response.setRemark("Broker is broken.");
+                return response;
+            }
+        }).when(remotingClient).invokeSync(anyString(), any(RemotingCommand.class), anyLong());
+
+        SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
+
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, requestHeader,
+                3 * 1000, CommunicationMode.SYNC, new SendMessageContext(), defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(MQBrokerException.class);
+        } catch (MQBrokerException e) {
+            assertThat(e).hasMessageContaining("Broker is broken.");
+        }
+    }
+
+    @Test
+    public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException {
+        doNothing().when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        SendResult sendResult = mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+            3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
+        assertThat(sendResult).isNull();
+
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
+                responseFuture.setResponseCommand(createSuccessResponse(request));
+                callback.operationComplete(responseFuture);
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        SendMessageContext sendMessageContext = new SendMessageContext();
+        sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
+        mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
+            new SendCallback() {
+                @Override public void onSuccess(SendResult sendResult) {
+                    assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                    assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                    assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+                    assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+                }
+
+                @Override public void onException(Throwable e) {
+                }
+            },
+            null, null, 0, sendMessageContext, defaultMQProducerImpl);
+    }
+
+    @Test
+    public void testSendMessageAsync_WithException() throws RemotingException, InterruptedException, MQBrokerException {
+        doThrow(new RemotingTimeoutException("Remoting Exception in Test")).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(RemotingException.class);
+        } catch (RemotingException e) {
+            assertThat(e).hasMessage("Remoting Exception in Test");
+        }
+
+        doThrow(new InterruptedException("Interrupted Exception in Test")).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+        try {
+            mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(),
+                3 * 1000, CommunicationMode.ASYNC, new SendMessageContext(), defaultMQProducerImpl);
+            failBecauseExceptionWasNotThrown(InterruptedException.class);
+        } catch (InterruptedException e) {
+            assertThat(e).hasMessage("Interrupted Exception in Test");
+        }
+    }
+
+    private RemotingCommand createSuccessResponse(RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        response.setCode(ResponseCode.SUCCESS);
+        response.setOpaque(request.getOpaque());
+
+        SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+        responseHeader.setMsgId("123");
+        responseHeader.setQueueId(1);
+        responseHeader.setQueueOffset(123L);
+
+        response.addExtField(MessageConst.PROPERTY_MSG_REGION, "RegionHZ");
+        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, "true");
+        response.addExtField("queueId", String.valueOf(responseHeader.getQueueId()));
+        response.addExtField("msgId", responseHeader.getMsgId());
+        response.addExtField("queueOffset", String.valueOf(responseHeader.getQueueOffset()));
+        return response;
+    }
+
+    private SendMessageRequestHeader createSendMessageRequestHeader() {
+        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setTopic(topic);
+        requestHeader.setProducerGroup(group);
+        requestHeader.setQueueId(1);
+        requestHeader.setMaxReconsumeTimes(10);
+        return requestHeader;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
index 956cdd9..b7b07c6 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -22,7 +22,6 @@ import java.util.List;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.admin.MQAdminExtInner;
 import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
@@ -34,7 +33,6 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
-import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -42,11 +40,8 @@ import static org.mockito.Mockito.mock;
 
 @RunWith(MockitoJUnitRunner.class)
 public class MQClientInstanceTest {
-
-    @Mock
-    private static MQClientAPIImpl mqClientAPI;
     @InjectMocks
-    private static MQClientInstance mqClientInstance =  MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
+    private MQClientInstance mqClientInstance =  MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
 
     private String topic = "FooBar";
     private String group = "FooBarGroup";

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8c933f2c/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 46ca8dd..99c13fb 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -114,9 +114,7 @@ public class RemotingCommand {
     }
 
     public static RemotingCommand createResponseCommand(Class<? extends CommandCustomHeader> classHeader) {
-        RemotingCommand cmd = createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
-
-        return cmd;
+        return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
     }
 
     /**


[11/14] incubator-rocketmq git commit: [ROCKETMQ-57] Polish unit tests for rocketmq-tools

Posted by st...@apache.org.
[ROCKETMQ-57] Polish unit tests for rocketmq-tools


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/626315ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/626315ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/626315ab

Branch: refs/heads/master
Commit: 626315aba4abc31380a194fad660f4ca9a64dc03
Parents: 0f9f55f
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 11:22:48 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 13:29:50 2017 +0800

----------------------------------------------------------------------
 .../tools/admin/DefaultMQAdminExtTest.java      | 240 +++++++++++++++++++
 .../rocketmq/tools/command/CommandUtilTest.java |  92 +++++++
 2 files changed, 332 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/626315ab/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
new file mode 100644
index 0000000..ab6319f
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -0,0 +1,240 @@
+package org.apache.rocketmq.tools.admin;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQAdminExtTest {
+    private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private MQClientAPIImpl mQClientAPIImpl;
+    private Properties properties = new Properties();
+    private TopicList topicList = new TopicList();
+    private TopicRouteData topicRouteData = new TopicRouteData();
+    private KVTable kvTable = new KVTable();
+    private ClusterInfo clusterInfo = new ClusterInfo();
+
+    @Before
+    public void init() throws Exception {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+
+        properties.setProperty("maxMessageSize", "5000000");
+        properties.setProperty("flushDelayOffsetInterval", "15000");
+        properties.setProperty("serverSocketRcvBufSize", "655350");
+        when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
+
+        Set<String> topicSet = new HashSet<>();
+        topicSet.add("topic_one");
+        topicSet.add("topic_two");
+        topicList.setTopicList(topicSet);
+        when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
+
+
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+
+        HashMap<String, String> result = new HashMap<>();
+        result.put("id", "1234");
+        result.put("brokerName", "default-broker");
+        kvTable.setTable(result);
+        when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
+
+        HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+        brokerAddrTable.put("default-broker", brokerData);
+        brokerAddrTable.put("broker-test", new BrokerData());
+        clusterInfo.setBrokerAddrTable(brokerAddrTable);
+        when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+        when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+
+        Set<String> clusterList = new HashSet<>();
+        clusterList.add("default-cluster-one");
+        clusterList.add("default-cluster-two");
+        when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList);
+
+        GroupList groupList = new GroupList();
+        HashSet<String> groups = new HashSet<>();
+        groups.add("consumer-group-one");
+        groups.add("consumer-group-two");
+        groupList.setGroupList(groups);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+        when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
+
+        SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
+        ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<>();
+        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+        subscriptionGroupConfig.setBrokerId(1234);
+        subscriptionGroupConfig.setGroupName("Consumer-group-one");
+        subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
+        subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
+        when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper);
+
+        String topicListConfig = "topicListConfig";
+        when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig);
+
+        KVTable kvTable = new KVTable();
+        HashMap<String, String> kv = new HashMap<>();
+        kv.put("broker-name", "broker-one");
+        kv.put("cluster-name", "default-cluster");
+        kvTable.setTable(kv);
+        when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
+    }
+
+    @After
+    public void terminate() throws Exception {
+        if (defaultMQAdminExtImpl != null)
+            defaultMQAdminExtImpl.shutdown();
+    }
+
+    @Test
+    public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
+        Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
+        assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
+        assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
+        assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
+    }
+
+
+    @Test
+    public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+        TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
+        assertThat(topicList.getTopicList().size()).isEqualTo(2);
+        assertThat(topicList.getTopicList()).contains("topic_one");
+    }
+
+    @Test
+    public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
+        assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
+        assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
+    }
+
+    @Test
+    public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
+        HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
+        assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
+        assertThat(brokerList.containsKey("broker-test")).isTrue();
+
+        HashMap<String, Set<String>> clusterMap = new HashMap<>();
+        Set<String> brokers = new HashSet<>();
+        brokers.add("default-broker");
+        brokers.add("broker-test");
+        clusterMap.put("default-cluster", brokers);
+        ClusterInfo cInfo = mock(ClusterInfo.class);
+        when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
+        HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
+        assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
+        assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
+        TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
+        assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
+        assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
+    }
+
+    @Test
+    public void testGetNameServerAddressList() {
+        List<String> result = new ArrayList<>();
+        result.add("default-name-one");
+        result.add("default-name-two");
+        when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
+        List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
+        assertThat(nameList.get(0)).isEqualTo("default-name-one");
+        assertThat(nameList.get(1)).isEqualTo("default-name-two");
+    }
+
+    @Test
+    public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
+        String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
+        assertThat(topicConfig).isEqualTo("topicListConfig");
+        KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+        assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
+        assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
+    }
+
+
+    @Test
+    public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
+        assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
+    }
+
+    @Test
+    public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
+        assertThat(clean).isTrue();
+    }
+
+    @Test
+    public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
+        assertThat(clusterlist.contains("default-cluster-one")).isTrue();
+        assertThat(clusterlist.contains("default-cluster-two")).isTrue();
+    }
+
+    @Test
+    public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        ConsumeStatsList result = new ConsumeStatsList();
+        result.setBrokerAddr("127.0.0.1:10911");
+        when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
+        ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
+        assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
+    }
+
+    @Test
+    public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
+        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
+        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
+        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/626315ab/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
new file mode 100644
index 0000000..35c205e
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@ -0,0 +1,92 @@
+package org.apache.rocketmq.tools.command;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CommandUtilTest {
+    private DefaultMQAdminExt defaultMQAdminExt;
+    private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    @Before
+    public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        defaultMQAdminExt = mock(DefaultMQAdminExt.class);
+        MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+
+        ClusterInfo clusterInfo = new ClusterInfo();
+        HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+        HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("default-broker");
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerAddrTable.put("default-broker", brokerData);
+        brokerAddrTable.put("broker-test", new BrokerData());
+        Set<String> brokerSet = new HashSet<>();
+        brokerSet.add("default-broker");
+        brokerSet.add("default-broker-one");
+        clusterAddrTable.put("default-cluster", brokerSet);
+        clusterInfo.setBrokerAddrTable(brokerAddrTable);
+        clusterInfo.setClusterAddrTable(clusterAddrTable);
+        when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+        when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+    }
+
+    @After
+    public void shutdown() throws Exception {
+    }
+
+    @Test
+    public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
+        assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
+    }
+
+    @Test
+    public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        Set<String> result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testFetchBrokerNameByClusterName() throws Exception {
+        Set<String> result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster");
+        assertThat(result.contains("default-broker")).isTrue();
+        assertThat(result.contains("default-broker-one")).isTrue();
+        assertThat(result.size()).isEqualTo(2);
+    }
+}
\ No newline at end of file


[09/14] incubator-rocketmq git commit: [ROCKETMQ-57] Polish unit tests for rocketmq-tools

Posted by st...@apache.org.
[ROCKETMQ-57] Polish unit tests for rocketmq-tools


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/b29ff37d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/b29ff37d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/b29ff37d

Branch: refs/heads/master
Commit: b29ff37db4f75d09419dffa092e38b7765814f04
Parents: 0f9f55f
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 11:41:32 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 11:41:32 2017 +0800

----------------------------------------------------------------------
 .../tools/admin/DefaultMQAdminExtTest.java      | 240 +++++++++++++++++++
 .../rocketmq/tools/command/CommandUtilTest.java |  92 +++++++
 2 files changed, 332 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b29ff37d/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
new file mode 100644
index 0000000..ab6319f
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -0,0 +1,240 @@
+package org.apache.rocketmq.tools.admin;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.namesrv.NamesrvUtil;
+import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMQAdminExtTest {
+    private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private MQClientAPIImpl mQClientAPIImpl;
+    private Properties properties = new Properties();
+    private TopicList topicList = new TopicList();
+    private TopicRouteData topicRouteData = new TopicRouteData();
+    private KVTable kvTable = new KVTable();
+    private ClusterInfo clusterInfo = new ClusterInfo();
+
+    @Before
+    public void init() throws Exception {
+        mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+
+        properties.setProperty("maxMessageSize", "5000000");
+        properties.setProperty("flushDelayOffsetInterval", "15000");
+        properties.setProperty("serverSocketRcvBufSize", "655350");
+        when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
+
+        Set<String> topicSet = new HashSet<>();
+        topicSet.add("topic_one");
+        topicSet.add("topic_two");
+        topicList.setTopicList(topicSet);
+        when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
+
+
+        List<BrokerData> brokerDatas = new ArrayList<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerName("default-broker");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDatas.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDatas);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+
+        HashMap<String, String> result = new HashMap<>();
+        result.put("id", "1234");
+        result.put("brokerName", "default-broker");
+        kvTable.setTable(result);
+        when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
+
+        HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+        brokerAddrTable.put("default-broker", brokerData);
+        brokerAddrTable.put("broker-test", new BrokerData());
+        clusterInfo.setBrokerAddrTable(brokerAddrTable);
+        when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+        when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+
+        Set<String> clusterList = new HashSet<>();
+        clusterList.add("default-cluster-one");
+        clusterList.add("default-cluster-two");
+        when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList);
+
+        GroupList groupList = new GroupList();
+        HashSet<String> groups = new HashSet<>();
+        groups.add("consumer-group-one");
+        groups.add("consumer-group-two");
+        groupList.setGroupList(groups);
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
+        when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
+
+        SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
+        ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<>();
+        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+        subscriptionGroupConfig.setBrokerId(1234);
+        subscriptionGroupConfig.setGroupName("Consumer-group-one");
+        subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
+        subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
+        when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper);
+
+        String topicListConfig = "topicListConfig";
+        when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig);
+
+        KVTable kvTable = new KVTable();
+        HashMap<String, String> kv = new HashMap<>();
+        kv.put("broker-name", "broker-one");
+        kv.put("cluster-name", "default-cluster");
+        kvTable.setTable(kv);
+        when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
+    }
+
+    @After
+    public void terminate() throws Exception {
+        if (defaultMQAdminExtImpl != null)
+            defaultMQAdminExtImpl.shutdown();
+    }
+
+    @Test
+    public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
+        Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
+        assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
+        assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
+        assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
+    }
+
+
+    @Test
+    public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
+        TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
+        assertThat(topicList.getTopicList().size()).isEqualTo(2);
+        assertThat(topicList.getTopicList()).contains("topic_one");
+    }
+
+    @Test
+    public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
+        assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
+        assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
+    }
+
+    @Test
+    public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
+        HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
+        assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
+        assertThat(brokerList.containsKey("broker-test")).isTrue();
+
+        HashMap<String, Set<String>> clusterMap = new HashMap<>();
+        Set<String> brokers = new HashSet<>();
+        brokers.add("default-broker");
+        brokers.add("broker-test");
+        clusterMap.put("default-cluster", brokers);
+        ClusterInfo cInfo = mock(ClusterInfo.class);
+        when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
+        HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
+        assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
+        assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
+        TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
+        assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
+        assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
+    }
+
+    @Test
+    public void testGetNameServerAddressList() {
+        List<String> result = new ArrayList<>();
+        result.add("default-name-one");
+        result.add("default-name-two");
+        when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
+        List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
+        assertThat(nameList.get(0)).isEqualTo("default-name-one");
+        assertThat(nameList.get(1)).isEqualTo("default-name-two");
+    }
+
+    @Test
+    public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
+        String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
+        assertThat(topicConfig).isEqualTo("topicListConfig");
+        KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
+        assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
+        assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
+    }
+
+
+    @Test
+    public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
+        assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
+    }
+
+    @Test
+    public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
+        assertThat(clean).isTrue();
+    }
+
+    @Test
+    public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
+        assertThat(clusterlist.contains("default-cluster-one")).isTrue();
+        assertThat(clusterlist.contains("default-cluster-two")).isTrue();
+    }
+
+    @Test
+    public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        ConsumeStatsList result = new ConsumeStatsList();
+        result.setBrokerAddr("127.0.0.1:10911");
+        when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
+        ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
+        assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
+    }
+
+    @Test
+    public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
+        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
+        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
+        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/b29ff37d/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
new file mode 100644
index 0000000..35c205e
--- /dev/null
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@ -0,0 +1,92 @@
+package org.apache.rocketmq.tools.command;
+
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.*;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CommandUtilTest {
+    private DefaultMQAdminExt defaultMQAdminExt;
+    private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
+    private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
+    private MQClientAPIImpl mQClientAPIImpl;
+
+    @Before
+    public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        defaultMQAdminExt = mock(DefaultMQAdminExt.class);
+        MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class);
+        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000);
+
+        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
+        field.setAccessible(true);
+        field.set(defaultMQAdminExtImpl, mqClientInstance);
+        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
+        field.setAccessible(true);
+        field.set(mqClientInstance, mQClientAPIImpl);
+
+        ClusterInfo clusterInfo = new ClusterInfo();
+        HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
+        HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(1234l, "127.0.0.1:10911");
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("default-broker");
+        brokerData.setCluster("default-cluster");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerAddrTable.put("default-broker", brokerData);
+        brokerAddrTable.put("broker-test", new BrokerData());
+        Set<String> brokerSet = new HashSet<>();
+        brokerSet.add("default-broker");
+        brokerSet.add("default-broker-one");
+        clusterAddrTable.put("default-cluster", brokerSet);
+        clusterInfo.setBrokerAddrTable(brokerAddrTable);
+        clusterInfo.setClusterAddrTable(clusterAddrTable);
+        when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
+        when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
+    }
+
+    @After
+    public void shutdown() throws Exception {
+    }
+
+    @Test
+    public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
+        assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
+    }
+
+    @Test
+    public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
+        Set<String> result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testFetchBrokerNameByClusterName() throws Exception {
+        Set<String> result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster");
+        assertThat(result.contains("default-broker")).isTrue();
+        assertThat(result.contains("default-broker-one")).isTrue();
+        assertThat(result.size()).isEqualTo(2);
+    }
+}
\ No newline at end of file


[12/14] incubator-rocketmq git commit: [ROCKETMQ-57] Add license

Posted by st...@apache.org.
[ROCKETMQ-57] Add license


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0de84e20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0de84e20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0de84e20

Branch: refs/heads/master
Commit: 0de84e20fccea6391e3f50fce051be795deeccd7
Parents: 8399389 4291348
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 21:00:54 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 21:00:54 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |  45 +++-
 README.md                                       |   2 +-
 .../rocketmq/broker/BrokerTestHarness.java      |  69 ------
 .../broker/api/BrokerFastFailureTest.java       |  61 -----
 .../rocketmq/broker/api/SendMessageTest.java    |  79 ------
 .../offset/ConsumerOffsetManagerTest.java       |  61 -----
 .../broker/topic/TopicConfigManagerTest.java    |  60 -----
 .../consumer/DefaultMQPullConsumerTest.java     | 152 ++++++++++++
 .../consumer/DefaultMQPushConsumerTest.java     | 178 +++++++++++++
 .../store/LocalFileOffsetStoreTest.java         |   3 +-
 .../impl/factory/MQClientInstanceTest.java      |   6 +-
 .../client/producer/DefaultMQProducerTest.java  | 248 +++++++++++++++++++
 pom.xml                                         |  76 +++++-
 .../tools/admin/DefaultMQAdminExtTest.java      |  16 ++
 .../rocketmq/tools/command/CommandUtilTest.java |  16 ++
 15 files changed, 728 insertions(+), 344 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0de84e20/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --cc tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index ab6319f,0000000..5964cd1
mode 100644,000000..100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@@ -1,240 -1,0 +1,256 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package org.apache.rocketmq.tools.admin;
 +
 +import org.apache.rocketmq.client.ClientConfig;
 +import org.apache.rocketmq.client.exception.MQBrokerException;
 +import org.apache.rocketmq.client.exception.MQClientException;
 +import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 +import org.apache.rocketmq.client.impl.MQClientManager;
 +import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 +import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 +import org.apache.rocketmq.common.protocol.body.*;
 +import org.apache.rocketmq.common.protocol.route.BrokerData;
 +import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 +import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 +import org.apache.rocketmq.remoting.exception.RemotingException;
 +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.mockito.junit.MockitoJUnitRunner;
 +
 +import java.io.UnsupportedEncodingException;
 +import java.lang.reflect.Field;
 +import java.util.*;
 +import java.util.concurrent.ConcurrentHashMap;
 +
 +import static org.assertj.core.api.Assertions.assertThat;
 +import static org.mockito.ArgumentMatchers.anyLong;
 +import static org.mockito.ArgumentMatchers.anyString;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.when;
 +
 +@RunWith(MockitoJUnitRunner.class)
 +public class DefaultMQAdminExtTest {
 +    private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
 +    private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
 +    private MQClientAPIImpl mQClientAPIImpl;
 +    private Properties properties = new Properties();
 +    private TopicList topicList = new TopicList();
 +    private TopicRouteData topicRouteData = new TopicRouteData();
 +    private KVTable kvTable = new KVTable();
 +    private ClusterInfo clusterInfo = new ClusterInfo();
 +
 +    @Before
 +    public void init() throws Exception {
 +        mQClientAPIImpl = mock(MQClientAPIImpl.class);
 +        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
 +        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
 +
 +        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
 +        field.setAccessible(true);
 +        field.set(defaultMQAdminExtImpl, mqClientInstance);
 +        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
 +        field.setAccessible(true);
 +        field.set(mqClientInstance, mQClientAPIImpl);
 +
 +        properties.setProperty("maxMessageSize", "5000000");
 +        properties.setProperty("flushDelayOffsetInterval", "15000");
 +        properties.setProperty("serverSocketRcvBufSize", "655350");
 +        when(mQClientAPIImpl.getBrokerConfig(anyString(), anyLong())).thenReturn(properties);
 +
 +        Set<String> topicSet = new HashSet<>();
 +        topicSet.add("topic_one");
 +        topicSet.add("topic_two");
 +        topicList.setTopicList(topicSet);
 +        when(mQClientAPIImpl.getTopicListFromNameServer(anyLong())).thenReturn(topicList);
 +
 +
 +        List<BrokerData> brokerDatas = new ArrayList<>();
 +        HashMap<Long, String> brokerAddrs = new HashMap<>();
 +        brokerAddrs.put(1234l, "127.0.0.1:10911");
 +        BrokerData brokerData = new BrokerData();
 +        brokerData.setCluster("default-cluster");
 +        brokerData.setBrokerName("default-broker");
 +        brokerData.setBrokerAddrs(brokerAddrs);
 +        brokerDatas.add(brokerData);
 +        topicRouteData.setBrokerDatas(brokerDatas);
 +        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
 +
 +        HashMap<String, String> result = new HashMap<>();
 +        result.put("id", "1234");
 +        result.put("brokerName", "default-broker");
 +        kvTable.setTable(result);
 +        when(mQClientAPIImpl.getBrokerRuntimeInfo(anyString(), anyLong())).thenReturn(kvTable);
 +
 +        HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
 +        brokerAddrTable.put("default-broker", brokerData);
 +        brokerAddrTable.put("broker-test", new BrokerData());
 +        clusterInfo.setBrokerAddrTable(brokerAddrTable);
 +        when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
 +        when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
 +
 +        Set<String> clusterList = new HashSet<>();
 +        clusterList.add("default-cluster-one");
 +        clusterList.add("default-cluster-two");
 +        when(mQClientAPIImpl.getClusterList(anyString(), anyLong())).thenReturn(clusterList);
 +
 +        GroupList groupList = new GroupList();
 +        HashSet<String> groups = new HashSet<>();
 +        groups.add("consumer-group-one");
 +        groups.add("consumer-group-two");
 +        groupList.setGroupList(groups);
 +        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
 +        when(mQClientAPIImpl.queryTopicConsumeByWho(anyString(), anyString(), anyLong())).thenReturn(groupList);
 +
 +        SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
 +        ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<>();
 +        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
 +        subscriptionGroupConfig.setConsumeBroadcastEnable(true);
 +        subscriptionGroupConfig.setBrokerId(1234);
 +        subscriptionGroupConfig.setGroupName("Consumer-group-one");
 +        subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
 +        subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
 +        when(mQClientAPIImpl.getAllSubscriptionGroup(anyString(), anyLong())).thenReturn(subscriptionGroupWrapper);
 +
 +        String topicListConfig = "topicListConfig";
 +        when(mQClientAPIImpl.getKVConfigValue(anyString(), anyString(), anyLong())).thenReturn(topicListConfig);
 +
 +        KVTable kvTable = new KVTable();
 +        HashMap<String, String> kv = new HashMap<>();
 +        kv.put("broker-name", "broker-one");
 +        kv.put("cluster-name", "default-cluster");
 +        kvTable.setTable(kv);
 +        when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
 +    }
 +
 +    @After
 +    public void terminate() throws Exception {
 +        if (defaultMQAdminExtImpl != null)
 +            defaultMQAdminExtImpl.shutdown();
 +    }
 +
 +    @Test
 +    public void testUpdateBrokerConfig() throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingTimeoutException, MQBrokerException, RemotingSendRequestException {
 +        Properties result = defaultMQAdminExtImpl.getBrokerConfig("127.0.0.1:10911");
 +        assertThat(result.getProperty("maxMessageSize")).isEqualTo("5000000");
 +        assertThat(result.getProperty("flushDelayOffsetInterval")).isEqualTo("15000");
 +        assertThat(result.getProperty("serverSocketRcvBufSize")).isEqualTo("655350");
 +    }
 +
 +
 +    @Test
 +    public void testFetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
 +        TopicList topicList = defaultMQAdminExtImpl.fetchAllTopicList();
 +        assertThat(topicList.getTopicList().size()).isEqualTo(2);
 +        assertThat(topicList.getTopicList()).contains("topic_one");
 +    }
 +
 +    @Test
 +    public void testFetchBrokerRuntimeStats() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
 +        KVTable brokerStats = defaultMQAdminExtImpl.fetchBrokerRuntimeStats("127.0.0.1:10911");
 +        assertThat(brokerStats.getTable().get("id")).isEqualTo("1234");
 +        assertThat(brokerStats.getTable().get("brokerName")).isEqualTo("default-broker");
 +    }
 +
 +    @Test
 +    public void testExamineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
 +        ClusterInfo clusterInfo = defaultMQAdminExtImpl.examineBrokerClusterInfo();
 +        HashMap<String, BrokerData> brokerList = clusterInfo.getBrokerAddrTable();
 +        assertThat(brokerList.get("default-broker").getBrokerName()).isEqualTo("default-broker");
 +        assertThat(brokerList.containsKey("broker-test")).isTrue();
 +
 +        HashMap<String, Set<String>> clusterMap = new HashMap<>();
 +        Set<String> brokers = new HashSet<>();
 +        brokers.add("default-broker");
 +        brokers.add("broker-test");
 +        clusterMap.put("default-cluster", brokers);
 +        ClusterInfo cInfo = mock(ClusterInfo.class);
 +        when(cInfo.getClusterAddrTable()).thenReturn(clusterMap);
 +        HashMap<String, Set<String>> clusterAddress = cInfo.getClusterAddrTable();
 +        assertThat(clusterAddress.containsKey("default-cluster")).isTrue();
 +        assertThat(clusterAddress.get("default-cluster").size()).isEqualTo(2);
 +    }
 +
 +    @Test
 +    public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
 +        TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
 +        assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
 +        assertThat(topicRouteData.getBrokerDatas().get(0).getCluster()).isEqualTo("default-cluster");
 +    }
 +
 +    @Test
 +    public void testGetNameServerAddressList() {
 +        List<String> result = new ArrayList<>();
 +        result.add("default-name-one");
 +        result.add("default-name-two");
 +        when(mqClientInstance.getMQClientAPIImpl().getNameServerAddressList()).thenReturn(result);
 +        List<String> nameList = defaultMQAdminExtImpl.getNameServerAddressList();
 +        assertThat(nameList.get(0)).isEqualTo("default-name-one");
 +        assertThat(nameList.get(1)).isEqualTo("default-name-two");
 +    }
 +
 +    @Test
 +    public void testPutKVConfig() throws RemotingException, MQClientException, InterruptedException {
 +        String topicConfig = defaultMQAdminExtImpl.getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, "UnitTest");
 +        assertThat(topicConfig).isEqualTo("topicListConfig");
 +        KVTable kvs = defaultMQAdminExtImpl.getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
 +        assertThat(kvs.getTable().get("broker-name")).isEqualTo("broker-one");
 +        assertThat(kvs.getTable().get("cluster-name")).isEqualTo("default-cluster");
 +    }
 +
 +
 +    @Test
 +    public void testQueryTopicConsumeByWho() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
 +        GroupList groupList = defaultMQAdminExtImpl.queryTopicConsumeByWho("UnitTest");
 +        assertThat(groupList.getGroupList().contains("consumer-group-two")).isTrue();
 +    }
 +
 +    @Test
 +    public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
 +        boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
 +        assertThat(clean).isTrue();
 +    }
 +
 +    @Test
 +    public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
 +        Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
 +        assertThat(clusterlist.contains("default-cluster-one")).isTrue();
 +        assertThat(clusterlist.contains("default-cluster-two")).isTrue();
 +    }
 +
 +    @Test
 +    public void testFetchConsumeStatsInBroker() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
 +        ConsumeStatsList result = new ConsumeStatsList();
 +        result.setBrokerAddr("127.0.0.1:10911");
 +        when(mqClientInstance.getMQClientAPIImpl().fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000)).thenReturn(result);
 +        ConsumeStatsList consumeStatsList = defaultMQAdminExtImpl.fetchConsumeStatsInBroker("127.0.0.1:10911", false, 10000);
 +        assertThat(consumeStatsList.getBrokerAddr()).isEqualTo("127.0.0.1:10911");
 +    }
 +
 +    @Test
 +    public void testGetAllSubscriptionGroup() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
 +        SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExtImpl.getAllSubscriptionGroup("127.0.0.1:10911", 10000);
 +        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
 +        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
 +        assertThat(subscriptionGroupWrapper.getSubscriptionGroupTable().get("Consumer-group-one").isConsumeBroadcastEnable()).isTrue();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0de84e20/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
----------------------------------------------------------------------
diff --cc tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
index 35c205e,0000000..ba58010
mode 100644,000000..100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/command/CommandUtilTest.java
@@@ -1,92 -1,0 +1,108 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
 +package org.apache.rocketmq.tools.command;
 +
 +import org.apache.rocketmq.client.ClientConfig;
 +import org.apache.rocketmq.client.exception.MQBrokerException;
 +import org.apache.rocketmq.client.exception.MQClientException;
 +import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 +import org.apache.rocketmq.client.impl.MQClientManager;
 +import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 +import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 +import org.apache.rocketmq.common.protocol.route.BrokerData;
 +import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 +import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 +import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import java.lang.reflect.Field;
 +import java.util.*;
 +
 +import static org.assertj.core.api.Assertions.assertThat;
 +import static org.mockito.ArgumentMatchers.anyLong;
 +import static org.mockito.ArgumentMatchers.anyString;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.when;
 +
 +public class CommandUtilTest {
 +    private DefaultMQAdminExt defaultMQAdminExt;
 +    private DefaultMQAdminExtImpl defaultMQAdminExtImpl;
 +    private MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
 +    private MQClientAPIImpl mQClientAPIImpl;
 +
 +    @Before
 +    public void setup() throws MQClientException, NoSuchFieldException, IllegalAccessException, InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
 +        defaultMQAdminExt = mock(DefaultMQAdminExt.class);
 +        MQClientAPIImpl mQClientAPIImpl = mock(MQClientAPIImpl.class);
 +        defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 3000);
 +
 +        Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
 +        field.setAccessible(true);
 +        field.set(defaultMQAdminExtImpl, mqClientInstance);
 +        field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
 +        field.setAccessible(true);
 +        field.set(mqClientInstance, mQClientAPIImpl);
 +
 +        ClusterInfo clusterInfo = new ClusterInfo();
 +        HashMap<String, BrokerData> brokerAddrTable = new HashMap<>();
 +        HashMap<String, Set<String>> clusterAddrTable = new HashMap<>();
 +        HashMap<Long, String> brokerAddrs = new HashMap<>();
 +        brokerAddrs.put(1234l, "127.0.0.1:10911");
 +        BrokerData brokerData = new BrokerData();
 +        brokerData.setBrokerName("default-broker");
 +        brokerData.setCluster("default-cluster");
 +        brokerData.setBrokerAddrs(brokerAddrs);
 +        brokerAddrTable.put("default-broker", brokerData);
 +        brokerAddrTable.put("broker-test", new BrokerData());
 +        Set<String> brokerSet = new HashSet<>();
 +        brokerSet.add("default-broker");
 +        brokerSet.add("default-broker-one");
 +        clusterAddrTable.put("default-cluster", brokerSet);
 +        clusterInfo.setBrokerAddrTable(brokerAddrTable);
 +        clusterInfo.setClusterAddrTable(clusterAddrTable);
 +        when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
 +        when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
 +    }
 +
 +    @After
 +    public void shutdown() throws Exception {
 +    }
 +
 +    @Test
 +    public void testFetchMasterAndSlaveDistinguish() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
 +        Map<String, List<String>> result = CommandUtil.fetchMasterAndSlaveDistinguish(defaultMQAdminExtImpl, "default-cluster");
 +        assertThat(result.get(null).get(0)).isEqualTo("127.0.0.1:10911");
 +    }
 +
 +    @Test
 +    public void testFetchMasterAddrByClusterName() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
 +        Set<String> result = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExtImpl, "default-cluster");
 +        assertThat(result.size()).isEqualTo(0);
 +    }
 +
 +    @Test
 +    public void testFetchBrokerNameByClusterName() throws Exception {
 +        Set<String> result = CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExtImpl, "default-cluster");
 +        assertThat(result.contains("default-broker")).isTrue();
 +        assertThat(result.contains("default-broker-one")).isTrue();
 +        assertThat(result.size()).isEqualTo(2);
 +    }
 +}


[10/14] incubator-rocketmq git commit: Merge branch 'ROCKETMQ-57' of https://git-wip-us.apache.org/repos/asf/incubator-rocketmq into ROCKETMQ-57

Posted by st...@apache.org.
Merge branch 'ROCKETMQ-57' of https://git-wip-us.apache.org/repos/asf/incubator-rocketmq into ROCKETMQ-57


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/83993896
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/83993896
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/83993896

Branch: refs/heads/master
Commit: 83993896e223f6ef08b0573f98f50177c5463b79
Parents: b29ff37 626315a
Author: stevenschew <st...@apache.org>
Authored: Thu Jan 19 11:41:37 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Thu Jan 19 11:41:37 2017 +0800

----------------------------------------------------------------------

----------------------------------------------------------------------



[13/14] incubator-rocketmq git commit: [ROCKETMQ-57] Add unit test for DefaultMQAdminExt

Posted by st...@apache.org.
[ROCKETMQ-57] Add unit test for DefaultMQAdminExt


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/f8d881ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/f8d881ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/f8d881ba

Branch: refs/heads/master
Commit: f8d881babe3f8f8810f1303912b2fb0ef2badea0
Parents: 0de84e2
Author: stevenschew <st...@apache.org>
Authored: Fri Jan 20 14:53:43 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Fri Jan 20 14:53:43 2017 +0800

----------------------------------------------------------------------
 .../tools/admin/DefaultMQAdminExtTest.java      | 136 ++++++++++++++++++-
 1 file changed, 130 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/f8d881ba/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
----------------------------------------------------------------------
diff --git a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index 5964cd1..da3bd8c 100644
--- a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -22,15 +22,21 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import org.apache.rocketmq.client.impl.MQClientManager;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.admin.*;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.tools.admin.api.MessageTrack;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,8 +49,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -93,6 +98,8 @@ public class DefaultMQAdminExtTest {
         brokerData.setBrokerAddrs(brokerAddrs);
         brokerDatas.add(brokerData);
         topicRouteData.setBrokerDatas(brokerDatas);
+        topicRouteData.setQueueDatas(new ArrayList<QueueData>());
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
         when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(topicRouteData);
 
         HashMap<String, String> result = new HashMap<>();
@@ -105,6 +112,7 @@ public class DefaultMQAdminExtTest {
         brokerAddrTable.put("default-broker", brokerData);
         brokerAddrTable.put("broker-test", new BrokerData());
         clusterInfo.setBrokerAddrTable(brokerAddrTable);
+        clusterInfo.setClusterAddrTable(new HashMap<String, Set<String>>());
         when(mQClientAPIImpl.getBrokerClusterInfo(anyLong())).thenReturn(clusterInfo);
         when(mQClientAPIImpl.cleanExpiredConsumeQueue(anyString(), anyLong())).thenReturn(true);
 
@@ -140,6 +148,52 @@ public class DefaultMQAdminExtTest {
         kv.put("cluster-name", "default-cluster");
         kvTable.setTable(kv);
         when(mQClientAPIImpl.getKVListByNamespace(anyString(), anyLong())).thenReturn(kvTable);
+
+        ConsumeStats consumeStats = new ConsumeStats();
+        consumeStats.setConsumeTps(1234);
+        MessageQueue messageQueue = new MessageQueue();
+        OffsetWrapper offsetWrapper = new OffsetWrapper();
+        HashMap<MessageQueue, OffsetWrapper> stats = new HashMap<>();
+        stats.put(messageQueue, offsetWrapper);
+        consumeStats.setOffsetTable(stats);
+        when(mQClientAPIImpl.getConsumeStats(anyString(), anyString(), anyString(), anyLong())).thenReturn(consumeStats);
+
+        ConsumerConnection consumerConnection = new ConsumerConnection();
+        consumerConnection.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
+        consumerConnection.setMessageModel(MessageModel.CLUSTERING);
+        HashSet<Connection> connections = new HashSet<>();
+        connections.add(new Connection());
+        consumerConnection.setConnectionSet(connections);
+        consumerConnection.setSubscriptionTable(new ConcurrentHashMap<String, SubscriptionData>());
+        consumerConnection.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        when(mQClientAPIImpl.getConsumerConnectionList(anyString(), anyString(), anyLong())).thenReturn(consumerConnection);
+
+        ProducerConnection producerConnection = new ProducerConnection();
+        Connection connection = new Connection();
+        connection.setClientAddr("127.0.0.1:9898");
+        connection.setClientId("PID_12345");
+        HashSet<Connection> connectionSet = new HashSet<Connection>();
+        connectionSet.add(connection);
+        producerConnection.setConnectionSet(connectionSet);
+        when(mQClientAPIImpl.getProducerConnectionList(anyString(), anyString(), anyLong())).thenReturn(producerConnection);
+
+        when(mQClientAPIImpl.wipeWritePermOfBroker(anyString(), anyString(), anyLong())).thenReturn(6);
+
+        TopicStatsTable topicStatsTable = new TopicStatsTable();
+        topicStatsTable.setOffsetTable(new HashMap<MessageQueue, TopicOffset>());
+
+        Map<String, Map<MessageQueue, Long>> consumerStatus = new HashMap<>();
+        when(mQClientAPIImpl.invokeBrokerToGetConsumerStatus(anyString(), anyString(), anyString(), anyString(), anyLong())).thenReturn(consumerStatus);
+
+        List<QueueTimeSpan> queueTimeSpanList = new ArrayList<>();
+        when(mQClientAPIImpl.queryConsumeTimeSpan(anyString(), anyString(), anyString(), anyLong())).thenReturn(queueTimeSpanList);
+
+        ConsumerRunningInfo consumerRunningInfo = new ConsumerRunningInfo();
+        consumerRunningInfo.setJstack("test");
+        consumerRunningInfo.setMqTable(new TreeMap<MessageQueue, ProcessQueueInfo>());
+        consumerRunningInfo.setStatusTable(new TreeMap<String, ConsumeStatus>());
+        consumerRunningInfo.setSubscriptionSet(new TreeSet<SubscriptionData>());
+        when(mQClientAPIImpl.getConsumerRunningInfo(anyString(), anyString(), anyString(), anyBoolean(), anyLong())).thenReturn(consumerRunningInfo);
     }
 
     @After
@@ -191,6 +245,31 @@ public class DefaultMQAdminExtTest {
     }
 
     @Test
+    public void testExamineConsumeStats() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        ConsumeStats consumeStats = defaultMQAdminExtImpl.examineConsumeStats("default-consumer-group", "unit-test");
+        assertThat(consumeStats.getConsumeTps()).isEqualTo(1234);
+    }
+
+    @Test
+    public void testExamineConsumerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        ConsumerConnection consumerConnection = defaultMQAdminExtImpl.examineConsumerConnectionInfo("default-consumer-group");
+        assertThat(consumerConnection.getConsumeType()).isEqualTo(ConsumeType.CONSUME_PASSIVELY);
+        assertThat(consumerConnection.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
+    }
+
+    @Test
+    public void testExamineProducerConnectionInfo() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        ProducerConnection producerConnection = defaultMQAdminExtImpl.examineProducerConnectionInfo("default-producer-group", "unit-test");
+        assertThat(producerConnection.getConnectionSet().size()).isEqualTo(1);
+    }
+
+    @Test
+    public void testWipeWritePermOfBroker() throws InterruptedException, RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, RemotingConnectException {
+        int result = defaultMQAdminExtImpl.wipeWritePermOfBroker("127.0.0.1:9876", "default-broker");
+        assertThat(result).isEqualTo(6);
+    }
+
+    @Test
     public void testExamineTopicRouteInfo() throws RemotingException, MQClientException, InterruptedException {
         TopicRouteData topicRouteData = defaultMQAdminExtImpl.examineTopicRouteInfo("UnitTest");
         assertThat(topicRouteData.getBrokerDatas().get(0).getBrokerName()).isEqualTo("default-broker");
@@ -225,12 +304,57 @@ public class DefaultMQAdminExtTest {
     }
 
     @Test
+    public void testQueryConsumeTimeSpan() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        List<QueueTimeSpan> result = defaultMQAdminExtImpl.queryConsumeTimeSpan("unit-test", "default-broker-group");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testCleanExpiredConsumerQueue() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        boolean result = defaultMQAdminExtImpl.cleanExpiredConsumerQueue("default-cluster");
+        assertThat(result).isFalse();
+    }
+
+    @Test
     public void testCleanExpiredConsumerQueueByAddr() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
         boolean clean = defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr("127.0.0.1:10911");
         assertThat(clean).isTrue();
     }
 
     @Test
+    public void testCleanUnusedTopic() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
+        boolean result = defaultMQAdminExtImpl.cleanUnusedTopic("default-cluster");
+        assertThat(result).isFalse();
+    }
+
+    @Test
+    public void testGetConsumerRunningInfo() throws RemotingException, MQClientException, InterruptedException {
+        ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExtImpl.getConsumerRunningInfo("consumer-group", "cid_123", false);
+        assertThat(consumerRunningInfo.getJstack()).isEqualTo("test");
+    }
+
+    @Test
+    public void testMessageTrackDetail() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        MessageExt messageExt = new MessageExt();
+        messageExt.setMsgId("msgId");
+        messageExt.setTopic("unit-test");
+        List<MessageTrack> messageTrackList = defaultMQAdminExtImpl.messageTrackDetail(messageExt);
+        assertThat(messageTrackList.size()).isEqualTo(2);
+    }
+
+    @Test
+    public void testGetConsumeStatus() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        Map<String, Map<MessageQueue, Long>> result = defaultMQAdminExtImpl.getConsumeStatus("unit-test", "default-broker-group", "127.0.0.1:10911");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void testGetTopicClusterList() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        Set<String> result = defaultMQAdminExtImpl.getTopicClusterList("unit-test");
+        assertThat(result.size()).isEqualTo(0);
+    }
+
+    @Test
     public void testGetClusterList() throws InterruptedException, RemotingTimeoutException, MQClientException, RemotingSendRequestException, RemotingConnectException {
         Set<String> clusterlist = defaultMQAdminExtImpl.getClusterList("UnitTest");
         assertThat(clusterlist.contains("default-cluster-one")).isTrue();


[14/14] incubator-rocketmq git commit: Merge branch 'master' into ROCKETMQ-57

Posted by st...@apache.org.
Merge branch 'master' into ROCKETMQ-57


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/63de56c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/63de56c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/63de56c7

Branch: refs/heads/master
Commit: 63de56c7b6cada18338bb4be5e84fb17a379e5ac
Parents: f8d881b b4108d2
Author: stevenschew <st...@apache.org>
Authored: Fri Jan 20 14:55:52 2017 +0800
Committer: stevenschew <st...@apache.org>
Committed: Fri Jan 20 14:55:52 2017 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   1 +
 BUILDING                                        |   2 +-
 .../rocketmq/broker/out/BrokerOuterAPI.java     |   1 -
 .../subscription/SubscriptionGroupManager.java  |   2 +-
 .../rocketmq/client/impl/MQClientManager.java   |   3 +-
 .../processor/DefaultRequestProcessorTest.java  | 246 +++++++++++++++
 pom.xml                                         |  73 +++--
 .../remoting/protocol/RemotingSerializable.java |   2 +-
 .../remoting/protocol/RocketMQSerializable.java |  17 +-
 style/rmq_checkstyle.xml                        |   1 -
 test/pom.xml                                    |  52 ++++
 .../test/client/mq/MQAsyncProducer.java         |  85 ++++++
 .../test/client/rmq/RMQAsyncSendProducer.java   | 226 ++++++++++++++
 .../test/client/rmq/RMQBroadCastConsumer.java   |  37 +++
 .../test/client/rmq/RMQNormalConsumer.java      |  90 ++++++
 .../test/client/rmq/RMQNormalProducer.java      | 167 ++++++++++
 .../clientinterface/AbstractMQConsumer.java     | 112 +++++++
 .../clientinterface/AbstractMQProducer.java     | 151 +++++++++
 .../test/clientinterface/MQCollector.java       | 108 +++++++
 .../test/clientinterface/MQConsumer.java        |  26 ++
 .../test/clientinterface/MQProducer.java        |  30 ++
 .../rocketmq/test/factory/ConsumerFactory.java  |  45 +++
 .../rocketmq/test/factory/MQMessageFactory.java | 128 ++++++++
 .../rocketmq/test/factory/MessageFactory.java   |  62 ++++
 .../rocketmq/test/factory/ProducerFactory.java  |  37 +++
 .../test/factory/SendCallBackFactory.java       |  35 +++
 .../rocketmq/test/factory/TagMessage.java       | 108 +++++++
 .../test/listener/AbstractListener.java         | 104 +++++++
 .../rmq/concurrent/RMQDelayListner.java         |  61 ++++
 .../rmq/concurrent/RMQNormalListner.java        |  70 +++++
 .../listener/rmq/order/RMQOrderListener.java    |  85 ++++++
 .../rocketmq/test/message/MessageQueueMsg.java  |  62 ++++
 .../rocketmq/test/sendresult/SendResult.java    |  62 ++++
 .../apache/rocketmq/test/util/Condition.java    |  23 ++
 .../test/util/DuplicateMessageInfo.java         | 137 +++++++++
 .../org/apache/rocketmq/test/util/FileUtil.java | 108 +++++++
 .../org/apache/rocketmq/test/util/MQAdmin.java  | 164 ++++++++++
 .../rocketmq/test/util/MQRandomUtils.java       |  28 ++
 .../org/apache/rocketmq/test/util/MQWait.java   |  93 ++++++
 .../apache/rocketmq/test/util/RandomUtil.java   | 306 +++++++++++++++++++
 .../apache/rocketmq/test/util/RandomUtils.java  |  91 ++++++
 .../org/apache/rocketmq/test/util/TestUtil.java | 122 ++++++++
 .../apache/rocketmq/test/util/TestUtils.java    |  50 +++
 .../apache/rocketmq/test/util/VerifyUtils.java  | 146 +++++++++
 .../test/util/data/collect/DataCollector.java   |  45 +++
 .../util/data/collect/DataCollectorManager.java | 112 +++++++
 .../test/util/data/collect/DataFilter.java      |  22 ++
 .../collect/impl/ListDataCollectorImpl.java     |  95 ++++++
 .../data/collect/impl/MapDataCollectorImpl.java | 111 +++++++
 .../test/util/parallel/ParallelTask.java        |  43 +++
 .../util/parallel/ParallelTaskExecutor.java     |  67 ++++
 .../rocketmq/test/util/parallel/Task4Test.java  |  30 ++
 .../org/apache/rocketmq/test/base/BaseConf.java | 162 ++++++++++
 .../rocketmq/test/base/IntegrationTestBase.java | 143 +++++++++
 .../balance/NormalMsgDynamicBalanceIT.java      | 111 +++++++
 .../balance/NormalMsgStaticBalanceIT.java       | 109 +++++++
 .../consumer/broadcast/BaseBroadCastIT.java     |  56 ++++
 .../normal/BroadCastNormalMsgNotRecvIT.java     |  73 +++++
 .../normal/BroadCastNormalMsgRecvCrashIT.java   |  90 ++++++
 .../normal/BroadCastNormalMsgRecvFailIT.java    |  72 +++++
 .../BroadCastNormalMsgRecvStartLaterIT.java     |  88 ++++++
 .../BroadCastNormalMsgTwoDiffGroupRecvIT.java   |  78 +++++
 .../normal/NormalMsgTwoSameGroupConsumerIT.java |  78 +++++
 .../broadcast/order/OrderMsgBroadCastIT.java    |  75 +++++
 .../tag/BroadCastTwoConsumerFilterIT.java       |  78 +++++
 .../tag/BroadCastTwoConsumerSubDiffTagIT.java   |  75 +++++
 .../tag/BroadCastTwoConsumerSubTagIT.java       |  75 +++++
 .../consumer/cluster/DynamicAddAndCrashIT.java  | 103 +++++++
 .../consumer/cluster/DynamicAddConsumerIT.java  |  97 ++++++
 .../cluster/DynamicCrashConsumerIT.java         | 100 ++++++
 .../test/client/consumer/tag/MulTagSubIT.java   | 156 ++++++++++
 .../consumer/tag/TagMessageWith1ConsumerIT.java | 197 ++++++++++++
 .../tag/TagMessageWithMulConsumerIT.java        | 196 ++++++++++++
 .../tag/TagMessageWithSameGroupConsumerIT.java  | 115 +++++++
 .../consumer/topic/MulConsumerMulTopicIT.java   | 108 +++++++
 .../consumer/topic/OneConsumerMulTopicIT.java   | 104 +++++++
 .../producer/async/AsyncSendExceptionIT.java    | 150 +++++++++
 .../async/AsyncSendWithMessageQueueIT.java      |  85 ++++++
 .../AsyncSendWithMessageQueueSelectorIT.java    | 106 +++++++
 .../async/AsyncSendWithOnlySendCallBackIT.java  |  64 ++++
 .../producer/exception/msg/ChinaPropIT.java     |  74 +++++
 .../exception/msg/MessageExceptionIT.java       | 129 ++++++++
 .../exception/msg/MessageUserPropIT.java        |  94 ++++++
 .../ProducerGroupAndInstanceNameValidityIT.java |  69 +++++
 .../producer/oneway/OneWaySendExceptionIT.java  |  78 +++++
 .../client/producer/oneway/OneWaySendIT.java    |  64 ++++
 .../producer/oneway/OneWaySendWithMQIT.java     |  79 +++++
 .../oneway/OneWaySendWithSelectorIT.java        | 104 +++++++
 .../order/OrderMsgDynamicRebalanceIT.java       | 115 +++++++
 .../test/client/producer/order/OrderMsgIT.java  | 108 +++++++
 .../producer/order/OrderMsgRebalanceIT.java     | 133 ++++++++
 .../producer/order/OrderMsgWithTagIT.java       | 169 ++++++++++
 .../querymsg/QueryMsgByIdExceptionIT.java       |  81 +++++
 .../producer/querymsg/QueryMsgByIdIT.java       |  75 +++++
 .../producer/querymsg/QueryMsgByKeyIT.java      | 104 +++++++
 .../apache/rocketmq/test/delay/DelayConf.java   |  27 ++
 .../rocketmq/test/delay/NormalMsgDelayIT.java   | 116 +++++++
 .../test/smoke/NormalMessageSendAndRecvIT.java  |  62 ++++
 test/src/test/resources/log4j.xml               |  46 +++
 test/src/test/resources/logback-test.xml        |  33 ++
 100 files changed, 8750 insertions(+), 38 deletions(-)
----------------------------------------------------------------------



[03/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests for LocalFileOffsetStore

Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for LocalFileOffsetStore


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6b5ab40c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6b5ab40c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6b5ab40c

Branch: refs/heads/master
Commit: 6b5ab40ced9f3f40f0acc18ba4eaf64b76686701
Parents: c0e7629
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 15:15:16 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 15:15:16 2017 +0800

----------------------------------------------------------------------
 .../store/LocalFileOffsetStoreTest.java         | 74 ++++++++++++++++++++
 1 file changed, 74 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6b5ab40c/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
new file mode 100644
index 0000000..58d99d6
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+
+public class LocalFileOffsetStoreTest {
+
+    @Mock
+    private static MQClientInstance mQClientFactory;
+    private static String group = "FooBarGroup";
+    private static String topic = "FooBar";
+    private static String brokerName = "DefaultBrokerName";
+
+    @BeforeClass
+    public static void init() {
+        System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
+        mQClientFactory = Mockito.mock(MQClientInstance.class);
+        String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
+        when(mQClientFactory.getClientId()).thenReturn(clientId);
+    }
+
+    @Test
+    public void testUpdateOffset() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+        offsetStore.updateOffset(messageQueue, 1024, false);
+
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.updateOffset(messageQueue, 1023, false);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+
+        offsetStore.updateOffset(messageQueue, 1022, true);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+    }
+
+    @Test
+    public void testReadOffset_FromStore() throws Exception {
+        OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 2);
+
+        offsetStore.updateOffset(messageQueue, 1024, false);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+
+        offsetStore.persistAll(new HashSet<>(Collections.singletonList(messageQueue)));
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+    }
+}
\ No newline at end of file


[04/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests for RemoteBrokerOffsetStore

Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for RemoteBrokerOffsetStore


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/2e226f1d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/2e226f1d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/2e226f1d

Branch: refs/heads/master
Commit: 2e226f1deee6968782159c31ca77323b86664482
Parents: 6b5ab40
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 16:15:11 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 16:15:11 2017 +0800

----------------------------------------------------------------------
 .../store/RemoteBrokerOffsetStoreTest.java      | 141 +++++++++++++++++++
 1 file changed, 141 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/2e226f1d/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
new file mode 100644
index 0000000..9a6a4b5
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.store;
+
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.FindBrokerResult;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RemoteBrokerOffsetStoreTest {
+
+    @Mock
+    private static MQClientInstance mQClientFactory;
+    @Mock
+    private static MQClientAPIImpl mqClientAPI;
+    private static String group = "FooBarGroup";
+    private static String topic = "FooBar";
+    private static String brokerName = "DefaultBrokerName";
+
+    @BeforeClass
+    public static void init() {
+        System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
+        mQClientFactory = mock(MQClientInstance.class);
+        String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
+        when(mQClientFactory.getClientId()).thenReturn(clientId);
+        when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
+
+        mqClientAPI = mock(MQClientAPIImpl.class);
+        when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
+    }
+
+    @Test
+    public void testUpdateOffset() throws Exception {
+        OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 1);
+
+        offsetStore.updateOffset(messageQueue, 1024, false);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.updateOffset(messageQueue, 1023, false);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+
+        offsetStore.updateOffset(messageQueue, 1022, true);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1023);
+    }
+
+    @Test
+    public void testReadOffset_WithException() throws Exception {
+        OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+        MessageQueue messageQueue = new MessageQueue(topic, brokerName, 2);
+
+        offsetStore.updateOffset(messageQueue, 1024, false);
+
+        doThrow(new MQBrokerException(-1, ""))
+            .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+
+        doThrow(new RemotingException("", null))
+            .when(mqClientAPI).queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong());
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-2);
+    }
+
+    @Test
+    public void testReadOffset_Success() throws Exception {
+        OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+        final MessageQueue messageQueue = new MessageQueue(topic, brokerName, 3);
+
+        doAnswer(new Answer() {
+            @Override public Object answer(InvocationOnMock mock) throws Throwable {
+                UpdateConsumerOffsetRequestHeader updateRequestHeader = mock.getArgument(1);
+                when(mqClientAPI.queryConsumerOffset(anyString(), any(QueryConsumerOffsetRequestHeader.class), anyLong())).thenReturn(updateRequestHeader.getCommitOffset());
+                return null;
+            }
+        }).when(mqClientAPI).updateConsumerOffsetOneway(any(String.class), any(UpdateConsumerOffsetRequestHeader.class), any(Long.class));
+
+        offsetStore.updateOffset(messageQueue, 1024, false);
+        offsetStore.persist(messageQueue);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+        offsetStore.updateOffset(messageQueue, 1023, false);
+        offsetStore.persist(messageQueue);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1023);
+
+        offsetStore.updateOffset(messageQueue, 1022, true);
+        offsetStore.persist(messageQueue);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1023);
+
+        offsetStore.updateOffset(messageQueue, 1025, false);
+        offsetStore.persistAll(new HashSet<>(Collections.singletonList(messageQueue)));
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
+    }
+
+
+
+    @Test
+    public void testRemoveOffset() throws Exception {
+        OffsetStore offsetStore = new RemoteBrokerOffsetStore(mQClientFactory, group);
+        final MessageQueue messageQueue = new MessageQueue(topic, brokerName, 4);
+
+        offsetStore.updateOffset(messageQueue, 1024, false);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+        offsetStore.removeOffset(messageQueue);
+        assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+    }
+}
\ No newline at end of file


[05/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests for MQClientInstance

Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for MQClientInstance


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/8e26a401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/8e26a401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/8e26a401

Branch: refs/heads/master
Commit: 8e26a40109b1865ab701c10c769ae76af7ddafee
Parents: 2e226f1
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 17:22:15 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 17:22:15 2017 +0800

----------------------------------------------------------------------
 .../impl/factory/MQClientInstanceTest.java      | 123 +++++++++++++++++++
 1 file changed, 123 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/8e26a401/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
new file mode 100644
index 0000000..956cdd9
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.factory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.admin.MQAdminExtInner;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.MQClientManager;
+import org.apache.rocketmq.client.impl.consumer.MQConsumerInner;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+@RunWith(MockitoJUnitRunner.class)
+public class MQClientInstanceTest {
+
+    @Mock
+    private static MQClientAPIImpl mqClientAPI;
+    @InjectMocks
+    private static MQClientInstance mqClientInstance =  MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());;
+
+    private String topic = "FooBar";
+    private String group = "FooBarGroup";
+
+    @Test
+    public void testTopicRouteData2TopicPublishInfo() {
+        TopicRouteData topicRouteData = new TopicRouteData();
+
+        topicRouteData.setFilterServerTable(new HashMap<String, List<String>>());
+        List<BrokerData> brokerDataList = new ArrayList<>();
+        BrokerData brokerData = new BrokerData();
+        brokerData.setBrokerName("BrokerA");
+        brokerData.setCluster("DefaultCluster");
+        HashMap<Long, String> brokerAddrs = new HashMap<>();
+        brokerAddrs.put(0L, "127.0.0.1");
+        brokerData.setBrokerAddrs(brokerAddrs);
+        brokerDataList.add(brokerData);
+        topicRouteData.setBrokerDatas(brokerDataList);
+
+        List<QueueData> queueDataList = new ArrayList<>();
+        QueueData queueData = new QueueData();
+        queueData.setBrokerName("BrokerA");
+        queueData.setPerm(2);
+        queueData.setReadQueueNums(3);
+        queueData.setWriteQueueNums(4);
+        queueData.setTopicSynFlag(0);
+        queueDataList.add(queueData);
+        topicRouteData.setQueueDatas(queueDataList);
+
+        TopicPublishInfo topicPublishInfo = MQClientInstance.topicRouteData2TopicPublishInfo(topic, topicRouteData);
+
+        assertThat(topicPublishInfo.isHaveTopicRouterInfo()).isFalse();
+        assertThat(topicPublishInfo.getMessageQueueList().size()).isEqualTo(4);
+    }
+
+    @Test
+    public void testRegisterProducer() {
+        boolean flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
+        assertThat(flag).isTrue();
+
+        flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
+        assertThat(flag).isFalse();
+
+        mqClientInstance.unregisterProducer(group);
+        flag = mqClientInstance.registerProducer(group, mock(DefaultMQProducerImpl.class));
+        assertThat(flag).isTrue();
+    }
+
+    @Test
+    public void testRegisterConsumer() throws RemotingException, InterruptedException, MQBrokerException {
+        boolean flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
+        assertThat(flag).isTrue();
+
+        flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
+        assertThat(flag).isFalse();
+
+        mqClientInstance.unregisterConsumer(group);
+        flag = mqClientInstance.registerConsumer(group, mock(MQConsumerInner.class));
+        assertThat(flag).isTrue();
+    }
+
+    @Test
+    public void testRegisterAdminExt() {
+        boolean flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
+        assertThat(flag).isTrue();
+
+        flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
+        assertThat(flag).isFalse();
+
+        mqClientInstance.unregisterAdminExt(group);
+        flag = mqClientInstance.registerAdminExt(group, mock(MQAdminExtInner.class));
+        assertThat(flag).isTrue();
+    }
+}
\ No newline at end of file


[08/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests for LatencyFaultTolerance

Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for LatencyFaultTolerance


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/0f9f55f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/0f9f55f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/0f9f55f0

Branch: refs/heads/master
Commit: 0f9f55f0f39107b304ee26c603307e9bc5bf2c99
Parents: 4dd6c38
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 22:03:07 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 22:03:07 2017 +0800

----------------------------------------------------------------------
 .../latency/LatencyFaultToleranceImplTest.java  | 67 ++++++++++++++++++++
 1 file changed, 67 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/0f9f55f0/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
new file mode 100644
index 0000000..86690e4
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImplTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.latency;
+
+import java.util.concurrent.TimeUnit;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class LatencyFaultToleranceImplTest {
+    private LatencyFaultTolerance<String> latencyFaultTolerance;
+    private String brokerName = "BrokerA";
+    private String anotherBrokerName = "BrokerB";
+
+    @Before
+    public void init() {
+        latencyFaultTolerance = new LatencyFaultToleranceImpl();
+    }
+
+    @Test
+    public void testUpdateFaultItem() throws Exception {
+        latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
+        assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
+        assertThat(latencyFaultTolerance.isAvailable(anotherBrokerName)).isTrue();
+    }
+
+    @Test
+    public void testIsAvailable() throws Exception {
+        latencyFaultTolerance.updateFaultItem(brokerName, 3000, 50);
+        assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
+
+        TimeUnit.MILLISECONDS.sleep(70);
+        assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
+    }
+
+    @Test
+    public void testRemove() throws Exception {
+        latencyFaultTolerance.updateFaultItem(brokerName, 3000, 3000);
+        assertThat(latencyFaultTolerance.isAvailable(brokerName)).isFalse();
+        latencyFaultTolerance.remove(brokerName);
+        assertThat(latencyFaultTolerance.isAvailable(brokerName)).isTrue();
+    }
+
+    @Test
+    public void testPickOneAtLeast() throws Exception {
+        latencyFaultTolerance.updateFaultItem(brokerName, 1000, 3000);
+        assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
+
+        latencyFaultTolerance.updateFaultItem(anotherBrokerName, 1001, 3000);
+        assertThat(latencyFaultTolerance.pickOneAtLeast()).isEqualTo(brokerName);
+    }
+}
\ No newline at end of file


[02/14] incubator-rocketmq git commit: [ROCKETMQ-52] Add unit tests for MessageQueueSelector

Posted by st...@apache.org.
[ROCKETMQ-52] Add unit tests for MessageQueueSelector


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/c0e76299
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/c0e76299
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/c0e76299

Branch: refs/heads/master
Commit: c0e762995b1669b3febc3a5a2bac76577981696d
Parents: 03b1216
Author: yukon <yu...@apache.org>
Authored: Tue Jan 17 13:28:56 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Tue Jan 17 13:28:56 2017 +0800

----------------------------------------------------------------------
 .../client/common/ThreadLocalIndexTest.java     |  2 +-
 .../selector/SelectMessageQueueByHashTest.java  | 49 ++++++++++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c0e76299/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
index b937e45..350cde7 100644
--- a/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/common/ThreadLocalIndexTest.java
@@ -23,7 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 public class ThreadLocalIndexTest {
 
     @Test
-    public void getAndIncrement() throws Exception {
+    public void testGetAndIncrement() throws Exception {
         ThreadLocalIndex localIndex = new ThreadLocalIndex();
         int initialVal = localIndex.getAndIncrement();
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/c0e76299/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java
new file mode 100644
index 0000000..64ef21d
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHashTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.producer.selector;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class SelectMessageQueueByHashTest {
+
+    private String topic = "FooBar";
+
+    @Test
+    public void testSelect() throws Exception {
+        SelectMessageQueueByHash selector = new SelectMessageQueueByHash();
+
+        Message message = new Message(topic, new byte[] {});
+
+        List<MessageQueue> messageQueues = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            MessageQueue messageQueue = new MessageQueue(topic, "DefaultBroker", i);
+            messageQueues.add(messageQueue);
+        }
+
+        String orderId = "123";
+        String anotherOrderId = "234";
+        MessageQueue selected = selector.select(messageQueues, message, orderId);
+        assertThat(selector.select(messageQueues, message, anotherOrderId)).isNotEqualTo(selected);
+    }
+
+}
\ No newline at end of file