You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/11 11:59:38 UTC

incubator-rocketmq git commit: [ROCKETMQ-209] Remove duplicated code in class MQClientAPIImpl

Repository: incubator-rocketmq
Updated Branches:
  refs/heads/develop ccc2235ae -> ffad6566b


[ROCKETMQ-209] Remove duplicated code in class MQClientAPIImpl

Author: Ritabrata Moitra <rm...@thoughtworks.com>

Closes #134 from Ritabrata-TW/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/ffad6566
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/ffad6566
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/ffad6566

Branch: refs/heads/develop
Commit: ffad6566b8aee86b87b34f2c77ab2ae3b9c15b1c
Parents: ccc2235
Author: Ritabrata Moitra <rm...@thoughtworks.com>
Authored: Fri Aug 11 19:59:28 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Fri Aug 11 19:59:28 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/client/impl/MQClientAPIImpl.java   | 160 ++++++++++---------
 .../rocketmq/store/DefaultMessageStore.java     |  47 +++---
 .../rocketmq/store/DefaultMessageStoreTest.java |  21 +--
 3 files changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ffad6566/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4244bdd..ae9ed6c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-import java.util.Iterator;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
@@ -49,12 +49,12 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
 import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -152,7 +152,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.slf4j.Logger;
 
-
 public class MQClientAPIImpl {
 
     private final static Logger log = ClientLogger.getLog();
@@ -169,7 +168,8 @@ public class MQClientAPIImpl {
     private String nameSrvAddr = null;
     private ClientConfig clientConfig;
 
-    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
+    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+        final ClientRemotingProcessor clientRemotingProcessor,
         RPCHook rpcHook, final ClientConfig clientConfig) {
         this.clientConfig = clientConfig;
         topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
@@ -233,7 +233,8 @@ public class MQClientAPIImpl {
         this.remotingClient.shutdown();
     }
 
-    public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
+    public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
+        final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
 
@@ -255,7 +256,8 @@ public class MQClientAPIImpl {
 
     }
 
-    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
+    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
+        final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
         requestHeader.setTopic(topicConfig.getTopicName());
@@ -284,14 +286,14 @@ public class MQClientAPIImpl {
     }
 
     public SendResult sendMessage(//
-                                  final String addr, // 1
-                                  final String brokerName, // 2
-                                  final Message msg, // 3
-                                  final SendMessageRequestHeader requestHeader, // 4
-                                  final long timeoutMillis, // 5
-                                  final CommunicationMode communicationMode, // 6
-                                  final SendMessageContext context, // 7
-                                  final DefaultMQProducerImpl producer // 8
+        final String addr, // 1
+        final String brokerName, // 2
+        final Message msg, // 3
+        final SendMessageRequestHeader requestHeader, // 4
+        final long timeoutMillis, // 5
+        final CommunicationMode communicationMode, // 6
+        final SendMessageContext context, // 7
+        final DefaultMQProducerImpl producer // 8
     ) throws RemotingException, MQBrokerException, InterruptedException {
         return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
     }
@@ -340,11 +342,11 @@ public class MQClientAPIImpl {
     }
 
     private SendResult sendMessageSync(//
-                                       final String addr, //
-                                       final String brokerName, //
-                                       final Message msg, //
-                                       final long timeoutMillis, //
-                                       final RemotingCommand request//
+        final String addr, //
+        final String brokerName, //
+        final Message msg, //
+        final long timeoutMillis, //
+        final RemotingCommand request//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
@@ -619,7 +621,8 @@ public class MQClientAPIImpl {
         return this.processPullResponse(response);
     }
 
-    private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
+    private PullResult processPullResponse(
+        final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
         PullStatus pullStatus = PullStatus.NO_NEW_MSG;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS:
@@ -668,7 +671,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
+    public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
+        final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
         requestHeader.setTopic(topic);
@@ -767,7 +771,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
+    public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
+        final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
         requestHeader.setTopic(topic);
@@ -1009,7 +1014,8 @@ public class MQClientAPIImpl {
         }
     }
 
-    public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
+    public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
+        final long timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader();
         requestHeader.setTopic(topic);
@@ -1036,7 +1042,8 @@ public class MQClientAPIImpl {
         return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
     }
 
-    public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
+    public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
+        final long timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
         MQBrokerException {
         GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
@@ -1059,7 +1066,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
+    public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup,
+        final long timeoutMillis)
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
         MQBrokerException {
         GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
@@ -1080,7 +1088,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
+    public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup,
+        final long timeoutMillis)
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
         MQBrokerException {
         GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
@@ -1160,7 +1169,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+    public ClusterInfo getBrokerClusterInfo(
+        final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
 
@@ -1179,33 +1189,18 @@ public class MQClientAPIImpl {
 
     public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
         throws RemotingException, MQClientException, InterruptedException {
-        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
-        requestHeader.setTopic(topic);
 
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
-
-        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.TOPIC_NOT_EXIST: {
-                // TODO LOG
-                break;
-            }
-            case ResponseCode.SUCCESS: {
-                byte[] body = response.getBody();
-                if (body != null) {
-                    return TopicRouteData.decode(body, TopicRouteData.class);
-                }
-            }
-            default:
-                break;
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
+        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, false);
     }
 
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
         throws RemotingException, MQClientException, InterruptedException {
+
+        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
+    }
+
+    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
+        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
         GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
 
@@ -1215,8 +1210,10 @@ public class MQClientAPIImpl {
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.TOPIC_NOT_EXIST: {
-                if (!topic.equals(MixAll.DEFAULT_TOPIC))
+                if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                     log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
+                }
+                // TODO :- Log when if condition is not satisfied
                 break;
             }
             case ResponseCode.SUCCESS: {
@@ -1252,7 +1249,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
+    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
+        final long timeoutMillis) throws RemotingCommandException,
         RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
         WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
         requestHeader.setBrokerName(brokerName);
@@ -1471,8 +1469,10 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
-                                                                                final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+    public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic,
+        final String group,
+        final String clientAddr,
+        final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
@@ -1519,7 +1519,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
+    public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
+        final long timeoutMillis)
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
         MQBrokerException {
         QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
@@ -1593,7 +1594,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+    public TopicList getSystemTopicList(
+        final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1642,7 +1644,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+    public boolean cleanExpiredConsumeQueue(final String addr,
+        long timeoutMillis) throws MQClientException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -1658,7 +1661,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+    public boolean cleanUnusedTopicByAddr(final String addr,
+        long timeoutMillis) throws MQClientException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -1674,7 +1678,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
+    public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId,
+        boolean jstack,
         final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
@@ -1731,7 +1736,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
+    public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group,
+        Set<String> filterGroup,
         long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         InterruptedException {
         QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader();
@@ -1854,7 +1860,8 @@ public class MQClientAPIImpl {
     }
 
     public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
-        final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+        final boolean isOffline,
+        final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader();
         requestHeader.setSrcGroup(srcGroup);
         requestHeader.setDestGroup(destGroup);
@@ -1902,13 +1909,15 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public Set<String> getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException,
+    public Set<String> getClusterList(String topic,
+        long timeoutMillis) throws MQClientException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         // todo:jodie
         return Collections.EMPTY_SET;
     }
 
-    public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException,
+    public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
+        long timeoutMillis) throws MQClientException,
         RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader();
         requestHeader.setIsOrder(isOrder);
@@ -1932,7 +1941,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
+    public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
+        long timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
         RemotingCommand response = this.remotingClient
@@ -1948,7 +1958,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, long timeoutMillis) throws RemotingConnectException,
+    public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
+        long timeoutMillis) throws RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
 
@@ -2028,9 +2039,10 @@ public class MQClientAPIImpl {
         return configMap;
     }
 
-    public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId,
-                                                   final long index, final int count, final String consumerGroup,
-                                                   final long timeoutMillis) throws InterruptedException,
+    public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic,
+        final int queueId,
+        final long index, final int count, final String consumerGroup,
+        final long timeoutMillis) throws InterruptedException,
         RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
 
         QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader();
@@ -2054,8 +2066,8 @@ public class MQClientAPIImpl {
     }
 
     public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
-                                    final String clientId, final SubscriptionData subscriptionData,
-                                    final long timeoutMillis)
+        final String clientId, final SubscriptionData subscriptionData,
+        final long timeoutMillis)
         throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
         RemotingConnectException, MQClientException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ffad6566/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7b5ac45..49a1eba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -413,8 +413,9 @@ public class DefaultMessageStore implements MessageStore {
         return commitLog;
     }
 
