You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/29 12:30:36 UTC
[08/28] incubator-rocketmq git commit: Remove unused class
GetRouteInfoResponseHeader and meaningless comments
Remove unused class GetRouteInfoResponseHeader and meaningless comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/7f96008c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/7f96008c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/7f96008c
Branch: refs/heads/master
Commit: 7f96008c8b6f3ce5ac38cd168bd12252799973e3
Parents: ffad656
Author: yukon <yu...@apache.org>
Authored: Fri Aug 11 20:28:13 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Fri Aug 11 20:28:13 2017 +0800
----------------------------------------------------------------------
.../rocketmq/broker/BrokerController.java | 11 +-
.../apache/rocketmq/broker/BrokerStartup.java | 8 +-
.../client/rebalance/RebalanceLockManager.java | 44 ++--
.../broker/filtersrv/FilterServerManager.java | 2 -
.../processor/AbstractSendMessageProcessor.java | 8 +-
.../broker/processor/AdminBrokerProcessor.java | 49 ++--
.../broker/processor/PullMessageProcessor.java | 16 +-
.../broker/processor/SendMessageProcessor.java | 28 +--
.../rocketmq/broker/BrokerControllerTest.java | 8 +-
.../consumer/store/LocalFileOffsetStore.java | 10 +-
.../consumer/store/RemoteBrokerOffsetStore.java | 4 +-
.../rocketmq/client/impl/MQAdminImpl.java | 2 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 247 +++++++++----------
.../ConsumeMessageConcurrentlyService.java | 66 +++--
.../consumer/ConsumeMessageOrderlyService.java | 86 +++----
.../impl/consumer/ConsumeMessageService.java | 8 +-
.../consumer/DefaultMQPullConsumerImpl.java | 104 ++++----
.../consumer/DefaultMQPushConsumerImpl.java | 62 ++---
.../client/impl/consumer/RebalanceImpl.java | 22 +-
.../client/impl/consumer/RebalancePushImpl.java | 4 +-
.../client/impl/factory/MQClientInstance.java | 27 +-
.../impl/producer/DefaultMQProducerImpl.java | 92 +++----
.../client/impl/producer/MQProducerInner.java | 6 +-
.../org/apache/rocketmq/common/TopicConfig.java | 10 -
.../org/apache/rocketmq/common/help/FAQUrl.java | 26 +-
.../common/message/MessageClientIDSetter.java | 2 +-
.../protocol/body/ConsumerRunningInfo.java | 94 +++----
.../header/GetConsumeStatsRequestHeader.java | 2 -
.../header/GetConsumerStatusRequestHeader.java | 1 -
.../GetEarliestMsgStoretimeRequestHeader.java | 1 -
.../header/QueryCorrectionOffsetHeader.java | 2 +-
.../header/SearchOffsetRequestHeader.java | 2 +-
.../header/UnregisterClientRequestHeader.java | 2 +-
.../header/UnregisterClientResponseHeader.java | 2 +-
.../namesrv/GetRouteInfoResponseHeader.java | 33 ---
.../RegisterOrderTopicRequestHeader.java | 2 +-
.../rocketmq/common/sysflag/TopicSysFlag.java | 4 -
.../rocketmq/common/utils/IOTinyUtils.java | 2 -
.../rocketmq/example/simple/PushConsumer.java | 2 -
.../src/main/resources/MessageFilterImpl.java | 2 +-
.../rocketmq/filter/parser/SelectorParser.java | 1 -
.../rocketmq/filter/parser/SelectorParser.jj | 1 -
.../namesrv/kvconfig/KVConfigManager.java | 6 +-
.../namesrv/routeinfo/RouteInfoManager.java | 4 +-
.../rocketmq/remoting/netty/NettyDecoder.java | 2 +-
.../remoting/netty/NettyRemotingAbstract.java | 24 +-
.../remoting/netty/NettyRemotingClient.java | 8 +-
.../remoting/netty/NettyServerConfig.java | 4 -
.../remoting/netty/NettySystemConfig.java | 18 +-
.../org/apache/rocketmq/store/CommitLog.java | 101 +++-----
.../rocketmq/store/DefaultMessageStore.java | 54 ++--
.../apache/rocketmq/store/DispatchRequest.java | 18 --
.../org/apache/rocketmq/store/MappedFile.java | 2 -
.../apache/rocketmq/store/MappedFileQueue.java | 1 -
.../apache/rocketmq/store/ha/HAConnection.java | 8 -
.../org/apache/rocketmq/store/ha/HAService.java | 18 --
.../apache/rocketmq/store/index/IndexFile.java | 1 -
.../rocketmq/store/index/IndexHeader.java | 4 -
.../store/schedule/ScheduleMessageService.java | 3 -
.../rocketmq/store/stats/BrokerStats.java | 2 -
.../tools/admin/DefaultMQAdminExtImpl.java | 10 +-
.../command/message/PrintMessageSubCommand.java | 5 +-
62 files changed, 617 insertions(+), 781 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index c8624c4..cd68552 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -135,11 +135,11 @@ public class BrokerController {
private BrokerFastFailure brokerFastFailure;
private Configuration configuration;
- public BrokerController(//
- final BrokerConfig brokerConfig, //
- final NettyServerConfig nettyServerConfig, //
- final NettyClientConfig nettyClientConfig, //
- final MessageStoreConfig messageStoreConfig //
+ public BrokerController(
+ final BrokerConfig brokerConfig,
+ final NettyServerConfig nettyServerConfig,
+ final NettyClientConfig nettyClientConfig,
+ final MessageStoreConfig messageStoreConfig
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
@@ -255,7 +255,6 @@ public class BrokerController {
this.registerProcessor();
- // TODO remove in future
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 85d2e3a..e0a3b69 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -190,10 +190,10 @@ public class BrokerStartup {
MixAll.printObjectProperties(log, nettyClientConfig);
MixAll.printObjectProperties(log, messageStoreConfig);
- final BrokerController controller = new BrokerController(//
- brokerConfig, //
- nettyServerConfig, //
- nettyClientConfig, //
+ final BrokerController controller = new BrokerController(
+ brokerConfig,
+ nettyServerConfig,
+ nettyClientConfig,
messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
index ed5a875..519745e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -52,9 +52,9 @@ public class RebalanceLockManager {
lockEntry = new LockEntry();
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
- log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
- group, //
- clientId, //
+ log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
+ group,
+ clientId,
mq);
}
@@ -69,19 +69,19 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
- "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
+ "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
+ group,
+ oldClientId,
+ clientId,
mq);
return true;
}
log.warn(
- "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
+ "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
+ group,
+ oldClientId,
+ clientId,
mq);
return false;
} finally {
@@ -144,9 +144,9 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
groupValue.put(mq, lockEntry);
log.info(
- "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
- group, //
- clientId, //
+ "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
+ group,
+ clientId,
mq);
}
@@ -162,20 +162,20 @@ public class RebalanceLockManager {
lockEntry.setClientId(clientId);
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
log.warn(
- "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
+ "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
+ group,
+ oldClientId,
+ clientId,
mq);
lockedMqs.add(mq);
continue;
}
log.warn(
- "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
+ "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
+ group,
+ oldClientId,
+ clientId,
mq);
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
index 52cb919..ff63127 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -111,9 +111,7 @@ public class FilterServerManager {
}
}
- /**
- */
public void scanNotActiveChannel() {
Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 3faa7ae..410192f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -189,10 +189,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
}
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
- requestHeader.getTopic(), //
- requestHeader.getDefaultTopic(), //
- RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
+ topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
+ requestHeader.getTopic(),
+ requestHeader.getDefaultTopic(),
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
if (null == topicConfig) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 71fdda9..937f575 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -116,6 +116,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class AdminBrokerProcessor implements NettyRequestProcessor {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -432,9 +433,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
- Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(//
- requestBody.getConsumerGroup(), //
- requestBody.getMqSet(), //
+ Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
+ requestBody.getConsumerGroup(),
+ requestBody.getMqSet(),
requestBody.getClientId());
LockBatchResponseBody responseBody = new LockBatchResponseBody();
@@ -450,9 +451,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
- this.brokerController.getRebalanceLockManager().unlockBatch(//
- requestBody.getConsumerGroup(), //
- requestBody.getMqSet(), //
+ this.brokerController.getRebalanceLockManager().unlockBatch(
+ requestBody.getConsumerGroup(),
+ requestBody.getMqSet(),
requestBody.getClientId());
response.setCode(ResponseCode.SUCCESS);
@@ -657,14 +658,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
continue;
}
- /**
- */
{
SubscriptionData findSubscriptionData =
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
- if (null == findSubscriptionData //
+ if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
continue;
@@ -683,9 +682,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (brokerOffset < 0)
brokerOffset = 0;
- long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
- requestHeader.getConsumerGroup(), //
- topic, //
+ long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
+ requestHeader.getConsumerGroup(),
+ topic,
i);
if (consumerOffset < 0)
consumerOffset = 0;
@@ -925,9 +924,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return response;
}
- /**
- */
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
final GetConsumerRunningInfoRequestHeader requestHeader =
(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
@@ -1007,9 +1004,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
continue;
}
- /**
- */
if (!requestHeader.isOffline()) {
SubscriptionData findSubscriptionData =
@@ -1107,13 +1102,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (isOrder && !topicConfig.isOrder()) {
continue;
}
- /**
- */
{
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
- if (null == findSubscriptionData //
+ if (null == findSubscriptionData
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);
continue;
@@ -1129,9 +1122,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
if (brokerOffset < 0)
brokerOffset = 0;
- long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
- group, //
- topic, //
+ long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
+ group,
+ topic,
i);
if (consumerOffset < 0)
consumerOffset = 0;
@@ -1215,10 +1208,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
return runtimeInfo;
}
- private RemotingCommand callConsumer(//
- final int requestCode, //
- final RemotingCommand request, //
- final String consumerGroup, //
+ private RemotingCommand callConsumer(
+ final int requestCode,
+ final RemotingCommand request,
+ final String consumerGroup,
final String clientId) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
@@ -1231,8 +1224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
- clientId, //
+ response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",
+ clientId,
MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
return response;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index fb7ea20..fe2fcfe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -160,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
assert consumerFilterData != null;
}
} catch (Exception e) {
- log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
+ log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark("parse the consumer's subscription failed");
@@ -176,7 +176,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return response;
}
- if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
+ if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
@@ -285,12 +285,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
// XXX: warn and notify me
- log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
- requestHeader.getQueueOffset(), //
- getMessageResult.getNextBeginOffset(), //
- requestHeader.getTopic(), //
- requestHeader.getQueueId(), //
- requestHeader.getConsumerGroup()//
+ log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
+ requestHeader.getQueueOffset(),
+ getMessageResult.getNextBeginOffset(),
+ requestHeader.getTopic(),
+ requestHeader.getQueueId(),
+ requestHeader.getConsumerGroup()
);
} else {
response.setCode(ResponseCode.PULL_NOT_FOUND);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 5c716cc..cd60c44 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -139,9 +139,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
- TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
- newTopic, //
- subscriptionGroupConfig.getRetryQueueNums(), //
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
+ newTopic,
+ subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -175,13 +175,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
- if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
+ if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
- DLQ_NUMS_PER_GROUP, //
+ topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
+ DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
@@ -268,8 +268,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
- topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
- DLQ_NUMS_PER_GROUP, //
+ topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
+ DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
msg.setTopic(newTopic);
@@ -289,9 +289,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return true;
}
- private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
- final RemotingCommand request, //
- final SendMessageContext sendMessageContext, //
+ private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
+ final RemotingCommand request,
+ final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
@@ -464,9 +464,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
return response;
}
- private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
- final RemotingCommand request, //
- final SendMessageContext sendMessageContext, //
+ private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
+ final RemotingCommand request,
+ final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index fe30d8f..d4edd9a 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -38,10 +38,10 @@ public class BrokerControllerTest {
@Test
public void testBrokerRestart() throws Exception {
for (int i = 0; i < 2; i++) {
- BrokerController brokerController = new BrokerController(//
- new BrokerConfig(), //
- new NettyServerConfig(), //
- new NettyClientConfig(), //
+ BrokerController brokerController = new BrokerController(
+ new BrokerConfig(),
+ new NettyServerConfig(),
+ new NettyClientConfig(),
new MessageStoreConfig());
assertThat(brokerController.initialize());
brokerController.start();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index d4b19b2..22ec674 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -52,9 +52,9 @@ public class LocalFileOffsetStore implements OffsetStore {
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
- this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + //
- this.mQClientFactory.getClientId() + File.separator + //
- this.groupName + File.separator + //
+ this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
+ this.mQClientFactory.getClientId() + File.separator +
+ this.groupName + File.separator +
"offsets.json";
}
@@ -217,8 +217,8 @@ public class LocalFileOffsetStore implements OffsetStore {
OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
} catch (Exception e) {
log.warn("readLocalOffset Exception", e);
- throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" //
- + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), //
+ throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low"
+ + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION),
e);
}
return offsetSerializeWrapper;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 5bd5749..b82e992 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -204,7 +204,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
MQBrokerException, InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
- // TODO Here may be heavily overhead for Name Server,need tuning
+
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
@@ -232,7 +232,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
- // TODO Here may be heavily overhead for Name Server,need tuning
+
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 983e515..92d8513 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -159,7 +159,7 @@ public class MQAdminImpl {
}
} catch (Exception e) {
throw new MQClientException(
- "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
+ "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
e);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index ae9ed6c..c5abc36 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -285,32 +285,32 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public SendResult sendMessage(//
- final String addr, // 1
- final String brokerName, // 2
- final Message msg, // 3
- final SendMessageRequestHeader requestHeader, // 4
- final long timeoutMillis, // 5
- final CommunicationMode communicationMode, // 6
- final SendMessageContext context, // 7
- final DefaultMQProducerImpl producer // 8
+ public SendResult sendMessage(
+ final String addr,
+ final String brokerName,
+ final Message msg,
+ final SendMessageRequestHeader requestHeader,
+ final long timeoutMillis,
+ final CommunicationMode communicationMode,
+ final SendMessageContext context,
+ final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
}
- public SendResult sendMessage(//
- final String addr, // 1
- final String brokerName, // 2
- final Message msg, // 3
- final SendMessageRequestHeader requestHeader, // 4
- final long timeoutMillis, // 5
- final CommunicationMode communicationMode, // 6
- final SendCallback sendCallback, // 7
- final TopicPublishInfo topicPublishInfo, // 8
- final MQClientInstance instance, // 9
- final int retryTimesWhenSendFailed, // 10
- final SendMessageContext context, // 11
- final DefaultMQProducerImpl producer // 12
+ public SendResult sendMessage(
+ final String addr,
+ final String brokerName,
+ final Message msg,
+ final SendMessageRequestHeader requestHeader,
+ final long timeoutMillis,
+ final CommunicationMode communicationMode,
+ final SendCallback sendCallback,
+ final TopicPublishInfo topicPublishInfo,
+ final MQClientInstance instance,
+ final int retryTimesWhenSendFailed,
+ final SendMessageContext context,
+ final DefaultMQProducerImpl producer
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = null;
if (sendSmartMsg || msg instanceof MessageBatch) {
@@ -341,31 +341,31 @@ public class MQClientAPIImpl {
return null;
}
- private SendResult sendMessageSync(//
- final String addr, //
- final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request//
+ private SendResult sendMessageSync(
+ final String addr,
+ final String brokerName,
+ final Message msg,
+ final long timeoutMillis,
+ final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
- private void sendMessageAsync(//
- final String addr, //
- final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
- final MQClientInstance instance, //
- final int retryTimesWhenSendFailed, //
- final AtomicInteger times, //
- final SendMessageContext context, //
- final DefaultMQProducerImpl producer //
+ private void sendMessageAsync(
+ final String addr,
+ final String brokerName,
+ final Message msg,
+ final long timeoutMillis,
+ final RemotingCommand request,
+ final SendCallback sendCallback,
+ final TopicPublishInfo topicPublishInfo,
+ final MQClientInstance instance,
+ final int retryTimesWhenSendFailed,
+ final AtomicInteger times,
+ final SendMessageContext context,
+ final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
@@ -380,7 +380,6 @@ public class MQClientAPIImpl {
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
- //
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
@@ -428,19 +427,19 @@ public class MQClientAPIImpl {
});
}
- private void onExceptionImpl(final String brokerName, //
- final Message msg, //
- final long timeoutMillis, //
- final RemotingCommand request, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
- final MQClientInstance instance, //
- final int timesTotal, //
- final AtomicInteger curTimes, //
- final Exception e, //
- final SendMessageContext context, //
- final boolean needRetry, //
- final DefaultMQProducerImpl producer // 12
+ private void onExceptionImpl(final String brokerName,
+ final Message msg,
+ final long timeoutMillis,
+ final RemotingCommand request,
+ final SendCallback sendCallback,
+ final TopicPublishInfo topicPublishInfo,
+ final MQClientInstance instance,
+ final int timesTotal,
+ final AtomicInteger curTimes,
+ final Exception e,
+ final SendMessageContext context,
+ final boolean needRetry,
+ final DefaultMQProducerImpl producer
) {
int tmp = curTimes.incrementAndGet();
if (needRetry && tmp <= timesTotal) {
@@ -485,16 +484,15 @@ public class MQClientAPIImpl {
}
}
- private SendResult processSendResponse(//
- final String brokerName, //
- final Message msg, //
- final RemotingCommand response//
+ private SendResult processSendResponse(
+ final String brokerName,
+ final Message msg,
+ final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
case ResponseCode.FLUSH_SLAVE_TIMEOUT:
case ResponseCode.SLAVE_NOT_AVAILABLE: {
- // TODO LOG
}
case ResponseCode.SUCCESS: {
SendStatus sendStatus = SendStatus.SEND_OK;
@@ -553,12 +551,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public PullResult pullMessage(//
- final String addr, //
- final PullMessageRequestHeader requestHeader, //
- final long timeoutMillis, //
- final CommunicationMode communicationMode, //
- final PullCallback pullCallback//
+ public PullResult pullMessage(
+ final String addr,
+ final PullMessageRequestHeader requestHeader,
+ final long timeoutMillis,
+ final CommunicationMode communicationMode,
+ final PullCallback pullCallback
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
@@ -579,11 +577,11 @@ public class MQClientAPIImpl {
return null;
}
- private void pullMessageAsync(//
- final String addr, // 1
- final RemotingCommand request, //
- final long timeoutMillis, //
- final PullCallback pullCallback//
+ private void pullMessageAsync(
+ final String addr,
+ final RemotingCommand request,
+ final long timeoutMillis,
+ final PullCallback pullCallback
) throws RemotingException, InterruptedException {
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
@@ -611,10 +609,10 @@ public class MQClientAPIImpl {
});
}
- private PullResult pullMessageSync(//
- final String addr, // 1
- final RemotingCommand request, // 2
- final long timeoutMillis// 3
+ private PullResult pullMessageSync(
+ final String addr,
+ final RemotingCommand request,
+ final long timeoutMillis
) throws RemotingException, InterruptedException, MQBrokerException {
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
@@ -720,9 +718,9 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public List<String> getConsumerIdListByGroup(//
- final String addr, //
- final String consumerGroup, //
+ public List<String> getConsumerIdListByGroup(
+ final String addr,
+ final String consumerGroup,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
MQBrokerException, InterruptedException {
GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
@@ -796,10 +794,10 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public long queryConsumerOffset(//
- final String addr, //
- final QueryConsumerOffsetRequestHeader requestHeader, //
- final long timeoutMillis//
+ public long queryConsumerOffset(
+ final String addr,
+ final QueryConsumerOffsetRequestHeader requestHeader,
+ final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
@@ -820,10 +818,10 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public void updateConsumerOffset(//
- final String addr, //
- final UpdateConsumerOffsetRequestHeader requestHeader, //
- final long timeoutMillis//
+ public void updateConsumerOffset(
+ final String addr,
+ final UpdateConsumerOffsetRequestHeader requestHeader,
+ final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
@@ -841,10 +839,10 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public void updateConsumerOffsetOneway(//
- final String addr, //
- final UpdateConsumerOffsetRequestHeader requestHeader, //
- final long timeoutMillis//
+ public void updateConsumerOffsetOneway(
+ final String addr,
+ final UpdateConsumerOffsetRequestHeader requestHeader,
+ final long timeoutMillis
) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
@@ -852,10 +850,10 @@ public class MQClientAPIImpl {
this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}
- public int sendHearbeat(//
- final String addr, //
- final HeartbeatData heartbeatData, //
- final long timeoutMillis//
+ public int sendHearbeat(
+ final String addr,
+ final HeartbeatData heartbeatData,
+ final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
@@ -873,12 +871,12 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public void unregisterClient(//
- final String addr, //
- final String clientID, //
- final String producerGroup, //
- final String consumerGroup, //
- final long timeoutMillis//
+ public void unregisterClient(
+ final String addr,
+ final String clientID,
+ final String producerGroup,
+ final String consumerGroup,
+ final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
requestHeader.setClientID(clientID);
@@ -899,11 +897,11 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public void endTransactionOneway(//
- final String addr, //
- final EndTransactionRequestHeader requestHeader, //
- final String remark, //
- final long timeoutMillis//
+ public void endTransactionOneway(
+ final String addr,
+ final EndTransactionRequestHeader requestHeader,
+ final String remark,
+ final long timeoutMillis
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
@@ -965,9 +963,9 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public Set<MessageQueue> lockBatchMQ(//
- final String addr, //
- final LockBatchRequestBody requestBody, //
+ public Set<MessageQueue> lockBatchMQ(
+ final String addr,
+ final LockBatchRequestBody requestBody,
final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
@@ -987,11 +985,11 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
- public void unlockBatchMQ(//
- final String addr, //
- final UnlockBatchRequestBody requestBody, //
- final long timeoutMillis, //
- final boolean oneway//
+ public void unlockBatchMQ(
+ final String addr,
+ final UnlockBatchRequestBody requestBody,
+ final long timeoutMillis,
+ final boolean oneway
) throws RemotingException, MQBrokerException, InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
@@ -1213,7 +1211,7 @@ public class MQClientAPIImpl {
if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
}
- // TODO :- Log when if condition is not satisfied
+
break;
}
case ResponseCode.SUCCESS: {
@@ -1566,12 +1564,12 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public void registerMessageFilterClass(final String addr, //
- final String consumerGroup, //
- final String topic, //
- final String className, //
- final int classCRC, //
- final byte[] classBody, //
+ public void registerMessageFilterClass(final String addr,
+ final String consumerGroup,
+ final String topic,
+ final String className,
+ final int classCRC,
+ final byte[] classBody,
final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
@@ -1706,10 +1704,10 @@ public class MQClientAPIImpl {
throw new MQClientException(response.getCode(), response.getRemark());
}
- public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, //
- String consumerGroup, //
- String clientId, //
- String msgId, //
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr,
+ String consumerGroup,
+ String clientId,
+ String msgId,
final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
@@ -1912,7 +1910,6 @@ public class MQClientAPIImpl {
public Set<String> getClusterList(String topic,
long timeoutMillis) throws MQClientException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
- // todo:jodie
return Collections.EMPTY_SET;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index f566ed0..961e062 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -69,12 +69,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
- this.consumeExecutor = new ThreadPoolExecutor(//
- this.defaultMQPushConsumer.getConsumeThreadMin(), //
- this.defaultMQPushConsumer.getConsumeThreadMax(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.consumeRequestQueue, //
+ this.consumeExecutor = new ThreadPoolExecutor(
+ this.defaultMQPushConsumer.getConsumeThreadMin(),
+ this.defaultMQPushConsumer.getConsumeThreadMax(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
@@ -100,8 +100,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void updateCorePoolSize(int corePoolSize) {
- if (corePoolSize > 0 //
- && corePoolSize <= Short.MAX_VALUE //
+ if (corePoolSize > 0
+ && corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
@@ -115,11 +115,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
// this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
// + 1);
// }
- //
// log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}", //
- // corePoolSize,//
- // this.consumeExecutor.getCorePoolSize(),//
+ // {}",
+ // corePoolSize,
+ // this.consumeExecutor.getCorePoolSize(),
// this.consumerGroup);
}
@@ -131,11 +130,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
// this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
// - 1);
// }
- //
// log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}", //
- // corePoolSize,//
- // this.consumeExecutor.getCorePoolSize(),//
+ // {}",
+ // corePoolSize,
+ // this.consumeExecutor.getCorePoolSize(),
// this.consumerGroup);
}
@@ -185,10 +183,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
- log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageConcurrentlyService.this.consumerGroup, //
- msgs, //
+ log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+ RemotingHelper.exceptionSimpleDesc(e),
+ ConsumeMessageConcurrentlyService.this.consumerGroup,
+ msgs,
mq), e);
}
@@ -200,10 +198,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
@Override
- public void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
+ public void submitConsumeRequest(
+ final List<MessageExt> msgs,
+ final ProcessQueue processQueue,
+ final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
@@ -258,10 +256,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
}
- public void processConsumeResult(//
- final ConsumeConcurrentlyStatus status, //
- final ConsumeConcurrentlyContext context, //
- final ConsumeRequest consumeRequest//
+ public void processConsumeResult(
+ final ConsumeConcurrentlyStatus status,
+ final ConsumeConcurrentlyContext context,
+ final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
@@ -338,10 +336,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
return false;
}
- private void submitConsumeRequestLater(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue//
+ private void submitConsumeRequestLater(
+ final List<MessageExt> msgs,
+ final ProcessQueue processQueue,
+ final MessageQueue messageQueue
) {
this.scheduledExecutorService.schedule(new Runnable() {
@@ -353,7 +351,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}, 5000, TimeUnit.MILLISECONDS);
}
- private void submitConsumeRequestLater(final ConsumeRequest consumeRequest//
+ private void submitConsumeRequestLater(final ConsumeRequest consumeRequest
) {
this.scheduledExecutorService.schedule(new Runnable() {
@@ -419,7 +417,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
- RemotingHelper.exceptionSimpleDesc(e), //
+ RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 1fa474c..abdad79 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -70,12 +70,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
- this.consumeExecutor = new ThreadPoolExecutor(//
- this.defaultMQPushConsumer.getConsumeThreadMin(), //
- this.defaultMQPushConsumer.getConsumeThreadMax(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
- this.consumeRequestQueue, //
+ this.consumeExecutor = new ThreadPoolExecutor(
+ this.defaultMQPushConsumer.getConsumeThreadMin(),
+ this.defaultMQPushConsumer.getConsumeThreadMax(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
@@ -107,8 +107,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
@Override
public void updateCorePoolSize(int corePoolSize) {
- if (corePoolSize > 0 //
- && corePoolSize <= Short.MAX_VALUE //
+ if (corePoolSize > 0
+ && corePoolSize <= Short.MAX_VALUE
&& corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
this.consumeExecutor.setCorePoolSize(corePoolSize);
}
@@ -171,10 +171,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
- log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
+ log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+ RemotingHelper.exceptionSimpleDesc(e),
+ ConsumeMessageOrderlyService.this.consumerGroup,
+ msgs,
mq), e);
}
@@ -187,10 +187,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}
@Override
- public void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
+ public void submitConsumeRequest(
+ final List<MessageExt> msgs,
+ final ProcessQueue processQueue,
+ final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
@@ -226,10 +226,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
return false;
}
- private void submitConsumeRequestLater(//
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
- final long suspendTimeMillis//
+ private void submitConsumeRequestLater(
+ final ProcessQueue processQueue,
+ final MessageQueue messageQueue,
+ final long suspendTimeMillis
) {
long timeMillis = suspendTimeMillis;
if (timeMillis == -1) {
@@ -251,11 +251,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
}, timeMillis, TimeUnit.MILLISECONDS);
}
- public boolean processConsumeResult(//
- final List<MessageExt> msgs, //
- final ConsumeOrderlyStatus status, //
- final ConsumeOrderlyContext context, //
- final ConsumeRequest consumeRequest//
+ public boolean processConsumeResult(
+ final List<MessageExt> msgs,
+ final ConsumeOrderlyStatus status,
+ final ConsumeOrderlyContext context,
+ final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
@@ -273,9 +273,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
- this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
+ this.submitConsumeRequestLater(
+ consumeRequest.getProcessQueue(),
+ consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
@@ -295,9 +295,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
- this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
+ this.submitConsumeRequestLater(
+ consumeRequest.getProcessQueue(),
+ consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
@@ -305,9 +305,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
- this.submitConsumeRequestLater(//
- consumeRequest.getProcessQueue(), //
- consumeRequest.getMessageQueue(), //
+ this.submitConsumeRequestLater(
+ consumeRequest.getProcessQueue(),
+ consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
@@ -468,22 +468,22 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
- log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
- RemotingHelper.exceptionSimpleDesc(e), //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
+ log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
+ RemotingHelper.exceptionSimpleDesc(e),
+ ConsumeMessageOrderlyService.this.consumerGroup,
+ msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
- if (null == status //
- || ConsumeOrderlyStatus.ROLLBACK == status//
+ if (null == status
+ || ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
- log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
- ConsumeMessageOrderlyService.this.consumerGroup, //
- msgs, //
+ log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
+ ConsumeMessageOrderlyService.this.consumerGroup,
+ msgs,
messageQueue);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 8742191..0f6f3bb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -36,9 +36,9 @@ public interface ConsumeMessageService {
ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
- void submitConsumeRequest(//
- final List<MessageExt> msgs, //
- final ProcessQueue processQueue, //
- final MessageQueue messageQueue, //
+ void submitConsumeRequest(
+ final List<MessageExt> msgs,
+ final ProcessQueue processQueue,
+ final MessageQueue messageQueue,
final boolean dispathToConsume);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 35ee16f..8640d2d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -97,8 +97,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The consumer service state not OK, "//
- + this.serviceState//
+ throw new MQClientException("The consumer service state not OK, "
+ + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
@@ -185,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
SubscriptionData subscriptionData;
try {
- subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+ subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
@@ -193,18 +193,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
- PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
- mq, // 1
- subscriptionData.getSubString(), // 2
- 0L, // 3
- offset, // 4
- maxNums, // 5
- sysFlag, // 6
- 0, // 7
- this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
- timeoutMillis, // 9
- CommunicationMode.SYNC, // 10
- null// 11
+ PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
+ mq,
+ subscriptionData.getSubString(),
+ 0L,
+ offset,
+ maxNums,
+ sysFlag,
+ 0,
+ this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
+ timeoutMillis,
+ CommunicationMode.SYNC,
+ null
);
this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
if (!this.consumeMessageHookList.isEmpty()) {
@@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void subscriptionAutomatically(final String topic) {
if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
try {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
} catch (Exception ignore) {
@@ -372,13 +372,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
}
- private void pullAsyncImpl(//
- final MessageQueue mq, //
- final String subExpression, //
- final long offset, //
- final int maxNums, //
- final PullCallback pullCallback, //
- final boolean block, //
+ private void pullAsyncImpl(
+ final MessageQueue mq,
+ final String subExpression,
+ final long offset,
+ final int maxNums,
+ final PullCallback pullCallback,
+ final boolean block,
final long timeout) throws MQClientException, RemotingException, InterruptedException {
this.makeSureStateOK();
@@ -405,7 +405,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
final SubscriptionData subscriptionData;
try {
- subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+ subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
@@ -413,17 +413,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
- this.pullAPIWrapper.pullKernelImpl(//
- mq, // 1
- subscriptionData.getSubString(), // 2
- 0L, // 3
- offset, // 4
- maxNums, // 5
- sysFlag, // 6
- 0, // 7
- this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
- timeoutMillis, // 9
- CommunicationMode.ASYNC, // 10
+ this.pullAPIWrapper.pullKernelImpl(
+ mq,
+ subscriptionData.getSubString(),
+ 0L,
+ offset,
+ maxNums,
+ sysFlag,
+ 0,
+ this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
+ timeoutMillis,
+ CommunicationMode.ASYNC,
new PullCallback() {
@Override
@@ -551,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
- this.pullAPIWrapper = new PullAPIWrapper(//
- mQClientFactory, //
+ this.pullAPIWrapper = new PullAPIWrapper(
+ mQClientFactory,
this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
@@ -589,8 +589,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
- throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
- + this.serviceState//
+ throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+ + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
@@ -606,42 +606,42 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
// consumerGroup
if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
throw new MQClientException(
- "consumerGroup is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ "consumerGroup is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// consumerGroup
if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
throw new MQClientException(
- "consumerGroup can not equal "//
- + MixAll.DEFAULT_CONSUMER_GROUP //
- + ", please specify another one."//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ "consumerGroup can not equal "
+ + MixAll.DEFAULT_CONSUMER_GROUP
+ + ", please specify another one."
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// messageModel
if (null == this.defaultMQPullConsumer.getMessageModel()) {
throw new MQClientException(
- "messageModel is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ "messageModel is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// allocateMessageQueueStrategy
if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
throw new MQClientException(
- "allocateMessageQueueStrategy is null" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ "allocateMessageQueueStrategy is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
// allocateMessageQueueStrategy
if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
throw new MQClientException(
- "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+ "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
null);
}
}
@@ -651,7 +651,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
if (registerTopics != null) {
for (final String topic : registerTopics) {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
topic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}