You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:41 UTC
[34/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4241c0e..c22c515 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -16,6 +16,17 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -44,7 +55,11 @@ import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
@@ -60,11 +75,6 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
/**
* Delay some time when exception occur
@@ -98,7 +108,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private long flowControlTimes1 = 0;
private long flowControlTimes2 = 0;
-
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
@@ -214,8 +223,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes1++ % 1000) == 0) {
log.warn(
- "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+ "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
+ processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
}
return;
}
@@ -225,9 +234,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((flowControlTimes2++ % 1000) == 0) {
log.warn(
- "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
- pullRequest, flowControlTimes2);
+ "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
+ processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
+ pullRequest, flowControlTimes2);
}
return;
}
@@ -237,10 +246,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
- pullRequest, offset, brokerBusy);
+ pullRequest, offset, brokerBusy);
if (brokerBusy) {
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
- pullRequest, offset);
+ pullRequest, offset);
}
pullRequest.setLockedFirst(true);
@@ -267,7 +276,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
- subscriptionData);
+ subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
@@ -275,7 +284,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullRT);
+ pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
@@ -284,30 +293,30 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
- pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
+ pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
- pullResult.getMsgFoundList(), //
- processQueue, //
- pullRequest.getMessageQueue(), //
- dispathToConsume);
+ pullResult.getMsgFoundList(), //
+ processQueue, //
+ pullRequest.getMessageQueue(), //
+ dispathToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
- DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+ DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset//
- || firstMsgOffset < prevRequestOffset) {
+ || firstMsgOffset < prevRequestOffset) {
log.warn(
- "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
- pullResult.getNextBeginOffset(), //
- firstMsgOffset, //
- prevRequestOffset);
+ "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
+ pullResult.getNextBeginOffset(), //
+ firstMsgOffset, //
+ prevRequestOffset);
}
break;
@@ -327,7 +336,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}", //
- pullRequest.toString(), pullResult.toString());
+ pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
@@ -337,7 +346,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
- pullRequest.getNextOffset(), false);
+ pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
@@ -356,7 +365,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
-
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
@@ -388,24 +396,24 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
int sysFlag = PullSysFlag.buildSysFlag(//
- commitOffsetEnable, // commitOffset
- true, // suspend
- subExpression != null, // subscription
- classFilter // class filter
+ commitOffsetEnable, // commitOffset
+ true, // suspend
+ subExpression != null, // subscription
+ classFilter // class filter
);
try {
this.pullAPIWrapper.pullKernelImpl(//
- pullRequest.getMessageQueue(), // 1
- subExpression, // 2
- subscriptionData.getSubVersion(), // 3
- pullRequest.getNextOffset(), // 4
- this.defaultMQPushConsumer.getPullBatchSize(), // 5
- sysFlag, // 6
- commitOffsetValue, // 7
- BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
- CommunicationMode.ASYNC, // 10
- pullCallback// 11
+ pullRequest.getMessageQueue(), // 1
+ subExpression, // 2
+ subscriptionData.getSubVersion(), // 3
+ pullRequest.getNextOffset(), // 4
+ this.defaultMQPushConsumer.getPullBatchSize(), // 5
+ sysFlag, // 6
+ commitOffsetValue, // 7
+ BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
+ CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
+ CommunicationMode.ASYNC, // 10
+ pullCallback// 11
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
@@ -416,9 +424,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The consumer service state not OK, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
}
}
@@ -453,16 +461,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
- throws MQClientException, InterruptedException {
+ throws MQClientException, InterruptedException {
return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
}
public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException,
- InterruptedException {
+ InterruptedException {
return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
}
-
public void registerMessageListener(MessageListener messageListener) {
this.messageListenerInner = messageListener;
}
@@ -474,12 +481,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
- : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+ : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
- this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
+ this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
@@ -532,7 +539,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
switch (this.serviceState) {
case CREATE_JUST:
log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
- this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
+ this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
@@ -551,8 +558,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
- mQClientFactory,
- this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
+ mQClientFactory,
+ this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
@@ -574,11 +581,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
- new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
+ new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
- new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
+ new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
}
this.consumeMessageService.start();
@@ -588,8 +595,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
- + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
- null);
+ + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+ null);
}
mQClientFactory.start();
@@ -600,9 +607,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
- + this.serviceState//
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
+ + this.serviceState//
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+ null);
default:
break;
}
@@ -619,133 +626,133 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
throw new MQClientException(
- "consumerGroup is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumerGroup is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
throw new MQClientException(
- "consumerGroup can not equal "
- + MixAll.DEFAULT_CONSUMER_GROUP
- + ", please specify another one."
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumerGroup can not equal "
+ + MixAll.DEFAULT_CONSUMER_GROUP
+ + ", please specify another one."
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
if (null == this.defaultMQPushConsumer.getMessageModel()) {
throw new MQClientException(
- "messageModel is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "messageModel is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
throw new MQClientException(
- "consumeFromWhere is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumeFromWhere is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS);
if (null == dt) {
throw new MQClientException(
- "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// allocateMessageQueueStrategy
if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
throw new MQClientException(
- "allocateMessageQueueStrategy is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "allocateMessageQueueStrategy is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// subscription
if (null == this.defaultMQPushConsumer.getSubscription()) {
throw new MQClientException(
- "subscription is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "subscription is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// messageListener
if (null == this.defaultMQPushConsumer.getMessageListener()) {
throw new MQClientException(
- "messageListener is null"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "messageListener is null"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
if (!orderly && !concurrently) {
throw new MQClientException(
- "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// consumeThreadMin
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
- || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
- || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
+ || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
throw new MQClientException(
- "consumeThreadMin Out of range [1, 1000]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumeThreadMin Out of range [1, 1000]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// consumeThreadMax
if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
throw new MQClientException(
- "consumeThreadMax Out of range [1, 1000]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumeThreadMax Out of range [1, 1000]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// consumeConcurrentlyMaxSpan
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
- || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
+ || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
throw new MQClientException(
- "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// pullThresholdForQueue
if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
throw new MQClientException(
- "pullThresholdForQueue Out of range [1, 65535]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "pullThresholdForQueue Out of range [1, 65535]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// pullInterval
if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
throw new MQClientException(
- "pullInterval Out of range [0, 65535]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "pullInterval Out of range [0, 65535]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// consumeMessageBatchMaxSize
if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
- || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
+ || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
throw new MQClientException(
- "consumeMessageBatchMaxSize Out of range [1, 1024]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "consumeMessageBatchMaxSize Out of range [1, 1024]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
// pullBatchSize
if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
throw new MQClientException(
- "pullBatchSize Out of range [1, 1024]"
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
- null);
+ "pullBatchSize Out of range [1, 1024]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
}
}
@@ -757,7 +764,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
final String topic = entry.getKey();
final String subString = entry.getValue();
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- topic, subString);
+ topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
@@ -772,7 +779,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- retryTopic, SubscriptionData.SUB_ALL);
+ retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
@@ -804,7 +811,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- topic, subExpression);
+ topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
@@ -817,7 +824,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
try {
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
- topic, "*");
+ topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
subscriptionData.setFilterClassSource(filterClassSource);
@@ -865,7 +872,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public void resetOffsetByTimeStamp(long timeStamp)
- throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+ throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
@@ -1017,9 +1024,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long computeAccTotal = this.computeAccumulationTotal();
long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
- long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
+ long incThreshold = (long)(adjustThreadPoolNumsThreshold * 1.0);
- long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
+ long decThreshold = (long)(adjustThreadPoolNumsThreshold * 0.8);
if (computeAccTotal >= incThreshold) {
this.consumeMessageService.incCorePoolSize();
@@ -1044,7 +1051,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic)
- throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+ throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000);
for (BrokerData brokerData : routeData.getBrokerDatas()) {
@@ -1055,12 +1062,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
return queueTimeSpan;
}
-
public ConsumeMessageService getConsumeMessageService() {
return consumeMessageService;
}
-
public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
this.consumeMessageService = consumeMessageService;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
index b1a2a25..ce4f2b9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -6,16 +6,17 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.Set;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
@@ -23,9 +24,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import java.util.Set;
-
-
/**
* Consumer inner interface
*
@@ -33,33 +31,23 @@ import java.util.Set;
public interface MQConsumerInner {
String groupName();
-
MessageModel messageModel();
-
ConsumeType consumeType();
-
ConsumeFromWhere consumeFromWhere();
-
Set<SubscriptionData> subscriptions();
-
void doRebalance();
-
void persistConsumerOffset();
-
void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);
-
boolean isSubscribeTopicNeedUpdate(final String topic);
-
boolean isUnitMode();
-
ConsumerRunningInfo consumerRunningInfo();
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
index 1e573c3..47ae2b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
@@ -6,20 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
-import org.apache.rocketmq.common.message.MessageQueue;
-
import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.common.message.MessageQueue;
/**
* Message lock,strictly ensure the single queue only one thread at a time consuming
@@ -27,8 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class MessageQueueLock {
private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
- new ConcurrentHashMap<MessageQueue, Object>();
-
+ new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index f361f1f..2d17703 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -6,24 +6,16 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
-import org.slf4j.Logger;
-
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -33,7 +25,13 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.slf4j.Logger;
/**
* Queue consumption snapshot
@@ -41,7 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class ProcessQueue {
public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
- Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
+ Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
private final Logger log = ClientLogger.getLog();
@@ -65,7 +63,6 @@ public class ProcessQueue {
return result;
}
-
public boolean isPullExpired() {
boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
return result;
@@ -80,7 +77,7 @@ public class ProcessQueue {
if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
return;
}
-
+
int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
@@ -126,7 +123,6 @@ public class ProcessQueue {
}
}
-
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
@@ -167,7 +163,6 @@ public class ProcessQueue {
return dispatchToConsume;
}
-
public long getMaxSpan() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
@@ -185,7 +180,6 @@ public class ProcessQueue {
return 0;
}
-
public long removeMessage(final List<MessageExt> msgs) {
long result = -1;
final long now = System.currentTimeMillis();
@@ -218,22 +212,18 @@ public class ProcessQueue {
return result;
}
-
public TreeMap<Long, MessageExt> getMsgTreeMap() {
return msgTreeMap;
}
-
public AtomicLong getMsgCount() {
return msgCount;
}
-
public boolean isDropped() {
return dropped;
}
-
public void setDropped(boolean dropped) {
this.dropped = dropped;
}
@@ -260,7 +250,6 @@ public class ProcessQueue {
}
}
-
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
@@ -281,7 +270,6 @@ public class ProcessQueue {
return -1;
}
-
public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
@@ -298,7 +286,6 @@ public class ProcessQueue {
}
}
-
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
@@ -331,7 +318,6 @@ public class ProcessQueue {
return result;
}
-
public boolean hasTempMessage() {
try {
this.lockTreeMap.readLock().lockInterruptibly();
@@ -346,7 +332,6 @@ public class ProcessQueue {
return true;
}
-
public void clear() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
@@ -363,52 +348,42 @@ public class ProcessQueue {
}
}
-
public long getLastLockTimestamp() {
return lastLockTimestamp;
}
-
public void setLastLockTimestamp(long lastLockTimestamp) {
this.lastLockTimestamp = lastLockTimestamp;
}
-
public Lock getLockConsume() {
return lockConsume;
}
-
public long getLastPullTimestamp() {
return lastPullTimestamp;
}
-
public void setLastPullTimestamp(long lastPullTimestamp) {
this.lastPullTimestamp = lastPullTimestamp;
}
-
public long getMsgAccCnt() {
return msgAccCnt;
}
-
public void setMsgAccCnt(long msgAccCnt) {
this.msgAccCnt = msgAccCnt;
}
-
public long getTryUnlockTimes() {
return this.tryUnlockTimes.get();
}
-
public void incTryUnlockTimes() {
this.tryUnlockTimes.incrementAndGet();
}
-
public void fillProcessQueueInfo(final ProcessQueueInfo info) {
try {
this.lockTreeMap.readLock().lockInterruptibly();
@@ -438,12 +413,10 @@ public class ProcessQueue {
}
}
-
public long getLastConsumeTimestamp() {
return lastConsumeTimestamp;
}
-
public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 59c9b1c..d358175 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -28,7 +34,11 @@ import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -36,21 +46,13 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
public class PullAPIWrapper {
private final Logger log = ClientLogger.getLog();
private final MQClientInstance mQClientFactory;
private final String consumerGroup;
private final boolean unitMode;
private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
- new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
+ new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
private volatile boolean connectBrokerByUser = false;
private volatile long defaultBrokerId = MixAll.MASTER_ID;
private Random random = new Random(System.currentTimeMillis());
@@ -63,8 +65,8 @@ public class PullAPIWrapper {
}
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
- final SubscriptionData subscriptionData) {
- PullResultExt pullResultExt = (PullResultExt) pullResult;
+ final SubscriptionData subscriptionData) {
+ PullResultExt pullResultExt = (PullResultExt)pullResult;
this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
if (PullStatus.FOUND == pullResult.getPullStatus()) {
@@ -92,9 +94,9 @@ public class PullAPIWrapper {
for (MessageExt msg : msgListFilterAgain) {
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
- Long.toString(pullResult.getMinOffset()));
+ Long.toString(pullResult.getMinOffset()));
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
- Long.toString(pullResult.getMaxOffset()));
+ Long.toString(pullResult.getMaxOffset()));
}
pullResultExt.setMsgFoundList(msgListFilterAgain);
@@ -131,26 +133,26 @@ public class PullAPIWrapper {
}
public PullResult pullKernelImpl(
- final MessageQueue mq,
- final String subExpression,
- final long subVersion,
- final long offset,
- final int maxNums,
- final int sysFlag,
- final long commitOffset,
- final long brokerSuspendMaxTimeMillis,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final PullCallback pullCallback
+ final MessageQueue mq,
+ final String subExpression,
+ final long subVersion,
+ final long offset,
+ final int maxNums,
+ final int sysFlag,
+ final long commitOffset,
+ final long brokerSuspendMaxTimeMillis,
+ final long timeoutMillis,
+ final CommunicationMode communicationMode,
+ final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
FindBrokerResult findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
+ this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+ this.recalculatePullFromWhichNode(mq), false);
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
- this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
- this.recalculatePullFromWhichNode(mq), false);
+ this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+ this.recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != null) {
@@ -178,11 +180,11 @@ public class PullAPIWrapper {
}
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
- brokerAddr,
- requestHeader,
- timeoutMillis,
- communicationMode,
- pullCallback);
+ brokerAddr,
+ requestHeader,
+ timeoutMillis,
+ communicationMode,
+ pullCallback);
return pullResult;
}
@@ -204,7 +206,7 @@ public class PullAPIWrapper {
}
private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
- throws MQClientException {
+ throws MQClientException {
ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
if (topicRouteTable != null) {
TopicRouteData topicRouteData = topicRouteTable.get(topic);
@@ -216,13 +218,18 @@ public class PullAPIWrapper {
}
throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
- + topic, null);
+ + topic, null);
}
public boolean isConnectBrokerByUser() {
return connectBrokerByUser;
}
+ public void setConnectBrokerByUser(boolean connectBrokerByUser) {
+ this.connectBrokerByUser = connectBrokerByUser;
+
+ }
+
public int randomNum() {
int value = random.nextInt();
if (value < 0) {
@@ -233,11 +240,6 @@ public class PullAPIWrapper {
return value;
}
- public void setConnectBrokerByUser(boolean connectBrokerByUser) {
- this.connectBrokerByUser = connectBrokerByUser;
-
- }
-
public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) {
this.filterMessageHookList = filterMessageHookList;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index 4634c24..55e3d59 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -6,35 +6,37 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ServiceThread;
import org.slf4j.Logger;
-import java.util.concurrent.*;
-
-
public class PullMessageService extends ServiceThread {
private final Logger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "PullMessageServiceScheduledThread");
- }
- });
+ .newSingleThreadScheduledExecutor(new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "PullMessageServiceScheduledThread");
+ }
+ });
public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
@@ -69,14 +71,13 @@ public class PullMessageService extends ServiceThread {
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
- DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
+ DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl)consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
-
@Override
public void run() {
log.info(this.getServiceName() + " service started");
@@ -96,11 +97,9 @@ public class PullMessageService extends ServiceThread {
log.info(this.getServiceName() + " service end");
}
-
@Override
public String getServiceName() {
return PullMessageService.class.getSimpleName();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
index ccc624b..4850313 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.common.message.MessageQueue;
-
public class PullRequest {
private String consumerGroup;
private MessageQueue messageQueue;
@@ -38,27 +37,22 @@ public class PullRequest {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public MessageQueue getMessageQueue() {
return messageQueue;
}
-
public void setMessageQueue(MessageQueue messageQueue) {
this.messageQueue = messageQueue;
}
-
public long getNextOffset() {
return nextOffset;
}
-
public void setNextOffset(long nextOffset) {
this.nextOffset = nextOffset;
}
@@ -80,7 +74,7 @@ public class PullRequest {
return false;
if (getClass() != obj.getClass())
return false;
- PullRequest other = (PullRequest) obj;
+ PullRequest other = (PullRequest)obj;
if (consumerGroup == null) {
if (other.consumerGroup != null)
return false;
@@ -97,14 +91,13 @@ public class PullRequest {
@Override
public String toString() {
return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
- + ", nextOffset=" + nextOffset + "]";
+ + ", nextOffset=" + nextOffset + "]";
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
-
public void setProcessQueue(ProcessQueue processQueue) {
this.processQueue = processQueue;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
index d248603..c43c9a9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
@@ -6,46 +6,40 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.List;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.message.MessageExt;
-import java.util.List;
-
-
public class PullResultExt extends PullResult {
private final long suggestWhichBrokerId;
private byte[] messageBinary;
-
public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
- List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
+ List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
this.suggestWhichBrokerId = suggestWhichBrokerId;
this.messageBinary = messageBinary;
}
-
public byte[] getMessageBinary() {
return messageBinary;
}
-
public void setMessageBinary(byte[] messageBinary) {
this.messageBinary = messageBinary;
}
-
public long getSuggestWhichBrokerId() {
return suggestWhichBrokerId;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 53d775f..91bfd1a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -16,6 +16,16 @@
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -29,30 +39,23 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.slf4j.Logger;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
/**
* Base class for rebalance algorithm
- *
*/
public abstract class RebalanceImpl {
protected static final Logger log = ClientLogger.getLog();
protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
- new ConcurrentHashMap<String, Set<MessageQueue>>();
+ new ConcurrentHashMap<String, Set<MessageQueue>>();
protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =
- new ConcurrentHashMap<String, SubscriptionData>();
+ new ConcurrentHashMap<String, SubscriptionData>();
protected String consumerGroup;
protected MessageModel messageModel;
protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
protected MQClientInstance mQClientFactory;
-
public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory) {
+ MQClientInstance mQClientFactory) {
this.consumerGroup = consumerGroup;
this.messageModel = messageModel;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
@@ -70,9 +73,9 @@ public abstract class RebalanceImpl {
try {
this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", //
- this.consumerGroup, //
- this.mQClientFactory.getClientId(), //
- mq);
+ this.consumerGroup, //
+ this.mQClientFactory.getClientId(), //
+ mq);
} catch (Exception e) {
log.error("unlockBatchMQ exception, " + mq, e);
}
@@ -138,7 +141,7 @@ public abstract class RebalanceImpl {
try {
Set<MessageQueue> lockedMq =
- this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
+ this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
@@ -149,9 +152,9 @@ public abstract class RebalanceImpl {
boolean lockOK = lockedMq.contains(mq);
log.info("the message queue lock {}, {} {}",
- lockOK ? "OK" : "Failed",
- this.consumerGroup,
- mq);
+ lockOK ? "OK" : "Failed",
+ this.consumerGroup,
+ mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
@@ -182,7 +185,7 @@ public abstract class RebalanceImpl {
try {
Set<MessageQueue> lockOKMQSet =
- this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
+ this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
@@ -242,10 +245,10 @@ public abstract class RebalanceImpl {
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", //
- consumerGroup, //
- topic, //
- mqSet, //
- mqSet);
+ consumerGroup, //
+ topic, //
+ mqSet, //
+ mqSet);
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
@@ -277,13 +280,13 @@ public abstract class RebalanceImpl {
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(//
- this.consumerGroup, //
- this.mQClientFactory.getClientId(), //
- mqAll, //
- cidAll);
+ this.consumerGroup, //
+ this.mQClientFactory.getClientId(), //
+ mqAll, //
+ cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
+ e);
return;
}
@@ -295,9 +298,9 @@ public abstract class RebalanceImpl {
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info(
- "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
- strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
- allocateResultSet.size(), allocateResultSet);
+ "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
+ strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
+ allocateResultSet.size(), allocateResultSet);
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
@@ -350,7 +353,7 @@ public abstract class RebalanceImpl {
it.remove();
changed = true;
log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
- consumerGroup, mq);
+ consumerGroup, mq);
}
break;
default:
@@ -422,52 +425,42 @@ public abstract class RebalanceImpl {
return processQueueTable;
}
-
public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
return topicSubscribeInfoTable;
}
-
public String getConsumerGroup() {
return consumerGroup;
}
-
public void setConsumerGroup(String consumerGroup) {
this.consumerGroup = consumerGroup;
}
-
public MessageModel getMessageModel() {
return messageModel;
}
-
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
-
public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
return allocateMessageQueueStrategy;
}
-
public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
}
-
public MQClientInstance getmQClientFactory() {
return mQClientFactory;
}
-
public void setmQClientFactory(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
-
public void destroy() {
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index d37090d..1130943 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -6,16 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.List;
+import java.util.Set;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -23,21 +25,15 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import java.util.List;
-import java.util.Set;
-
-
public class RebalancePullImpl extends RebalanceImpl {
private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
-
public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
this(null, null, null, null, defaultMQPullConsumerImpl);
}
-
public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
+ MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 4d0d47f..707b9a1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -6,16 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
@@ -28,23 +31,16 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-
public class RebalancePushImpl extends RebalanceImpl {
private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
-
public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
this(null, null, null, null, defaultMQPushConsumerImpl);
}
-
public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
- MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+ MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
}
@@ -58,7 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl {
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
- && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
+ && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
@@ -68,8 +64,8 @@ public class RebalancePushImpl extends RebalanceImpl {
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
- mq, //
- pq.getTryUnlockTimes());
+ mq, //
+ pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
}
@@ -164,7 +160,7 @@ public class RebalancePushImpl extends RebalanceImpl {
} else {
try {
long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
- UtilAll.YYYY_MMDD_HHMMSS).getTime();
+ UtilAll.YYYY_MMDD_HHMMSS).getTime();
result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
} catch (MQClientException e) {
result = -1;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
index 5b5ab2a..985129e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
@@ -6,13 +6,13 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
@@ -21,15 +21,14 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ServiceThread;
import org.slf4j.Logger;
-
/**
* Rebalance Service
*
*/
public class RebalanceService extends ServiceThread {
private static long waitInterval =
- Long.parseLong(System.getProperty(
- "rocketmq.client.rebalance.waitInterval", "20000"));
+ Long.parseLong(System.getProperty(
+ "rocketmq.client.rebalance.waitInterval", "20000"));
private final Logger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
@@ -49,7 +48,6 @@ public class RebalanceService extends ServiceThread {
log.info(this.getServiceName() + " service end");
}
-
@Override
public String getServiceName() {
return RebalanceService.class.getSimpleName();