You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/29 12:30:35 UTC
[07/28] incubator-rocketmq git commit: Remove unused class
GetRouteInfoResponseHeader and meaningless comments
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 9bf34be..7eda7c1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -297,10 +297,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
- DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
- pullResult.getMsgFoundList(), //
- processQueue, //
- pullRequest.getMessageQueue(), //
+ DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
+ pullResult.getMsgFoundList(),
+ processQueue,
+ pullRequest.getMessageQueue(),
dispathToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
@@ -311,12 +311,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
}
- if (pullResult.getNextBeginOffset() < prevRequestOffset//
+ if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
- "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
- pullResult.getNextBeginOffset(), //
- firstMsgOffset, //
+ "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
+ pullResult.getNextBeginOffset(),
+ firstMsgOffset,
prevRequestOffset);
}
@@ -336,7 +336,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
- log.warn("the pull request offset illegal, {} {}", //
+ log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
@@ -396,26 +396,26 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
classFilter = sd.isClassFilterMode();
}
- int sysFlag = PullSysFlag.buildSysFlag(//
+ int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
try {
- this.pullAPIWrapper.pullKernelImpl(//
- pullRequest.getMessageQueue(), // 1
- subExpression, // 2
- subscriptionData.getExpressionType(), // 3
- subscriptionData.getSubVersion(), // 4
- pullRequest.getNextOffset(), // 5
- this.defaultMQPushConsumer.getPullBatchSize(), // 6
- sysFlag, // 7
- commitOffsetValue, // 8
- BROKER_SUSPEND_MAX_TIME_MILLIS, // 9
- CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 10
- CommunicationMode.ASYNC, // 11
- pullCallback // 12
+ this.pullAPIWrapper.pullKernelImpl(
+ pullRequest.getMessageQueue(),
+ subExpression,
+ subscriptionData.getExpressionType(),
+ subscriptionData.getSubVersion(),
+ pullRequest.getNextOffset(),
+ this.defaultMQPushConsumer.getPullBatchSize(),
+ sysFlag,
+ commitOffsetValue,
+ BROKER_SUSPEND_MAX_TIME_MILLIS,
+ CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
+ CommunicationMode.ASYNC,
+ pullCallback
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
@@ -425,8 +425,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The consumer service state not OK, "//
- + this.serviceState//
+ throw new MQClientException("The consumer service state not OK, "
+ + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
@@ -608,8 +608,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
- throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
- + this.serviceState//
+ throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
@@ -764,7 +764,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subString);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
@@ -779,7 +779,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
break;
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
@@ -811,7 +811,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String subExpression) throws MQClientException {
try {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
if (this.mQClientFactory != null) {
@@ -824,7 +824,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
try {
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, "*");
subscriptionData.setSubString(fullClassName);
subscriptionData.setClassFilterMode(true);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 634e0f0..ef27ff8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -73,9 +73,9 @@ public abstract class RebalanceImpl {
try {
this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
- log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", //
- this.consumerGroup, //
- this.mQClientFactory.getClientId(), //
+ log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}",
+ this.consumerGroup,
+ this.mQClientFactory.getClientId(),
mq);
} catch (Exception e) {
log.error("unlockBatchMQ exception, " + mq, e);
@@ -245,10 +245,10 @@ public abstract class RebalanceImpl {
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
- log.info("messageQueueChanged {} {} {} {}", //
- consumerGroup, //
- topic, //
- mqSet, //
+ log.info("messageQueueChanged {} {} {} {}",
+ consumerGroup,
+ topic,
+ mqSet,
mqSet);
}
} else {
@@ -280,10 +280,10 @@ public abstract class RebalanceImpl {
List<MessageQueue> allocateResult = null;
try {
- allocateResult = strategy.allocate(//
- this.consumerGroup, //
- this.mQClientFactory.getClientId(), //
- mqAll, //
+ allocateResult = strategy.allocate(
+ this.consumerGroup,
+ this.mQClientFactory.getClientId(),
+ mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 112bcee..2f4f745 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -74,8 +74,8 @@ public class RebalancePushImpl extends RebalanceImpl {
pq.getLockConsume().unlock();
}
} else {
- log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
- mq, //
+ log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
+ mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index f146be9..6ef594b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -148,10 +148,10 @@ public class MQClientInstance {
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
- log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}", //
- this.instanceIndex, //
- this.clientId, //
- this.clientConfig, //
+ log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",
+ this.instanceIndex,
+ this.clientId,
+ this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
@@ -727,13 +727,13 @@ public class MQClientInstance {
classBody = filterClassSource.getBytes(MixAll.DEFAULT_CHARSET);
classCRC = UtilAll.crc32(classBody);
} catch (Exception e1) {
- log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}", //
- fullClassName, //
+ log.warn("uploadFilterClassToAllFilterServer Exception, ClassName: {} {}",
+ fullClassName,
RemotingHelper.exceptionSimpleDesc(e1));
}
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
- if (topicRouteData != null //
+ if (topicRouteData != null
&& topicRouteData.getFilterServerTable() != null && !topicRouteData.getFilterServerTable().isEmpty()) {
Iterator<Entry<String, List<String>>> it = topicRouteData.getFilterServerTable().entrySet().iterator();
while (it.hasNext()) {
@@ -1006,10 +1006,10 @@ public class MQClientInstance {
return null;
}
- public FindBrokerResult findBrokerAddressInSubscribe(//
- final String brokerName, //
- final long brokerId, //
- final boolean onlyThisBroker//
+ public FindBrokerResult findBrokerAddressInSubscribe(
+ final String brokerName,
+ final long brokerId,
+ final boolean onlyThisBroker
) {
String brokerAddr = null;
boolean slave = false;
@@ -1102,7 +1102,6 @@ public class MQClientInstance {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
- //
}
Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
@@ -1171,8 +1170,8 @@ public class MQClientInstance {
return topicRouteTable;
}
- public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, //
- final String consumerGroup, //
+ public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
+ final String consumerGroup,
final String brokerName) {
MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);
if (null != mqConsumerInner) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 12f8a36..602fedd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -116,11 +116,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void initTransactionEnv() {
TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer;
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
- this.checkExecutor = new ThreadPoolExecutor(//
- producer.getCheckThreadPoolMinSize(), //
- producer.getCheckThreadPoolMaxSize(), //
- 1000 * 60, //
- TimeUnit.MILLISECONDS, //
+ this.checkExecutor = new ThreadPoolExecutor(
+ producer.getCheckThreadPoolMinSize(),
+ producer.getCheckThreadPoolMaxSize(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
this.checkRequestQueue);
}
@@ -172,8 +172,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
- throw new MQClientException("The producer service state not OK, maybe started once, "//
- + this.serviceState//
+ throw new MQClientException("The producer service state not OK, maybe started once, "
+ + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
@@ -268,18 +268,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
exception = e;
}
- this.processTransactionState(//
- localTransactionState, //
- group, //
+ this.processTransactionState(
+ localTransactionState,
+ group,
exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
- private void processTransactionState(//
- final LocalTransactionState localTransactionState, //
- final String producerGroup, //
+ private void processTransactionState(
+ final LocalTransactionState localTransactionState,
+ final String producerGroup,
final Throwable exception) {
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
@@ -354,8 +354,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private void makeSureStateOK() throws MQClientException {
if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The producer service state not OK, "//
- + this.serviceState//
+ throw new MQClientException("The producer service state not OK, "
+ + this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
@@ -428,11 +428,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
- private SendResult sendDefaultImpl(//
- Message msg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, //
- final long timeout//
+ private SendResult sendDefaultImpl(
+ Message msg,
+ final CommunicationMode communicationMode,
+ final SendCallback sendCallback,
+ final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -579,11 +579,11 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
}
- private SendResult sendKernelImpl(final Message msg, //
- final MessageQueue mq, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, //
- final TopicPublishInfo topicPublishInfo, //
+ private SendResult sendKernelImpl(final Message msg,
+ final MessageQueue mq,
+ final CommunicationMode communicationMode,
+ final SendCallback sendCallback,
+ final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
@@ -674,18 +674,18 @@ public class DefaultMQProducerImpl implements MQProducerInner {
SendResult sendResult = null;
switch (communicationMode) {
case ASYNC:
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
- brokerAddr, // 1
- mq.getBrokerName(), // 2
- msg, // 3
- requestHeader, // 4
- timeout, // 5
- communicationMode, // 6
- sendCallback, // 7
- topicPublishInfo, // 8
- this.mQClientFactory, // 9
- this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
- context, //
+ sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
+ brokerAddr,
+ mq.getBrokerName(),
+ msg,
+ requestHeader,
+ timeout,
+ communicationMode,
+ sendCallback,
+ topicPublishInfo,
+ this.mQClientFactory,
+ this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
+ context,
this);
break;
case ONEWAY:
@@ -887,12 +887,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return this.sendSelectImpl(msg, selector, arg, CommunicationMode.SYNC, null, timeout);
}
- private SendResult sendSelectImpl(//
- Message msg, //
- MessageQueueSelector selector, //
- Object arg, //
- final CommunicationMode communicationMode, //
- final SendCallback sendCallback, final long timeout//
+ private SendResult sendSelectImpl(
+ Message msg,
+ MessageQueueSelector selector,
+ Object arg,
+ final CommunicationMode communicationMode,
+ final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
@@ -1017,9 +1017,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
- public void endTransaction(//
- final SendResult sendResult, //
- final LocalTransactionState localTransactionState, //
+ public void endTransaction(
+ final SendResult sendResult,
+ final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
if (sendResult.getOffsetMsgId() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
index 5b2039e..dfd485d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java
@@ -28,9 +28,9 @@ public interface MQProducerInner {
TransactionCheckListener checkListener();
- void checkTransactionState(//
- final String addr, //
- final MessageExt msg, //
+ void checkTransactionState(
+ final String addr,
+ final MessageExt msg,
final CheckTransactionStateRequestHeader checkRequestHeader);
void updateTopicPublishInfo(final String topic, final TopicPublishInfo info);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
index b85f6f5..4795cce 100644
--- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -46,24 +46,14 @@ public class TopicConfig {
public String encode() {
StringBuilder sb = new StringBuilder();
-
- // 1
sb.append(this.topicName);
sb.append(SEPARATOR);
-
- // 2
sb.append(this.readQueueNums);
sb.append(SEPARATOR);
-
- // 3
sb.append(this.writeQueueNums);
sb.append(SEPARATOR);
-
- // 4
sb.append(this.perm);
sb.append(SEPARATOR);
-
- // 5
sb.append(this.topicFilterType);
return sb.toString();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java
index eea0da1..5d950be 100644
--- a/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/help/FAQUrl.java
@@ -18,44 +18,44 @@ package org.apache.rocketmq.common.help;
public class FAQUrl {
- public static final String APPLY_TOPIC_URL = //
+ public static final String APPLY_TOPIC_URL =
"http://rocketmq.apache.org/docs/faq/";
- public static final String NAME_SERVER_ADDR_NOT_EXIST_URL = //
+ public static final String NAME_SERVER_ADDR_NOT_EXIST_URL =
"http://rocketmq.apache.org/docs/faq/";
- public static final String GROUP_NAME_DUPLICATE_URL = //
+ public static final String GROUP_NAME_DUPLICATE_URL =
"http://rocketmq.apache.org/docs/faq/";
- public static final String CLIENT_PARAMETER_CHECK_URL = //
+ public static final String CLIENT_PARAMETER_CHECK_URL =
"http://rocketmq.apache.org/docs/faq/";
- public static final String SUBSCRIPTION_GROUP_NOT_EXIST = //
+ public static final String SUBSCRIPTION_GROUP_NOT_EXIST =
"http://rocketmq.apache.org/docs/faq/";
- public static final String CLIENT_SERVICE_NOT_OK = //
+ public static final String CLIENT_SERVICE_NOT_OK =
"http://rocketmq.apache.org/docs/faq/";
// FAQ: No route info of this topic, TopicABC
- public static final String NO_TOPIC_ROUTE_INFO = //
+ public static final String NO_TOPIC_ROUTE_INFO =
"http://rocketmq.apache.org/docs/faq/";
- public static final String LOAD_JSON_EXCEPTION = //
+ public static final String LOAD_JSON_EXCEPTION =
"http://rocketmq.apache.org/docs/faq/";
- public static final String SAME_GROUP_DIFFERENT_TOPIC = //
+ public static final String SAME_GROUP_DIFFERENT_TOPIC =
"http://rocketmq.apache.org/docs/faq/";
- public static final String MQLIST_NOT_EXIST = //
+ public static final String MQLIST_NOT_EXIST =
"http://rocketmq.apache.org/docs/faq/";
- public static final String UNEXPECTED_EXCEPTION_URL = //
+ public static final String UNEXPECTED_EXCEPTION_URL =
"http://rocketmq.apache.org/docs/faq/";
- public static final String SEND_MSG_FAILED = //
+ public static final String SEND_MSG_FAILED =
"http://rocketmq.apache.org/docs/faq/";
- public static final String UNKNOWN_HOST_EXCEPTION = //
+ public static final String UNKNOWN_HOST_EXCEPTION =
"http://rocketmq.apache.org/docs/faq/";
private static final String TIP_STRING_BEGIN = "\nSee ";
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
index a1d3ede..d0b202e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageClientIDSetter.java
@@ -42,7 +42,7 @@ public class MessageClientIDSetter {
tempBuffer.put(createFakeIP());
}
tempBuffer.position(6);
- tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode()); //4
+ tempBuffer.putInt(MessageClientIDSetter.class.getClassLoader().hashCode());
FIX_STRING = UtilAll.bytes2string(tempBuffer.array());
setStartTime(System.currentTimeMillis());
COUNTER = new AtomicInteger(0);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index 41e76fc..d7942eb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -130,15 +130,15 @@ public class ConsumerRunningInfo extends RemotingSerializable {
if (orderMsg) {
if (!pq.isLocked()) {
- sb.append(String.format("%s %s can't lock for a while, %dms%n", //
- clientId, //
- mq, //
+ sb.append(String.format("%s %s can't lock for a while, %dms%n",
+ clientId,
+ mq,
System.currentTimeMillis() - pq.getLastLockTimestamp()));
} else {
if (pq.isDroped() && (pq.getTryUnlockTimes() > 0)) {
- sb.append(String.format("%s %s unlock %d times, still failed%n", //
- clientId, //
- mq, //
+ sb.append(String.format("%s %s unlock %d times, still failed%n",
+ clientId,
+ mq,
pq.getTryUnlockTimes()));
}
}
@@ -147,9 +147,9 @@ public class ConsumerRunningInfo extends RemotingSerializable {
long diff = System.currentTimeMillis() - pq.getLastConsumeTimestamp();
if (diff > (1000 * 60) && pq.getCachedMsgCount() > 0) {
- sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n", //
- clientId, //
- mq, //
+ sb.append(String.format("%s %s can't consume for a while, maybe blocked, %dms%n",
+ clientId,
+ mq,
diff));
}
}
@@ -211,10 +211,10 @@ public class ConsumerRunningInfo extends RemotingSerializable {
int i = 0;
while (it.hasNext()) {
SubscriptionData next = it.next();
- String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n", //
- ++i, //
- next.getTopic(), //
- next.isClassFilterMode(), //
+ String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s%n",
+ ++i,
+ next.getTopic(),
+ next.isClassFilterMode(),
next.getSubString());
sb.append(item);
@@ -223,20 +223,20 @@ public class ConsumerRunningInfo extends RemotingSerializable {
{
sb.append("\n\n#Consumer Offset#\n");
- sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
- "#Topic", //
- "#Broker Name", //
- "#QID", //
- "#Consumer Offset"//
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n",
+ "#Topic",
+ "#Broker Name",
+ "#QID",
+ "#Consumer Offset"
));
Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueueInfo> next = it.next();
- String item = String.format("%-32s %-32s %-4d %-20d%n", //
- next.getKey().getTopic(), //
- next.getKey().getBrokerName(), //
- next.getKey().getQueueId(), //
+ String item = String.format("%-32s %-32s %-4d %-20d%n",
+ next.getKey().getTopic(),
+ next.getKey().getBrokerName(),
+ next.getKey().getQueueId(),
next.getValue().getCommitOffset());
sb.append(item);
@@ -245,20 +245,20 @@ public class ConsumerRunningInfo extends RemotingSerializable {
{
sb.append("\n\n#Consumer MQ Detail#\n");
- sb.append(String.format("%-32s %-32s %-4s %-20s%n", //
- "#Topic", //
- "#Broker Name", //
- "#QID", //
- "#ProcessQueueInfo"//
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n",
+ "#Topic",
+ "#Broker Name",
+ "#QID",
+ "#ProcessQueueInfo"
));
Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = this.mqTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueueInfo> next = it.next();
- String item = String.format("%-32s %-32s %-4d %s%n", //
- next.getKey().getTopic(), //
- next.getKey().getBrokerName(), //
- next.getKey().getQueueId(), //
+ String item = String.format("%-32s %-32s %-4d %s%n",
+ next.getKey().getTopic(),
+ next.getKey().getBrokerName(),
+ next.getKey().getQueueId(),
next.getValue().toString());
sb.append(item);
@@ -267,27 +267,27 @@ public class ConsumerRunningInfo extends RemotingSerializable {
{
sb.append("\n\n#Consumer RT&TPS#\n");
- sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n", //
- "#Topic", //
- "#Pull RT", //
- "#Pull TPS", //
- "#Consume RT", //
- "#ConsumeOK TPS", //
- "#ConsumeFailed TPS", //
- "#ConsumeFailedMsgsInHour"//
+ sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n",
+ "#Topic",
+ "#Pull RT",
+ "#Pull TPS",
+ "#Consume RT",
+ "#ConsumeOK TPS",
+ "#ConsumeFailed TPS",
+ "#ConsumeFailedMsgsInHour"
));
Iterator<Entry<String, ConsumeStatus>> it = this.statusTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumeStatus> next = it.next();
- String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n", //
- next.getKey(), //
- next.getValue().getPullRT(), //
- next.getValue().getPullTPS(), //
- next.getValue().getConsumeRT(), //
- next.getValue().getConsumeOKTPS(), //
- next.getValue().getConsumeFailedTPS(), //
- next.getValue().getConsumeFailedMsgs()//
+ String item = String.format("%-32s %14.2f %14.2f %14.2f %14.2f %18.2f %25d%n",
+ next.getKey(),
+ next.getValue().getPullRT(),
+ next.getValue().getPullTPS(),
+ next.getValue().getConsumeRT(),
+ next.getValue().getConsumeOKTPS(),
+ next.getValue().getConsumeFailedTPS(),
+ next.getValue().getConsumeFailedMsgs()
);
sb.append(item);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java
index ba6b129..6ba069e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumeStatsRequestHeader.java
@@ -27,8 +27,6 @@ public class GetConsumeStatsRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
-
}
public String getConsumerGroup() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java
index 20990a6..ca26a86 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetConsumerStatusRequestHeader.java
@@ -32,7 +32,6 @@ public class GetConsumerStatusRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
}
public String getTopic() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
index 222382e..c64381f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeRequestHeader.java
@@ -32,7 +32,6 @@ public class GetEarliestMsgStoretimeRequestHeader implements CommandCustomHeader
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
}
public String getTopic() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java
index 6a998d9..93fa722 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/QueryCorrectionOffsetHeader.java
@@ -33,7 +33,7 @@ public class QueryCorrectionOffsetHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
+
}
public String getFilterGroups() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index 113e46f..3685ef9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -34,7 +34,7 @@ public class SearchOffsetRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java
index 082329c..95e18d0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientRequestHeader.java
@@ -57,7 +57,7 @@ public class UnregisterClientRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java
index 6ae6929..f61f0cd 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/UnregisterClientResponseHeader.java
@@ -24,7 +24,7 @@ public class UnregisterClientResponseHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java
deleted file mode 100644
index 64081ea..0000000
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/GetRouteInfoResponseHeader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * $Id: GetRouteInfoResponseHeader.java 1835 2013-05-16 02:00:50Z vintagewang@apache.org $
- */
-package org.apache.rocketmq.common.protocol.header.namesrv;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class GetRouteInfoResponseHeader implements CommandCustomHeader {
-
- @Override
- public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
index 93069fe..8307e20 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java
@@ -32,7 +32,7 @@ public class RegisterOrderTopicRequestHeader implements CommandCustomHeader {
@Override
public void checkFields() throws RemotingCommandException {
- // TODO Auto-generated method stub
+
}
public String getTopic() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
index 9966a90..8fd8628 100644
--- a/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
+++ b/common/src/main/java/org/apache/rocketmq/common/sysflag/TopicSysFlag.java
@@ -16,11 +16,7 @@
*/
package org.apache.rocketmq.common.sysflag;
-/**
- *
- *
- */
public class TopicSysFlag {
private final static int FLAG_UNIT = 0x1 << 0;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
index 28ead5c..e43ae41 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java
@@ -114,9 +114,7 @@ public class IOTinyUtils {
fileOrDir.delete();
}
- /**
- */
public static void cleanDirectory(File directory) throws IOException {
if (!directory.exists()) {
String message = directory + " does not exist";
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
----------------------------------------------------------------------
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
index c8252d0..9bd9ea1 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
@@ -35,9 +35,7 @@ public class PushConsumer {
consumer.setConsumeTimestamp("20170422221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
- /**
- */
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/example/src/main/resources/MessageFilterImpl.java
----------------------------------------------------------------------
diff --git a/example/src/main/resources/MessageFilterImpl.java b/example/src/main/resources/MessageFilterImpl.java
index 23e4a79..6cb5d15 100644
--- a/example/src/main/resources/MessageFilterImpl.java
+++ b/example/src/main/resources/MessageFilterImpl.java
@@ -28,7 +28,7 @@ public class MessageFilterImpl implements MessageFilter {
String property = msg.getProperty("SequenceId");
if (property != null) {
int id = Integer.parseInt(property);
- if (((id % 10) == 0) && //
+ if (((id % 10) == 0) &&
(id > 100)) {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java
index 74e5501..2948c10 100644
--- a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java
+++ b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.java
@@ -56,7 +56,6 @@ public class SelectorParser implements SelectorParserConstants {
// convertStringExpressions = true;
// sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length());
// }
- //
// if( convertStringExpressions ) {
// ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true);
// }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj
----------------------------------------------------------------------
diff --git a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj
index 5d1a4a7..b533ac1 100644
--- a/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj
+++ b/filter/src/main/java/org/apache/rocketmq/filter/parser/SelectorParser.jj
@@ -82,7 +82,6 @@ public class SelectorParser {
// convertStringExpressions = true;
// sql = sql.substring(CONVERT_STRING_EXPRESSIONS_PREFIX.length());
// }
-//
// if( convertStringExpressions ) {
// ComparisonExpression.CONVERT_STRING_EXPRESSIONS.set(true);
// }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
index be13bd6..376a814 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/kvconfig/KVConfigManager.java
@@ -72,10 +72,10 @@ public class KVConfigManager {
final String prev = kvTable.put(key, value);
if (null != prev) {
- log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}", //
+ log.info("putKVConfig update config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
} else {
- log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}", //
+ log.info("putKVConfig create new config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
@@ -119,7 +119,7 @@ public class KVConfigManager {
HashMap<String, String> kvTable = this.configTable.get(namespace);
if (null != kvTable) {
String value = kvTable.remove(key);
- log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}", //
+ log.info("deleteKVConfig delete a config item, Namespace: {} Key: {} Value: {}",
namespace, key, value);
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
----------------------------------------------------------------------
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index 7479fcc..35790c9 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -131,9 +131,9 @@ public class RouteInfoManager {
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
- if (null != topicConfigWrapper //
+ if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
- if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
+ if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
index 4ed156d..6e99b32 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
- private static final int FRAME_MAX_LENGTH = //
+ private static final int FRAME_MAX_LENGTH =
Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));
public NettyDecoder() {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index ba74b53..b66e7de 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -188,7 +188,7 @@ public abstract class NettyRemotingAbstract {
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
- final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
+ final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
@@ -210,9 +210,9 @@ public abstract class NettyRemotingAbstract {
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
- log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
- + ", too many requests and system thread pool busy, RejectedExecutionException " //
- + pair.getObject2().toString() //
+ log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ + ", too many requests and system thread pool busy, RejectedExecutionException "
+ + pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
@@ -422,10 +422,10 @@ public abstract class NettyRemotingAbstract {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
- String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
- timeoutMillis, //
- this.semaphoreAsync.getQueueLength(), //
- this.semaphoreAsync.availablePermits()//
+ String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
+ timeoutMillis,
+ this.semaphoreAsync.getQueueLength(),
+ this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
@@ -459,10 +459,10 @@ public abstract class NettyRemotingAbstract {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
- "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
- timeoutMillis, //
- this.semaphoreOneway.getQueueLength(), //
- this.semaphoreOneway.availablePermits()//
+ "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
+ timeoutMillis,
+ this.semaphoreOneway.getQueueLength(),
+ this.semaphoreOneway.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index db6a7e4..ecf9ab2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -92,7 +92,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
this(nettyClientConfig, null);
}
- public NettyRemotingClient(final NettyClientConfig nettyClientConfig, //
+ public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
final ChannelEventListener channelEventListener) {
super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
this.nettyClientConfig = nettyClientConfig;
@@ -130,8 +130,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
@Override
public void start() {
- this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
- nettyClientConfig.getClientWorkerThreads(), //
+ this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
+ nettyClientConfig.getClientWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@@ -142,7 +142,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
});
- Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
+ Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
index c4354e9..0570c84 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java
@@ -16,11 +16,7 @@
*/
package org.apache.rocketmq.remoting.netty;
-/**
- *
- *
- */
public class NettyServerConfig implements Cloneable {
private int listenPort = 8888;
private int serverWorkerThreads = 8;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
----------------------------------------------------------------------
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
index 52556fc..2e0a81e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java
@@ -20,23 +20,23 @@ package org.apache.rocketmq.remoting.netty;
public class NettySystemConfig {
public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
"com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
- public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE = //
+ public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE =
"com.rocketmq.remoting.socket.sndbuf.size";
- public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE = //
+ public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE =
"com.rocketmq.remoting.socket.rcvbuf.size";
- public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE = //
+ public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE =
"com.rocketmq.remoting.clientAsyncSemaphoreValue";
- public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE = //
+ public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
"com.rocketmq.remoting.clientOnewaySemaphoreValue";
- public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
+ public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
Boolean
.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
- public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
+ public static final int CLIENT_ASYNC_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
- public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE = //
+ public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
- public static int socketSndbufSize = //
+ public static int socketSndbufSize =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
- public static int socketRcvbufSize = //
+ public static int socketRcvbufSize =
Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 0810d0c..a2cb629 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -125,11 +125,11 @@ public class CommitLog {
return this.mappedFileQueue.remainHowManyDataToFlush();
}
- public int deleteExpiredFile(//
- final long expiredTime, //
- final int deleteFilesInterval, //
- final long intervalForcibly, //
- final boolean cleanImmediately//
+ public int deleteExpiredFile(
+ final long expiredTime,
+ final int deleteFilesInterval,
+ final long intervalForcibly,
+ final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
@@ -244,43 +244,30 @@ public class CommitLog {
byte[] bytesContent = new byte[totalSize];
- // 3 BODYCRC
int bodyCRC = byteBuffer.getInt();
- // 4 QUEUEID
int queueId = byteBuffer.getInt();
- // 5 FLAG
int flag = byteBuffer.getInt();
- // 6 QUEUEOFFSET
long queueOffset = byteBuffer.getLong();
- // 7 PHYSICALOFFSET
long physicOffset = byteBuffer.getLong();
- // 8 SYSFLAG
int sysFlag = byteBuffer.getInt();
- // 9 BORNTIMESTAMP
long bornTimeStamp = byteBuffer.getLong();
- // 10
ByteBuffer byteBuffer1 = byteBuffer.get(bytesContent, 0, 8);
- // 11 STORETIMESTAMP
long storeTimestamp = byteBuffer.getLong();
- // 12
ByteBuffer byteBuffer2 = byteBuffer.get(bytesContent, 0, 8);
- // 13 RECONSUMETIMES
int reconsumeTimes = byteBuffer.getInt();
- // 14 Prepared Transaction Offset
long preparedTransactionOffset = byteBuffer.getLong();
- // 15 BODY
int bodyLen = byteBuffer.getInt();
if (bodyLen > 0) {
if (readBody) {
@@ -298,7 +285,6 @@ public class CommitLog {
}
}
- // 16 TOPIC
byte topicLen = byteBuffer.get();
byteBuffer.get(bytesContent, 0, topicLen);
String topic = new String(bytesContent, 0, topicLen, MessageDecoder.CHARSET_UTF8);
@@ -307,7 +293,6 @@ public class CommitLog {
String keys = "";
String uniqKey = null;
- // 17 properties
short propertiesLength = byteBuffer.getShort();
Map<String, String> propertiesMap = null;
if (propertiesLength > 0) {
@@ -355,19 +340,19 @@ public class CommitLog {
return new DispatchRequest(totalSize, false/* success */);
}
- return new DispatchRequest(//
- topic, // 1
- queueId, // 2
- physicOffset, // 3
- totalSize, // 4
- tagsCode, // 5
- storeTimestamp, // 6
- queueOffset, // 7
- keys, // 8
- uniqKey, //9
- sysFlag, // 10
- preparedTransactionOffset, // 11
- propertiesMap // 12
+ return new DispatchRequest(
+ topic,
+ queueId,
+ physicOffset,
+ totalSize,
+ tagsCode,
+ storeTimestamp,
+ queueOffset,
+ keys,
+ uniqKey,
+ sysFlag,
+ preparedTransactionOffset,
+ propertiesMap
);
} catch (Exception e) {
}
@@ -376,24 +361,23 @@ public class CommitLog {
}
private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
- final int msgLen = 4 // 1 TOTALSIZE
- + 4 // 2 MAGICCODE
- + 4 // 3 BODYCRC
- + 4 // 4 QUEUEID
- + 4 // 5 FLAG
- + 8 // 6 QUEUEOFFSET
- + 8 // 7 PHYSICALOFFSET
- + 4 // 8 SYSFLAG
- + 8 // 9 BORNTIMESTAMP
- + 8 // 10 BORNHOST
- + 8 // 11 STORETIMESTAMP
- + 8 // 12 STOREHOSTADDRESS
- + 4 // 13 RECONSUMETIMES
- + 8 // 14 Prepared Transaction Offset
- + 4 + (bodyLength > 0 ? bodyLength : 0) // 14 BODY
- + 1 + topicLength // 15 TOPIC
- + 2 + (propertiesLength > 0 ? propertiesLength : 0) // 16
- // propertiesLength
+ final int msgLen = 4 //TOTALSIZE
+ + 4 //MAGICCODE
+ + 4 //BODYCRC
+ + 4 //QUEUEID
+ + 4 //FLAG
+ + 8 //QUEUEOFFSET
+ + 8 //PHYSICALOFFSET
+ + 4 //SYSFLAG
+ + 8 //BORNTIMESTAMP
+ + 8 //BORNHOST
+ + 8 //STORETIMESTAMP
+ + 8 //STOREHOSTADDRESS
+ + 4 //RECONSUMETIMES
+ + 8 //Prepared Transaction Offset
+ + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
+ + 1 + topicLength //TOPIC
+ + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
+ 0;
return msgLen;
}
@@ -500,18 +484,18 @@ public class CommitLog {
return false;
}
- if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()//
+ if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
- log.info("find check timestamp, {} {}", //
- storeTimestamp, //
+ log.info("find check timestamp, {} {}",
+ storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
} else {
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
- log.info("find check timestamp, {} {}", //
- storeTimestamp, //
+ log.info("find check timestamp, {} {}",
+ storeTimestamp,
UtilAll.timeMillisToHumanString(storeTimestamp));
return true;
}
@@ -547,7 +531,7 @@ public class CommitLog {
int queueId = msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
+ if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
@@ -1270,8 +1254,6 @@ public class CommitLog {
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
- //
-
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
@@ -1391,7 +1373,6 @@ public class CommitLog {
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
- //
//ignore previous read
messagesByteBuff.reset();
// Here the length of the specially set maxBlank
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 49a1eba..36c15d4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -218,9 +218,7 @@ public class DefaultMessageStore implements MessageStore {
this.shutdown = false;
}
- /**
- */
public void shutdown() {
if (!this.shutdown) {
this.shutdown = true;
@@ -392,7 +390,7 @@ public class DefaultMessageStore implements MessageStore {
long begin = this.getCommitLog().getBeginTimeInLock();
long diff = this.systemClock.now() - begin;
- if (diff < 10000000 //
+ if (diff < 10000000
&& diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills()) {
return true;
}
@@ -579,9 +577,7 @@ public class DefaultMessageStore implements MessageStore {
return getResult;
}
- /**
- */
public long getMaxOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
@@ -592,9 +588,7 @@ public class DefaultMessageStore implements MessageStore {
return 0;
}
- /**
- */
public long getMinOffsetInQueue(String topic, int queueId) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
@@ -891,9 +885,9 @@ public class DefaultMessageStore implements MessageStore {
ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
for (ConsumeQueue cq : queueTable.values()) {
cq.destroy();
- log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", //
- cq.getTopic(), //
- cq.getQueueId() //
+ log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
+ cq.getTopic(),
+ cq.getQueueId()
);
this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
@@ -922,17 +916,17 @@ public class DefaultMessageStore implements MessageStore {
long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
if (maxCLOffsetInConsumeQueue == -1) {
- log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", //
- nextQT.getValue().getTopic(), //
- nextQT.getValue().getQueueId(), //
- nextQT.getValue().getMaxPhysicOffset(), //
+ log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
+ nextQT.getValue().getTopic(),
+ nextQT.getValue().getQueueId(),
+ nextQT.getValue().getMaxPhysicOffset(),
nextQT.getValue().getMinLogicOffset());
} else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
log.info(
- "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", //
- topic, //
- nextQT.getKey(), //
- minCommitLogOffset, //
+ "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
+ topic,
+ nextQT.getKey(),
+ minCommitLogOffset,
maxCLOffsetInConsumeQueue);
DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
@@ -1072,11 +1066,11 @@ public class DefaultMessageStore implements MessageStore {
ConsumeQueue logic = map.get(queueId);
if (null == logic) {
- ConsumeQueue newLogic = new ConsumeQueue(//
- topic, //
- queueId, //
- StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //
- this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //
+ ConsumeQueue newLogic = new ConsumeQueue(
+ topic,
+ queueId,
+ StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
+ this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
this);
ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
if (oldLogic != null) {
@@ -1462,11 +1456,11 @@ public class DefaultMessageStore implements MessageStore {
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
- log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
- fileReservedTime, //
- timeup, //
- spacefull, //
- manualDeleteFileSeveralTimes, //
+ log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
+ fileReservedTime,
+ timeup,
+ spacefull,
+ manualDeleteFileSeveralTimes,
cleanAtOnce);
fileReservedTime *= 60 * 60 * 1000;
@@ -1725,7 +1719,7 @@ public class DefaultMessageStore implements MessageStore {
private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
- if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() //
+ if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
@@ -1751,7 +1745,7 @@ public class DefaultMessageStore implements MessageStore {
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
- // FIXED BUG By shijia
+
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
index 3d33eaf..819bb94 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DispatchRequest.java
@@ -66,23 +66,14 @@ public class DispatchRequest {
}
public DispatchRequest(int size) {
- // 1
this.topic = "";
- // 2
this.queueId = 0;
- // 3
this.commitLogOffset = 0;
- // 4
this.msgSize = size;
- // 5
this.tagsCode = 0;
- // 6
this.storeTimestamp = 0;
- // 7
this.consumeQueueOffset = 0;
- // 8
this.keys = "";
- //9
this.uniqKey = null;
this.sysFlag = 0;
this.preparedTransactionOffset = 0;
@@ -91,23 +82,14 @@ public class DispatchRequest {
}
public DispatchRequest(int size, boolean success) {
- // 1
this.topic = "";
- // 2
this.queueId = 0;
- // 3
this.commitLogOffset = 0;
- // 4
this.msgSize = size;
- // 5
this.tagsCode = 0;
- // 6
this.storeTimestamp = 0;
- // 7
this.consumeQueueOffset = 0;
- // 8
this.keys = "";
- // 9
this.uniqKey = null;
this.sysFlag = 0;
this.preparedTransactionOffset = 0;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 4250450..81cf0f7 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -404,9 +404,7 @@ public class MappedFile extends ReferenceResource {
return null;
}
- /**
- */
public SelectMappedBufferResult selectMappedBuffer(int pos) {
int readPosition = getReadPosition();
if (pos < readPosition && pos >= 0) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index a8fa364..edf4c91 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -405,7 +405,6 @@ public class MappedFileQueue {
break;
}
- // TODO: Externalize this hardcoded value
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
----------------------------------------------------------------------
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
index 3967b64..e0c51a1 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAConnection.java
@@ -78,10 +78,7 @@ public class HAConnection {
return socketChannel;
}
- /**
- *
- */
class ReadSocketService extends ServiceThread {
private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024;
private final Selector selector;
@@ -194,10 +191,7 @@ public class HAConnection {
}
}
- /**
- *
- */
class WriteSocketService extends ServiceThread {
private final Selector selector;
private final SocketChannel socketChannel;
@@ -333,9 +327,7 @@ public class HAConnection {
HAConnection.log.info(this.getServiceName() + " service end");
}
- /**
- */
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// Write Header