-    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
-                                       final MessageFilter messageFilter) {
+    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
+        final int maxMsgNums,
+        final MessageFilter messageFilter) {
         if (this.shutdown) {
             log.warn("message store has shutdown, so getMessage is forbidden");
             return null;
@@ -717,22 +718,27 @@ public class DefaultMessageStore implements MessageStore {
             long minLogicOffset = logicQueue.getMinLogicOffset();
 
             SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-            if (result != null) {
-                try {
-                    final long phyOffset = result.getByteBuffer().getLong();
-                    final int size = result.getByteBuffer().getInt();
-                    long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
-                    return storeTime;
-                } catch (Exception e) {
-                } finally {
-                    result.release();
-                }
-            }
+            return getStoreTime(result);
         }
 
         return -1;
     }
 
+    private long getStoreTime(SelectMappedBufferResult result) {
+        if (result != null) {
+            try {
+                final long phyOffset = result.getByteBuffer().getLong();
+                final int size = result.getByteBuffer().getInt();
+                long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+                return storeTime;
+            } catch (Exception e) {
+            } finally {
+                result.release();
+            }
+        }
+        return -1;
+    }
+
     @Override
     public long getEarliestMessageTime() {
         final long minPhyOffset = this.getMinPhyOffset();
@@ -745,17 +751,7 @@ public class DefaultMessageStore implements MessageStore {
         ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
         if (logicQueue != null) {
             SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
-            if (result != null) {
-                try {
-                    final long phyOffset = result.getByteBuffer().getLong();
-                    final int size = result.getByteBuffer().getInt();
-                    long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
-                    return storeTime;
-                } catch (Exception ignored) {
-                } finally {
-                    result.release();
-                }
-            }
+            return getStoreTime(result);
         }
 
         return -1;
@@ -955,7 +951,8 @@ public class DefaultMessageStore implements MessageStore {
         }
     }
 
-    public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
+    public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
+        SocketAddress storeHost) {
         Map<String, Long> messageIds = new HashMap<String, Long>();
         if (this.shutdown) {
             return messageIds;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ffad6566/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index ac78a1d..a81f328 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -77,20 +77,7 @@ public class DefaultMessageStoreTest {
         assertTrue(load);
 
         master.start();
-        try {
-            for (long i = 0; i < totalMsgs; i++) {
-                master.putMessage(buildMessage());
-            }
-
-            for (long i = 0; i < totalMsgs; i++) {
-                GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
-                assertThat(result).isNotNull();
-                result.release();
-            }
-        } finally {
-            master.shutdown();
-            master.destroy();
-        }
+        verifyThatMasterIsFunctional(totalMsgs, master);
     }
 
     public MessageExtBrokerInner buildMessage() {
@@ -121,6 +108,10 @@ public class DefaultMessageStoreTest {
         assertTrue(load);
 
         master.start();
+        verifyThatMasterIsFunctional(totalMsgs, master);
+    }
+
+    private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) {
         try {
             for (long i = 0; i < totalMsgs; i++) {
                 master.putMessage(buildMessage());
@@ -171,7 +162,7 @@ public class DefaultMessageStoreTest {
     private class MyMessageArrivingListener implements MessageArrivingListener {
         @Override
         public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
-                             byte[] filterBitMap, Map<String, String> properties) {
+            byte[] filterBitMap, Map<String, String> properties) {
         }
     }
 }