You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/11 11:59:38 UTC
incubator-rocketmq git commit: [ROCKETMQ-209]Remove duplicated code in class MQClientAPIImpl
Repository: incubator-rocketmq
Updated Branches:
refs/heads/develop ccc2235ae -> ffad6566b
[ROCKETMQ-209]Remove duplicated code in class MQClientAPIImpl
Author: Ritabrata Moitra <rm...@thoughtworks.com>
Closes #134 from Ritabrata-TW/master.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/ffad6566
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/ffad6566
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/ffad6566
Branch: refs/heads/develop
Commit: ffad6566b8aee86b87b34f2c77ab2ae3b9c15b1c
Parents: ccc2235
Author: Ritabrata Moitra <rm...@thoughtworks.com>
Authored: Fri Aug 11 19:59:28 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Fri Aug 11 19:59:28 2017 +0800
----------------------------------------------------------------------
.../rocketmq/client/impl/MQClientAPIImpl.java | 160 ++++++++++---------
.../rocketmq/store/DefaultMessageStore.java | 47 +++---
.../rocketmq/store/DefaultMessageStoreTest.java | 21 +--
3 files changed, 114 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ffad6566/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4244bdd..ae9ed6c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -18,14 +18,14 @@ package org.apache.rocketmq.client.impl;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.Iterator;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -49,12 +49,12 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageBatch;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -152,7 +152,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
-
public class MQClientAPIImpl {
private final static Logger log = ClientLogger.getLog();
@@ -169,7 +168,8 @@ public class MQClientAPIImpl {
private String nameSrvAddr = null;
private ClientConfig clientConfig;
- public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
+ public MQClientAPIImpl(final NettyClientConfig nettyClientConfig,
+ final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
@@ -233,7 +233,8 @@ public class MQClientAPIImpl {
this.remotingClient.shutdown();
}
- public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
+ public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config,
+ final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
@@ -255,7 +256,8 @@ public class MQClientAPIImpl {
}
- public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
+ public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
+ final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
@@ -284,14 +286,14 @@ public class MQClientAPIImpl {
}
public SendResult sendMessage(//
- final String addr, // 1
- final String brokerName, // 2
- final Message msg, // 3
- final SendMessageRequestHeader requestHeader, // 4
- final long timeoutMillis, // 5
- final CommunicationMode communicationMode, // 6
- final SendMessageContext context, // 7
- final DefaultMQProducerImpl producer // 8
+ final String addr, // 1
+ final String brokerName, // 2
+ final Message msg, // 3
+ final SendMessageRequestHeader requestHeader, // 4
+ final long timeoutMillis, // 5
+ final CommunicationMode communicationMode, // 6
+ final SendMessageContext context, // 7
+ final DefaultMQProducerImpl producer // 8
) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
}
@@ -340,11 +342,11 @@ public class MQClientAPIImpl {
}
private SendResult sendMessageSync(//
- final String addr, //
- final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request//
+ final String addr, //
+ final String brokerName, //
+ final Message msg, //
+ final long timeoutMillis, //
+ final RemotingCommand request//
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
@@ -619,7 +621,8 @@ public class MQClientAPIImpl {
return this.processPullResponse(response);
}
- private PullResult processPullResponse(final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
+ private PullResult processPullResponse(
+ final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
PullStatus pullStatus = PullStatus.NO_NEW_MSG;
switch (response.getCode()) {
case ResponseCode.SUCCESS:
@@ -668,7 +671,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
+ public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp,
+ final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
requestHeader.setTopic(topic);
@@ -767,7 +771,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
+ public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId,
+ final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader();
requestHeader.setTopic(topic);
@@ -1009,7 +1014,8 @@ public class MQClientAPIImpl {
}
}
- public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException,
+ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
+ final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader();
requestHeader.setTopic(topic);
@@ -1036,7 +1042,8 @@ public class MQClientAPIImpl {
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
}
- public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis)
+ public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
+ final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
MQBrokerException {
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
@@ -1059,7 +1066,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis)
+ public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup,
+ final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader();
@@ -1080,7 +1088,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis)
+ public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup,
+ final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader();
@@ -1160,7 +1169,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
+ public ClusterInfo getBrokerClusterInfo(
+ final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
@@ -1179,33 +1189,18 @@ public class MQClientAPIImpl {
public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
throws RemotingException, MQClientException, InterruptedException {
- GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
- requestHeader.setTopic(topic);
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
-
- RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.TOPIC_NOT_EXIST: {
- // TODO LOG
- break;
- }
- case ResponseCode.SUCCESS: {
- byte[] body = response.getBody();
- if (body != null) {
- return TopicRouteData.decode(body, TopicRouteData.class);
- }
- }
- default:
- break;
- }
-
- throw new MQClientException(response.getCode(), response.getRemark());
+ return getTopicRouteInfoFromNameServer(topic, timeoutMillis, false);
}
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
throws RemotingException, MQClientException, InterruptedException {
+
+ return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
+ }
+
+ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
+ boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
@@ -1215,8 +1210,10 @@ public class MQClientAPIImpl {
assert response != null;
switch (response.getCode()) {
case ResponseCode.TOPIC_NOT_EXIST: {
- if (!topic.equals(MixAll.DEFAULT_TOPIC))
+ if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
+ }
+ // TODO :- Log when if condition is not satisfied
break;
}
case ResponseCode.SUCCESS: {
@@ -1252,7 +1249,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException,
+ public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName,
+ final long timeoutMillis) throws RemotingCommandException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException {
WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader();
requestHeader.setBrokerName(brokerName);
@@ -1471,8 +1469,10 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group,
- final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic,
+ final String group,
+ final String clientAddr,
+ final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
@@ -1519,7 +1519,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis)
+ public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group,
+ final long timeoutMillis)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException,
MQBrokerException {
QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader();
@@ -1593,7 +1594,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ public TopicList getSystemTopicList(
+ final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
@@ -1642,7 +1644,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+ public boolean cleanExpiredConsumeQueue(final String addr,
+ long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -1658,7 +1661,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException,
+ public boolean cleanUnusedTopicByAddr(final String addr,
+ long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
@@ -1674,7 +1678,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack,
+ public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId,
+ boolean jstack,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
@@ -1731,7 +1736,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup,
+ public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group,
+ Set<String> filterGroup,
long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader();
@@ -1854,7 +1860,8 @@ public class MQClientAPIImpl {
}
public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic,
- final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
+ final boolean isOffline,
+ final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader();
requestHeader.setSrcGroup(srcGroup);
requestHeader.setDestGroup(destGroup);
@@ -1902,13 +1909,15 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public Set<String> getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException,
+ public Set<String> getClusterList(String topic,
+ long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
// todo:jodie
return Collections.EMPTY_SET;
}
- public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException,
+ public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder,
+ long timeoutMillis) throws MQClientException,
RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader();
requestHeader.setIsOrder(isOrder);
@@ -1932,7 +1941,8 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException,
+ public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr,
+ long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = this.remotingClient
@@ -1948,7 +1958,8 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public TopicConfigSerializeWrapper getAllTopicConfig(final String addr, long timeoutMillis) throws RemotingConnectException,
+ public TopicConfigSerializeWrapper getAllTopicConfig(final String addr,
+ long timeoutMillis) throws RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
@@ -2028,9 +2039,10 @@ public class MQClientAPIImpl {
return configMap;
}
- public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic, final int queueId,
- final long index, final int count, final String consumerGroup,
- final long timeoutMillis) throws InterruptedException,
+ public QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, final String topic,
+ final int queueId,
+ final long index, final int count, final String consumerGroup,
+ final long timeoutMillis) throws InterruptedException,
RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException {
QueryConsumeQueueRequestHeader requestHeader = new QueryConsumeQueueRequestHeader();
@@ -2054,8 +2066,8 @@ public class MQClientAPIImpl {
}
public void checkClientInBroker(final String brokerAddr, final String consumerGroup,
- final String clientId, final SubscriptionData subscriptionData,
- final long timeoutMillis)
+ final String clientId, final SubscriptionData subscriptionData,
+ final long timeoutMillis)
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
RemotingConnectException, MQClientException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_CLIENT_CONFIG, null);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ffad6566/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7b5ac45..49a1eba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -413,8 +413,9 @@ public class DefaultMessageStore implements MessageStore {
return commitLog;
}
- public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
- final MessageFilter messageFilter) {
+ public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
+ final int maxMsgNums,
+ final MessageFilter messageFilter) {
if (this.shutdown) {
log.warn("message store has shutdown, so getMessage is forbidden");
return null;
@@ -717,22 +718,27 @@ public class DefaultMessageStore implements MessageStore {
long minLogicOffset = logicQueue.getMinLogicOffset();
SelectMappedBufferResult result = logicQueue.getIndexBuffer(minLogicOffset / ConsumeQueue.CQ_STORE_UNIT_SIZE);
- if (result != null) {
- try {
- final long phyOffset = result.getByteBuffer().getLong();
- final int size = result.getByteBuffer().getInt();
- long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
- return storeTime;
- } catch (Exception e) {
- } finally {
- result.release();
- }
- }
+ return getStoreTime(result);
}
return -1;
}
+ private long getStoreTime(SelectMappedBufferResult result) {
+ if (result != null) {
+ try {
+ final long phyOffset = result.getByteBuffer().getLong();
+ final int size = result.getByteBuffer().getInt();
+ long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ return storeTime;
+ } catch (Exception e) {
+ } finally {
+ result.release();
+ }
+ }
+ return -1;
+ }
+
@Override
public long getEarliestMessageTime() {
final long minPhyOffset = this.getMinPhyOffset();
@@ -745,17 +751,7 @@ public class DefaultMessageStore implements MessageStore {
ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
if (logicQueue != null) {
SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
- if (result != null) {
- try {
- final long phyOffset = result.getByteBuffer().getLong();
- final int size = result.getByteBuffer().getInt();
- long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
- return storeTime;
- } catch (Exception ignored) {
- } finally {
- result.release();
- }
- }
+ return getStoreTime(result);
}
return -1;
@@ -955,7 +951,8 @@ public class DefaultMessageStore implements MessageStore {
}
}
- public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset, SocketAddress storeHost) {
+ public Map<String, Long> getMessageIds(final String topic, final int queueId, long minOffset, long maxOffset,
+ SocketAddress storeHost) {
Map<String, Long> messageIds = new HashMap<String, Long>();
if (this.shutdown) {
return messageIds;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/ffad6566/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
----------------------------------------------------------------------
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index ac78a1d..a81f328 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -77,20 +77,7 @@ public class DefaultMessageStoreTest {
assertTrue(load);
master.start();
- try {
- for (long i = 0; i < totalMsgs; i++) {
- master.putMessage(buildMessage());
- }
-
- for (long i = 0; i < totalMsgs; i++) {
- GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
- assertThat(result).isNotNull();
- result.release();
- }
- } finally {
- master.shutdown();
- master.destroy();
- }
+ verifyThatMasterIsFunctional(totalMsgs, master);
}
public MessageExtBrokerInner buildMessage() {
@@ -121,6 +108,10 @@ public class DefaultMessageStoreTest {
assertTrue(load);
master.start();
+ verifyThatMasterIsFunctional(totalMsgs, master);
+ }
+
+ private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) {
try {
for (long i = 0; i < totalMsgs; i++) {
master.putMessage(buildMessage());
@@ -171,7 +162,7 @@ public class DefaultMessageStoreTest {
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
- byte[] filterBitMap, Map<String, String> properties) {
+ byte[] filterBitMap, Map<String, String> properties) {
}
}
}