You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/29 12:30:36 UTC

[08/28] incubator-rocketmq git commit: Remove unused class GetRouteInfoResponseHeader and meaningless comments

Remove unused class GetRouteInfoResponseHeader and meaningless comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/7f96008c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/7f96008c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/7f96008c

Branch: refs/heads/master
Commit: 7f96008c8b6f3ce5ac38cd168bd12252799973e3
Parents: ffad656
Author: yukon <yu...@apache.org>
Authored: Fri Aug 11 20:28:13 2017 +0800
Committer: yukon <yu...@apache.org>
Committed: Fri Aug 11 20:28:13 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/BrokerController.java       |  11 +-
 .../apache/rocketmq/broker/BrokerStartup.java   |   8 +-
 .../client/rebalance/RebalanceLockManager.java  |  44 ++--
 .../broker/filtersrv/FilterServerManager.java   |   2 -
 .../processor/AbstractSendMessageProcessor.java |   8 +-
 .../broker/processor/AdminBrokerProcessor.java  |  49 ++--
 .../broker/processor/PullMessageProcessor.java  |  16 +-
 .../broker/processor/SendMessageProcessor.java  |  28 +--
 .../rocketmq/broker/BrokerControllerTest.java   |   8 +-
 .../consumer/store/LocalFileOffsetStore.java    |  10 +-
 .../consumer/store/RemoteBrokerOffsetStore.java |   4 +-
 .../rocketmq/client/impl/MQAdminImpl.java       |   2 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java   | 247 +++++++++----------
 .../ConsumeMessageConcurrentlyService.java      |  66 +++--
 .../consumer/ConsumeMessageOrderlyService.java  |  86 +++----
 .../impl/consumer/ConsumeMessageService.java    |   8 +-
 .../consumer/DefaultMQPullConsumerImpl.java     | 104 ++++----
 .../consumer/DefaultMQPushConsumerImpl.java     |  62 ++---
 .../client/impl/consumer/RebalanceImpl.java     |  22 +-
 .../client/impl/consumer/RebalancePushImpl.java |   4 +-
 .../client/impl/factory/MQClientInstance.java   |  27 +-
 .../impl/producer/DefaultMQProducerImpl.java    |  92 +++----
 .../client/impl/producer/MQProducerInner.java   |   6 +-
 .../org/apache/rocketmq/common/TopicConfig.java |  10 -
 .../org/apache/rocketmq/common/help/FAQUrl.java |  26 +-
 .../common/message/MessageClientIDSetter.java   |   2 +-
 .../protocol/body/ConsumerRunningInfo.java      |  94 +++----
 .../header/GetConsumeStatsRequestHeader.java    |   2 -
 .../header/GetConsumerStatusRequestHeader.java  |   1 -
 .../GetEarliestMsgStoretimeRequestHeader.java   |   1 -
 .../header/QueryCorrectionOffsetHeader.java     |   2 +-
 .../header/SearchOffsetRequestHeader.java       |   2 +-
 .../header/UnregisterClientRequestHeader.java   |   2 +-
 .../header/UnregisterClientResponseHeader.java  |   2 +-
 .../namesrv/GetRouteInfoResponseHeader.java     |  33 ---
 .../RegisterOrderTopicRequestHeader.java        |   2 +-
 .../rocketmq/common/sysflag/TopicSysFlag.java   |   4 -
 .../rocketmq/common/utils/IOTinyUtils.java      |   2 -
 .../rocketmq/example/simple/PushConsumer.java   |   2 -
 .../src/main/resources/MessageFilterImpl.java   |   2 +-
 .../rocketmq/filter/parser/SelectorParser.java  |   1 -
 .../rocketmq/filter/parser/SelectorParser.jj    |   1 -
 .../namesrv/kvconfig/KVConfigManager.java       |   6 +-
 .../namesrv/routeinfo/RouteInfoManager.java     |   4 +-
 .../rocketmq/remoting/netty/NettyDecoder.java   |   2 +-
 .../remoting/netty/NettyRemotingAbstract.java   |  24 +-
 .../remoting/netty/NettyRemotingClient.java     |   8 +-
 .../remoting/netty/NettyServerConfig.java       |   4 -
 .../remoting/netty/NettySystemConfig.java       |  18 +-
 .../org/apache/rocketmq/store/CommitLog.java    | 101 +++-----
 .../rocketmq/store/DefaultMessageStore.java     |  54 ++--
 .../apache/rocketmq/store/DispatchRequest.java  |  18 --
 .../org/apache/rocketmq/store/MappedFile.java   |   2 -
 .../apache/rocketmq/store/MappedFileQueue.java  |   1 -
 .../apache/rocketmq/store/ha/HAConnection.java  |   8 -
 .../org/apache/rocketmq/store/ha/HAService.java |  18 --
 .../apache/rocketmq/store/index/IndexFile.java  |   1 -
 .../rocketmq/store/index/IndexHeader.java       |   4 -
 .../store/schedule/ScheduleMessageService.java  |   3 -
 .../rocketmq/store/stats/BrokerStats.java       |   2 -
 .../tools/admin/DefaultMQAdminExtImpl.java      |  10 +-
 .../command/message/PrintMessageSubCommand.java |   5 +-
 62 files changed, 617 insertions(+), 781 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index c8624c4..cd68552 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -135,11 +135,11 @@ public class BrokerController {
     private BrokerFastFailure brokerFastFailure;
     private Configuration configuration;
 
-    public BrokerController(//
-        final BrokerConfig brokerConfig, //
-        final NettyServerConfig nettyServerConfig, //
-        final NettyClientConfig nettyClientConfig, //
-        final MessageStoreConfig messageStoreConfig //
+    public BrokerController(
+        final BrokerConfig brokerConfig,
+        final NettyServerConfig nettyServerConfig,
+        final NettyClientConfig nettyClientConfig,
+        final MessageStoreConfig messageStoreConfig
     ) {
         this.brokerConfig = brokerConfig;
         this.nettyServerConfig = nettyServerConfig;
@@ -255,7 +255,6 @@ public class BrokerController {
 
             this.registerProcessor();
 
-            // TODO remove in future
             final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
             final long period = 1000 * 60 * 60 * 24;
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 85d2e3a..e0a3b69 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -190,10 +190,10 @@ public class BrokerStartup {
             MixAll.printObjectProperties(log, nettyClientConfig);
             MixAll.printObjectProperties(log, messageStoreConfig);
 
-            final BrokerController controller = new BrokerController(//
-                brokerConfig, //
-                nettyServerConfig, //
-                nettyClientConfig, //
+            final BrokerController controller = new BrokerController(
+                brokerConfig,
+                nettyServerConfig,
+                nettyClientConfig,
                 messageStoreConfig);
             // remember all configs to prevent discard
             controller.getConfiguration().registerConfig(properties);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
index ed5a875..519745e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -52,9 +52,9 @@ public class RebalanceLockManager {
                         lockEntry = new LockEntry();
                         lockEntry.setClientId(clientId);
                         groupValue.put(mq, lockEntry);
-                        log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
-                            group, //
-                            clientId, //
+                        log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
+                            group,
+                            clientId,
                             mq);
                     }
 
@@ -69,19 +69,19 @@ public class RebalanceLockManager {
                         lockEntry.setClientId(clientId);
                         lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                         log.warn(
-                            "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
-                            group, //
-                            oldClientId, //
-                            clientId, //
+                            "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
+                            group,
+                            oldClientId,
+                            clientId,
                             mq);
                         return true;
                     }
 
                     log.warn(
-                        "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
-                        group, //
-                        oldClientId, //
-                        clientId, //
+                        "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
+                        group,
+                        oldClientId,
+                        clientId,
                         mq);
                     return false;
                 } finally {
@@ -144,9 +144,9 @@ public class RebalanceLockManager {
                             lockEntry.setClientId(clientId);
                             groupValue.put(mq, lockEntry);
                             log.info(
-                                "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
-                                group, //
-                                clientId, //
+                                "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}",
+                                group,
+                                clientId,
                                 mq);
                         }
 
@@ -162,20 +162,20 @@ public class RebalanceLockManager {
                             lockEntry.setClientId(clientId);
                             lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                             log.warn(
-                                "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
-                                group, //
-                                oldClientId, //
-                                clientId, //
+                                "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
+                                group,
+                                oldClientId,
+                                clientId,
                                 mq);
                             lockedMqs.add(mq);
                             continue;
                         }
 
                         log.warn(
-                            "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
-                            group, //
-                            oldClientId, //
-                            clientId, //
+                            "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
+                            group,
+                            oldClientId,
+                            clientId,
                             mq);
                     }
                 } finally {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
index 52cb919..ff63127 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -111,9 +111,7 @@ public class FilterServerManager {
         }
     }
 
-    /**
 
-     */
     public void scanNotActiveChannel() {
 
         Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 3faa7ae..410192f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -189,10 +189,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
             }
 
             log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
-            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
-                requestHeader.getTopic(), //
-                requestHeader.getDefaultTopic(), //
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
+            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
+                requestHeader.getTopic(),
+                requestHeader.getDefaultTopic(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                 requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
 
             if (null == topicConfig) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 71fdda9..937f575 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -116,6 +116,7 @@ import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class AdminBrokerProcessor implements NettyRequestProcessor {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -432,9 +433,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
 
-        Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(//
-            requestBody.getConsumerGroup(), //
-            requestBody.getMqSet(), //
+        Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
+            requestBody.getConsumerGroup(),
+            requestBody.getMqSet(),
             requestBody.getClientId());
 
         LockBatchResponseBody responseBody = new LockBatchResponseBody();
@@ -450,9 +451,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
 
-        this.brokerController.getRebalanceLockManager().unlockBatch(//
-            requestBody.getConsumerGroup(), //
-            requestBody.getMqSet(), //
+        this.brokerController.getRebalanceLockManager().unlockBatch(
+            requestBody.getConsumerGroup(),
+            requestBody.getMqSet(),
             requestBody.getClientId());
 
         response.setCode(ResponseCode.SUCCESS);
@@ -657,14 +658,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 continue;
             }
 
-            /**
 
-             */
             {
                 SubscriptionData findSubscriptionData =
                     this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
 
-                if (null == findSubscriptionData //
+                if (null == findSubscriptionData
                     && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
                     log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
                     continue;
@@ -683,9 +682,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 if (brokerOffset < 0)
                     brokerOffset = 0;
 
-                long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
-                    requestHeader.getConsumerGroup(), //
-                    topic, //
+                long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
+                    requestHeader.getConsumerGroup(),
+                    topic,
                     i);
                 if (consumerOffset < 0)
                     consumerOffset = 0;
@@ -925,9 +924,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    /**
 
-     */
     private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final GetConsumerRunningInfoRequestHeader requestHeader =
             (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
@@ -1007,9 +1004,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 continue;
             }
 
-            /**
 
-             */
             if (!requestHeader.isOffline()) {
 
                 SubscriptionData findSubscriptionData =
@@ -1107,13 +1102,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 if (isOrder && !topicConfig.isOrder()) {
                     continue;
                 }
-                /**
 
-                 */
                 {
                     SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
 
-                    if (null == findSubscriptionData //
+                    if (null == findSubscriptionData
                         && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
                         log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);
                         continue;
@@ -1129,9 +1122,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                     long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
                     if (brokerOffset < 0)
                         brokerOffset = 0;
-                    long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
-                        group, //
-                        topic, //
+                    long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
+                        group,
+                        topic,
                         i);
                     if (consumerOffset < 0)
                         consumerOffset = 0;
@@ -1215,10 +1208,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return runtimeInfo;
     }
 
-    private RemotingCommand callConsumer(//
-        final int requestCode, //
-        final RemotingCommand request, //
-        final String consumerGroup, //
+    private RemotingCommand callConsumer(
+        final int requestCode,
+        final RemotingCommand request,
+        final String consumerGroup,
         final String clientId) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
@@ -1231,8 +1224,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
         if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
-                clientId, //
+            response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",
+                clientId,
                 MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
             return response;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index fb7ea20..fe2fcfe 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -160,7 +160,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                     assert consumerFilterData != null;
                 }
             } catch (Exception e) {
-                log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
+                log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
                     requestHeader.getConsumerGroup());
                 response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                 response.setRemark("parse the consumer's subscription failed");
@@ -176,7 +176,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 return response;
             }
 
-            if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
+            if (!subscriptionGroupConfig.isConsumeBroadcastEnable()
                 && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
@@ -285,12 +285,12 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                         response.setCode(ResponseCode.PULL_OFFSET_MOVED);
 
                         // XXX: warn and notify me
-                        log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
-                                requestHeader.getQueueOffset(), //
-                                getMessageResult.getNextBeginOffset(), //
-                                requestHeader.getTopic(), //
-                                requestHeader.getQueueId(), //
-                                requestHeader.getConsumerGroup()//
+                        log.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",
+                                requestHeader.getQueueOffset(),
+                                getMessageResult.getNextBeginOffset(),
+                                requestHeader.getTopic(),
+                                requestHeader.getQueueId(),
+                                requestHeader.getConsumerGroup()
                         );
                     } else {
                         response.setCode(ResponseCode.PULL_NOT_FOUND);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 5c716cc..cd60c44 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -139,9 +139,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
         }
 
-        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
-            newTopic, //
-            subscriptionGroupConfig.getRetryQueueNums(), //
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
+            newTopic,
+            subscriptionGroupConfig.getRetryQueueNums(),
             PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
         if (null == topicConfig) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -175,13 +175,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
         }
 
-        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
+        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
             || delayLevel < 0) {
             newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
 
-            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
-                DLQ_NUMS_PER_GROUP, //
+            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
+                DLQ_NUMS_PER_GROUP,
                 PermName.PERM_WRITE, 0
             );
             if (null == topicConfig) {
@@ -268,8 +268,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             if (reconsumeTimes >= maxReconsumeTimes) {
                 newTopic = MixAll.getDLQTopic(groupName);
                 int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
-                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
-                    DLQ_NUMS_PER_GROUP, //
+                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
+                    DLQ_NUMS_PER_GROUP,
                     PermName.PERM_WRITE, 0
                 );
                 msg.setTopic(newTopic);
@@ -289,9 +289,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         return true;
     }
 
-    private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
-        final RemotingCommand request, //
-        final SendMessageContext sendMessageContext, //
+    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
+        final RemotingCommand request,
+        final SendMessageContext sendMessageContext,
         final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
@@ -464,9 +464,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         }
         return  response;
     }
-    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx, //
-                                        final RemotingCommand request, //
-                                        final SendMessageContext sendMessageContext, //
+    private RemotingCommand sendBatchMessage(final ChannelHandlerContext ctx,
+                                        final RemotingCommand request,
+                                        final SendMessageContext sendMessageContext,
                                         final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index fe30d8f..d4edd9a 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -38,10 +38,10 @@ public class BrokerControllerTest {
     @Test
     public void testBrokerRestart() throws Exception {
         for (int i = 0; i < 2; i++) {
-            BrokerController brokerController = new BrokerController(//
-                new BrokerConfig(), //
-                new NettyServerConfig(), //
-                new NettyClientConfig(), //
+            BrokerController brokerController = new BrokerController(
+                new BrokerConfig(),
+                new NettyServerConfig(),
+                new NettyClientConfig(),
                 new MessageStoreConfig());
             assertThat(brokerController.initialize());
             brokerController.start();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index d4b19b2..22ec674 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -52,9 +52,9 @@ public class LocalFileOffsetStore implements OffsetStore {
     public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
         this.mQClientFactory = mQClientFactory;
         this.groupName = groupName;
-        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + //
-            this.mQClientFactory.getClientId() + File.separator + //
-            this.groupName + File.separator + //
+        this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
+            this.mQClientFactory.getClientId() + File.separator +
+            this.groupName + File.separator +
             "offsets.json";
     }
 
@@ -217,8 +217,8 @@ public class LocalFileOffsetStore implements OffsetStore {
                     OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
             } catch (Exception e) {
                 log.warn("readLocalOffset Exception", e);
-                throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low" //
-                    + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION), //
+                throw new MQClientException("readLocalOffset Exception, maybe fastjson version too low"
+                    + FAQUrl.suggestTodo(FAQUrl.LOAD_JSON_EXCEPTION),
                     e);
             }
             return offsetSerializeWrapper;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index 5bd5749..b82e992 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@ -204,7 +204,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
         MQBrokerException, InterruptedException, MQClientException {
         FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
         if (null == findBrokerResult) {
-            // TODO Here may be heavily overhead for Name Server,need tuning
+
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
             findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
         }
@@ -232,7 +232,7 @@ public class RemoteBrokerOffsetStore implements OffsetStore {
         InterruptedException, MQClientException {
         FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
         if (null == findBrokerResult) {
-            // TODO Here may be heavily overhead for Name Server,need tuning
+
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
             findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 983e515..92d8513 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -159,7 +159,7 @@ public class MQAdminImpl {
             }
         } catch (Exception e) {
             throw new MQClientException(
-                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), //
+                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                 e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index ae9ed6c..c5abc36 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -285,32 +285,32 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public SendResult sendMessage(//
-        final String addr, // 1
-        final String brokerName, // 2
-        final Message msg, // 3
-        final SendMessageRequestHeader requestHeader, // 4
-        final long timeoutMillis, // 5
-        final CommunicationMode communicationMode, // 6
-        final SendMessageContext context, // 7
-        final DefaultMQProducerImpl producer // 8
+    public SendResult sendMessage(
+        final String addr,
+        final String brokerName,
+        final Message msg,
+        final SendMessageRequestHeader requestHeader,
+        final long timeoutMillis,
+        final CommunicationMode communicationMode,
+        final SendMessageContext context,
+        final DefaultMQProducerImpl producer
     ) throws RemotingException, MQBrokerException, InterruptedException {
         return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer);
     }
 
-    public SendResult sendMessage(//
-        final String addr, // 1
-        final String brokerName, // 2
-        final Message msg, // 3
-        final SendMessageRequestHeader requestHeader, // 4
-        final long timeoutMillis, // 5
-        final CommunicationMode communicationMode, // 6
-        final SendCallback sendCallback, // 7
-        final TopicPublishInfo topicPublishInfo, // 8
-        final MQClientInstance instance, // 9
-        final int retryTimesWhenSendFailed, // 10
-        final SendMessageContext context, // 11
-        final DefaultMQProducerImpl producer // 12
+    public SendResult sendMessage(
+        final String addr,
+        final String brokerName,
+        final Message msg,
+        final SendMessageRequestHeader requestHeader,
+        final long timeoutMillis,
+        final CommunicationMode communicationMode,
+        final SendCallback sendCallback,
+        final TopicPublishInfo topicPublishInfo,
+        final MQClientInstance instance,
+        final int retryTimesWhenSendFailed,
+        final SendMessageContext context,
+        final DefaultMQProducerImpl producer
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = null;
         if (sendSmartMsg || msg instanceof MessageBatch) {
@@ -341,31 +341,31 @@ public class MQClientAPIImpl {
         return null;
     }
 
-    private SendResult sendMessageSync(//
-        final String addr, //
-        final String brokerName, //
-        final Message msg, //
-        final long timeoutMillis, //
-        final RemotingCommand request//
+    private SendResult sendMessageSync(
+        final String addr,
+        final String brokerName,
+        final Message msg,
+        final long timeoutMillis,
+        final RemotingCommand request
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
         return this.processSendResponse(brokerName, msg, response);
     }
 
-    private void sendMessageAsync(//
-        final String addr, //
-        final String brokerName, //
-        final Message msg, //
-        final long timeoutMillis, //
-        final RemotingCommand request, //
-        final SendCallback sendCallback, //
-        final TopicPublishInfo topicPublishInfo, //
-        final MQClientInstance instance, //
-        final int retryTimesWhenSendFailed, //
-        final AtomicInteger times, //
-        final SendMessageContext context, //
-        final DefaultMQProducerImpl producer //
+    private void sendMessageAsync(
+        final String addr,
+        final String brokerName,
+        final Message msg,
+        final long timeoutMillis,
+        final RemotingCommand request,
+        final SendCallback sendCallback,
+        final TopicPublishInfo topicPublishInfo,
+        final MQClientInstance instance,
+        final int retryTimesWhenSendFailed,
+        final AtomicInteger times,
+        final SendMessageContext context,
+        final DefaultMQProducerImpl producer
     ) throws InterruptedException, RemotingException {
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
             @Override
@@ -380,7 +380,6 @@ public class MQClientAPIImpl {
                             context.getProducer().executeSendMessageHookAfter(context);
                         }
                     } catch (Throwable e) {
-                        //
                     }
 
                     producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
@@ -428,19 +427,19 @@ public class MQClientAPIImpl {
         });
     }
 
-    private void onExceptionImpl(final String brokerName, //
-        final Message msg, //
-        final long timeoutMillis, //
-        final RemotingCommand request, //
-        final SendCallback sendCallback, //
-        final TopicPublishInfo topicPublishInfo, //
-        final MQClientInstance instance, //
-        final int timesTotal, //
-        final AtomicInteger curTimes, //
-        final Exception e, //
-        final SendMessageContext context, //
-        final boolean needRetry, //
-        final DefaultMQProducerImpl producer // 12
+    private void onExceptionImpl(final String brokerName,
+        final Message msg,
+        final long timeoutMillis,
+        final RemotingCommand request,
+        final SendCallback sendCallback,
+        final TopicPublishInfo topicPublishInfo,
+        final MQClientInstance instance,
+        final int timesTotal,
+        final AtomicInteger curTimes,
+        final Exception e,
+        final SendMessageContext context,
+        final boolean needRetry,
+        final DefaultMQProducerImpl producer
     ) {
         int tmp = curTimes.incrementAndGet();
         if (needRetry && tmp <= timesTotal) {
@@ -485,16 +484,15 @@ public class MQClientAPIImpl {
         }
     }
 
-    private SendResult processSendResponse(//
-        final String brokerName, //
-        final Message msg, //
-        final RemotingCommand response//
+    private SendResult processSendResponse(
+        final String brokerName,
+        final Message msg,
+        final RemotingCommand response
     ) throws MQBrokerException, RemotingCommandException {
         switch (response.getCode()) {
             case ResponseCode.FLUSH_DISK_TIMEOUT:
             case ResponseCode.FLUSH_SLAVE_TIMEOUT:
             case ResponseCode.SLAVE_NOT_AVAILABLE: {
-                // TODO LOG
             }
             case ResponseCode.SUCCESS: {
                 SendStatus sendStatus = SendStatus.SEND_OK;
@@ -553,12 +551,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public PullResult pullMessage(//
-        final String addr, //
-        final PullMessageRequestHeader requestHeader, //
-        final long timeoutMillis, //
-        final CommunicationMode communicationMode, //
-        final PullCallback pullCallback//
+    public PullResult pullMessage(
+        final String addr,
+        final PullMessageRequestHeader requestHeader,
+        final long timeoutMillis,
+        final CommunicationMode communicationMode,
+        final PullCallback pullCallback
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 
@@ -579,11 +577,11 @@ public class MQClientAPIImpl {
         return null;
     }
 
-    private void pullMessageAsync(//
-        final String addr, // 1
-        final RemotingCommand request, //
-        final long timeoutMillis, //
-        final PullCallback pullCallback//
+    private void pullMessageAsync(
+        final String addr,
+        final RemotingCommand request,
+        final long timeoutMillis,
+        final PullCallback pullCallback
     ) throws RemotingException, InterruptedException {
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
             @Override
@@ -611,10 +609,10 @@ public class MQClientAPIImpl {
         });
     }
 
-    private PullResult pullMessageSync(//
-        final String addr, // 1
-        final RemotingCommand request, // 2
-        final long timeoutMillis// 3
+    private PullResult pullMessageSync(
+        final String addr,
+        final RemotingCommand request,
+        final long timeoutMillis
     ) throws RemotingException, InterruptedException, MQBrokerException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
         assert response != null;
@@ -720,9 +718,9 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public List<String> getConsumerIdListByGroup(//
-        final String addr, //
-        final String consumerGroup, //
+    public List<String> getConsumerIdListByGroup(
+        final String addr,
+        final String consumerGroup,
         final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         MQBrokerException, InterruptedException {
         GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader();
@@ -796,10 +794,10 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public long queryConsumerOffset(//
-        final String addr, //
-        final QueryConsumerOffsetRequestHeader requestHeader, //
-        final long timeoutMillis//
+    public long queryConsumerOffset(
+        final String addr,
+        final QueryConsumerOffsetRequestHeader requestHeader,
+        final long timeoutMillis
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
 
@@ -820,10 +818,10 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public void updateConsumerOffset(//
-        final String addr, //
-        final UpdateConsumerOffsetRequestHeader requestHeader, //
-        final long timeoutMillis//
+    public void updateConsumerOffset(
+        final String addr,
+        final UpdateConsumerOffsetRequestHeader requestHeader,
+        final long timeoutMillis
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
 
@@ -841,10 +839,10 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public void updateConsumerOffsetOneway(//
-        final String addr, //
-        final UpdateConsumerOffsetRequestHeader requestHeader, //
-        final long timeoutMillis//
+    public void updateConsumerOffsetOneway(
+        final String addr,
+        final UpdateConsumerOffsetRequestHeader requestHeader,
+        final long timeoutMillis
     ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
         InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
@@ -852,10 +850,10 @@ public class MQClientAPIImpl {
         this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
     }
 
-    public int sendHearbeat(//
-        final String addr, //
-        final HeartbeatData heartbeatData, //
-        final long timeoutMillis//
+    public int sendHearbeat(
+        final String addr,
+        final HeartbeatData heartbeatData,
+        final long timeoutMillis
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
 
@@ -873,12 +871,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public void unregisterClient(//
-        final String addr, //
-        final String clientID, //
-        final String producerGroup, //
-        final String consumerGroup, //
-        final long timeoutMillis//
+    public void unregisterClient(
+        final String addr,
+        final String clientID,
+        final String producerGroup,
+        final String consumerGroup,
+        final long timeoutMillis
     ) throws RemotingException, MQBrokerException, InterruptedException {
         final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader();
         requestHeader.setClientID(clientID);
@@ -899,11 +897,11 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public void endTransactionOneway(//
-        final String addr, //
-        final EndTransactionRequestHeader requestHeader, //
-        final String remark, //
-        final long timeoutMillis//
+    public void endTransactionOneway(
+        final String addr,
+        final EndTransactionRequestHeader requestHeader,
+        final String remark,
+        final long timeoutMillis
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
 
@@ -965,9 +963,9 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public Set<MessageQueue> lockBatchMQ(//
-        final String addr, //
-        final LockBatchRequestBody requestBody, //
+    public Set<MessageQueue> lockBatchMQ(
+        final String addr,
+        final LockBatchRequestBody requestBody,
         final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
 
@@ -987,11 +985,11 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-    public void unlockBatchMQ(//
-        final String addr, //
-        final UnlockBatchRequestBody requestBody, //
-        final long timeoutMillis, //
-        final boolean oneway//
+    public void unlockBatchMQ(
+        final String addr,
+        final UnlockBatchRequestBody requestBody,
+        final long timeoutMillis,
+        final boolean oneway
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
 
@@ -1213,7 +1211,7 @@ public class MQClientAPIImpl {
                 if (allowTopicNotExist && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                     log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                 }
-                // TODO :- Log when if condition is not satisfied
+
                 break;
             }
             case ResponseCode.SUCCESS: {
@@ -1566,12 +1564,12 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public void registerMessageFilterClass(final String addr, //
-        final String consumerGroup, //
-        final String topic, //
-        final String className, //
-        final int classCRC, //
-        final byte[] classBody, //
+    public void registerMessageFilterClass(final String addr,
+        final String consumerGroup,
+        final String topic,
+        final String className,
+        final int classCRC,
+        final byte[] classBody,
         final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
         InterruptedException, MQBrokerException {
         RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader();
@@ -1706,10 +1704,10 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, //
-        String consumerGroup, //
-        String clientId, //
-        String msgId, //
+    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr,
+        String consumerGroup,
+        String clientId,
+        String msgId,
         final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException {
         ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
@@ -1912,7 +1910,6 @@ public class MQClientAPIImpl {
     public Set<String> getClusterList(String topic,
         long timeoutMillis) throws MQClientException, RemotingConnectException,
         RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
-        // todo:jodie
         return Collections.EMPTY_SET;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index f566ed0..961e062 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -69,12 +69,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
-        this.consumeExecutor = new ThreadPoolExecutor(//
-            this.defaultMQPushConsumer.getConsumeThreadMin(), //
-            this.defaultMQPushConsumer.getConsumeThreadMax(), //
-            1000 * 60, //
-            TimeUnit.MILLISECONDS, //
-            this.consumeRequestQueue, //
+        this.consumeExecutor = new ThreadPoolExecutor(
+            this.defaultMQPushConsumer.getConsumeThreadMin(),
+            this.defaultMQPushConsumer.getConsumeThreadMax(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            this.consumeRequestQueue,
             new ThreadFactoryImpl("ConsumeMessageThread_"));
 
         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
@@ -100,8 +100,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
     @Override
     public void updateCorePoolSize(int corePoolSize) {
-        if (corePoolSize > 0 //
-            && corePoolSize <= Short.MAX_VALUE //
+        if (corePoolSize > 0
+            && corePoolSize <= Short.MAX_VALUE
             && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
             this.consumeExecutor.setCorePoolSize(corePoolSize);
         }
@@ -115,11 +115,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
         // + 1);
         // }
-        //
         // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
-        // {}", //
-        // corePoolSize,//
-        // this.consumeExecutor.getCorePoolSize(),//
+        // {}",
+        // corePoolSize,
+        // this.consumeExecutor.getCorePoolSize(),
         // this.consumerGroup);
     }
 
@@ -131,11 +130,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
         // - 1);
         // }
-        //
         // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
-        // {}", //
-        // corePoolSize,//
-        // this.consumeExecutor.getCorePoolSize(),//
+        // {}",
+        // corePoolSize,
+        // this.consumeExecutor.getCorePoolSize(),
         // this.consumerGroup);
     }
 
@@ -185,10 +183,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
             result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
 
-            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
-                RemotingHelper.exceptionSimpleDesc(e), //
-                ConsumeMessageConcurrentlyService.this.consumerGroup, //
-                msgs, //
+            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+                RemotingHelper.exceptionSimpleDesc(e),
+                ConsumeMessageConcurrentlyService.this.consumerGroup,
+                msgs,
                 mq), e);
         }
 
@@ -200,10 +198,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     }
 
     @Override
-    public void submitConsumeRequest(//
-        final List<MessageExt> msgs, //
-        final ProcessQueue processQueue, //
-        final MessageQueue messageQueue, //
+    public void submitConsumeRequest(
+        final List<MessageExt> msgs,
+        final ProcessQueue processQueue,
+        final MessageQueue messageQueue,
         final boolean dispatchToConsume) {
         final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
         if (msgs.size() <= consumeBatchSize) {
@@ -258,10 +256,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         }
     }
 
-    public void processConsumeResult(//
-        final ConsumeConcurrentlyStatus status, //
-        final ConsumeConcurrentlyContext context, //
-        final ConsumeRequest consumeRequest//
+    public void processConsumeResult(
+        final ConsumeConcurrentlyStatus status,
+        final ConsumeConcurrentlyContext context,
+        final ConsumeRequest consumeRequest
     ) {
         int ackIndex = context.getAckIndex();
 
@@ -338,10 +336,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         return false;
     }
 
-    private void submitConsumeRequestLater(//
-        final List<MessageExt> msgs, //
-        final ProcessQueue processQueue, //
-        final MessageQueue messageQueue//
+    private void submitConsumeRequestLater(
+        final List<MessageExt> msgs,
+        final ProcessQueue processQueue,
+        final MessageQueue messageQueue
     ) {
 
         this.scheduledExecutorService.schedule(new Runnable() {
@@ -353,7 +351,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         }, 5000, TimeUnit.MILLISECONDS);
     }
 
-    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest//
+    private void submitConsumeRequestLater(final ConsumeRequest consumeRequest
     ) {
 
         this.scheduledExecutorService.schedule(new Runnable() {
@@ -419,7 +417,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
             } catch (Throwable e) {
                 log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
-                    RemotingHelper.exceptionSimpleDesc(e), //
+                    RemotingHelper.exceptionSimpleDesc(e),
                     ConsumeMessageConcurrentlyService.this.consumerGroup,
                     msgs,
                     messageQueue);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 1fa474c..abdad79 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -70,12 +70,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
-        this.consumeExecutor = new ThreadPoolExecutor(//
-            this.defaultMQPushConsumer.getConsumeThreadMin(), //
-            this.defaultMQPushConsumer.getConsumeThreadMax(), //
-            1000 * 60, //
-            TimeUnit.MILLISECONDS, //
-            this.consumeRequestQueue, //
+        this.consumeExecutor = new ThreadPoolExecutor(
+            this.defaultMQPushConsumer.getConsumeThreadMin(),
+            this.defaultMQPushConsumer.getConsumeThreadMax(),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            this.consumeRequestQueue,
             new ThreadFactoryImpl("ConsumeMessageThread_"));
 
         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
@@ -107,8 +107,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
 
     @Override
     public void updateCorePoolSize(int corePoolSize) {
-        if (corePoolSize > 0 //
-            && corePoolSize <= Short.MAX_VALUE //
+        if (corePoolSize > 0
+            && corePoolSize <= Short.MAX_VALUE
             && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
             this.consumeExecutor.setCorePoolSize(corePoolSize);
         }
@@ -171,10 +171,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             result.setConsumeResult(CMResult.CR_THROW_EXCEPTION);
             result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
 
-            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
-                RemotingHelper.exceptionSimpleDesc(e), //
-                ConsumeMessageOrderlyService.this.consumerGroup, //
-                msgs, //
+            log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s",
+                RemotingHelper.exceptionSimpleDesc(e),
+                ConsumeMessageOrderlyService.this.consumerGroup,
+                msgs,
                 mq), e);
         }
 
@@ -187,10 +187,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     }
 
     @Override
-    public void submitConsumeRequest(//
-        final List<MessageExt> msgs, //
-        final ProcessQueue processQueue, //
-        final MessageQueue messageQueue, //
+    public void submitConsumeRequest(
+        final List<MessageExt> msgs,
+        final ProcessQueue processQueue,
+        final MessageQueue messageQueue,
         final boolean dispathToConsume) {
         if (dispathToConsume) {
             ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
@@ -226,10 +226,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         return false;
     }
 
-    private void submitConsumeRequestLater(//
-        final ProcessQueue processQueue, //
-        final MessageQueue messageQueue, //
-        final long suspendTimeMillis//
+    private void submitConsumeRequestLater(
+        final ProcessQueue processQueue,
+        final MessageQueue messageQueue,
+        final long suspendTimeMillis
     ) {
         long timeMillis = suspendTimeMillis;
         if (timeMillis == -1) {
@@ -251,11 +251,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         }, timeMillis, TimeUnit.MILLISECONDS);
     }
 
-    public boolean processConsumeResult(//
-        final List<MessageExt> msgs, //
-        final ConsumeOrderlyStatus status, //
-        final ConsumeOrderlyContext context, //
-        final ConsumeRequest consumeRequest//
+    public boolean processConsumeResult(
+        final List<MessageExt> msgs,
+        final ConsumeOrderlyStatus status,
+        final ConsumeOrderlyContext context,
+        final ConsumeRequest consumeRequest
     ) {
         boolean continueConsume = true;
         long commitOffset = -1L;
@@ -273,9 +273,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                     this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                     if (checkReconsumeTimes(msgs)) {
                         consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
-                        this.submitConsumeRequestLater(//
-                            consumeRequest.getProcessQueue(), //
-                            consumeRequest.getMessageQueue(), //
+                        this.submitConsumeRequestLater(
+                            consumeRequest.getProcessQueue(),
+                            consumeRequest.getMessageQueue(),
                             context.getSuspendCurrentQueueTimeMillis());
                         continueConsume = false;
                     } else {
@@ -295,9 +295,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                     break;
                 case ROLLBACK:
                     consumeRequest.getProcessQueue().rollback();
-                    this.submitConsumeRequestLater(//
-                        consumeRequest.getProcessQueue(), //
-                        consumeRequest.getMessageQueue(), //
+                    this.submitConsumeRequestLater(
+                        consumeRequest.getProcessQueue(),
+                        consumeRequest.getMessageQueue(),
                         context.getSuspendCurrentQueueTimeMillis());
                     continueConsume = false;
                     break;
@@ -305,9 +305,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                     this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                     if (checkReconsumeTimes(msgs)) {
                         consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
-                        this.submitConsumeRequestLater(//
-                            consumeRequest.getProcessQueue(), //
-                            consumeRequest.getMessageQueue(), //
+                        this.submitConsumeRequestLater(
+                            consumeRequest.getProcessQueue(),
+                            consumeRequest.getMessageQueue(),
                             context.getSuspendCurrentQueueTimeMillis());
                         continueConsume = false;
                     }
@@ -468,22 +468,22 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
 
                                 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                             } catch (Throwable e) {
-                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
-                                    RemotingHelper.exceptionSimpleDesc(e), //
-                                    ConsumeMessageOrderlyService.this.consumerGroup, //
-                                    msgs, //
+                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
+                                    RemotingHelper.exceptionSimpleDesc(e),
+                                    ConsumeMessageOrderlyService.this.consumerGroup,
+                                    msgs,
                                     messageQueue);
                                 hasException = true;
                             } finally {
                                 this.processQueue.getLockConsume().unlock();
                             }
 
-                            if (null == status //
-                                || ConsumeOrderlyStatus.ROLLBACK == status//
+                            if (null == status
+                                || ConsumeOrderlyStatus.ROLLBACK == status
                                 || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
-                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
-                                    ConsumeMessageOrderlyService.this.consumerGroup, //
-                                    msgs, //
+                                log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
+                                    ConsumeMessageOrderlyService.this.consumerGroup,
+                                    msgs,
                                     messageQueue);
                             }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 8742191..0f6f3bb 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -36,9 +36,9 @@ public interface ConsumeMessageService {
 
     ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
 
-    void submitConsumeRequest(//
-        final List<MessageExt> msgs, //
-        final ProcessQueue processQueue, //
-        final MessageQueue messageQueue, //
+    void submitConsumeRequest(
+        final List<MessageExt> msgs,
+        final ProcessQueue processQueue,
+        final MessageQueue messageQueue,
         final boolean dispathToConsume);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/7f96008c/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 35ee16f..8640d2d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -97,8 +97,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
     private void makeSureStateOK() throws MQClientException {
         if (this.serviceState != ServiceState.RUNNING) {
-            throw new MQClientException("The consumer service state not OK, "//
-                + this.serviceState//
+            throw new MQClientException("The consumer service state not OK, "
+                + this.serviceState
                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                 null);
         }
@@ -185,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
         SubscriptionData subscriptionData;
         try {
-            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+            subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                 mq.getTopic(), subExpression);
         } catch (Exception e) {
             throw new MQClientException("parse subscription error", e);
@@ -193,18 +193,18 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
-        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
-            mq, // 1
-            subscriptionData.getSubString(), // 2
-            0L, // 3
-            offset, // 4
-            maxNums, // 5
-            sysFlag, // 6
-            0, // 7
-            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
-            timeoutMillis, // 9
-            CommunicationMode.SYNC, // 10
-            null// 11
+        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
+            mq,
+            subscriptionData.getSubString(),
+            0L,
+            offset,
+            maxNums,
+            sysFlag,
+            0,
+            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
+            timeoutMillis,
+            CommunicationMode.SYNC,
+            null
         );
         this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
         if (!this.consumeMessageHookList.isEmpty()) {
@@ -225,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     public void subscriptionAutomatically(final String topic) {
         if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
             try {
-                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                     topic, SubscriptionData.SUB_ALL);
                 this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
             } catch (Exception ignore) {
@@ -372,13 +372,13 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
     }
 
-    private void pullAsyncImpl(//
-        final MessageQueue mq, //
-        final String subExpression, //
-        final long offset, //
-        final int maxNums, //
-        final PullCallback pullCallback, //
-        final boolean block, //
+    private void pullAsyncImpl(
+        final MessageQueue mq,
+        final String subExpression,
+        final long offset,
+        final int maxNums,
+        final PullCallback pullCallback,
+        final boolean block,
         final long timeout) throws MQClientException, RemotingException, InterruptedException {
         this.makeSureStateOK();
 
@@ -405,7 +405,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
             final SubscriptionData subscriptionData;
             try {
-                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+                subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                     mq.getTopic(), subExpression);
             } catch (Exception e) {
                 throw new MQClientException("parse subscription error", e);
@@ -413,17 +413,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
 
             long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
-            this.pullAPIWrapper.pullKernelImpl(//
-                mq, // 1
-                subscriptionData.getSubString(), // 2
-                0L, // 3
-                offset, // 4
-                maxNums, // 5
-                sysFlag, // 6
-                0, // 7
-                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
-                timeoutMillis, // 9
-                CommunicationMode.ASYNC, // 10
+            this.pullAPIWrapper.pullKernelImpl(
+                mq,
+                subscriptionData.getSubString(),
+                0L,
+                offset,
+                maxNums,
+                sysFlag,
+                0,
+                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
+                timeoutMillis,
+                CommunicationMode.ASYNC,
                 new PullCallback() {
 
                     @Override
@@ -551,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
                 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
 
-                this.pullAPIWrapper = new PullAPIWrapper(//
-                    mQClientFactory, //
+                this.pullAPIWrapper = new PullAPIWrapper(
+                    mQClientFactory,
                     this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
 
@@ -589,8 +589,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             case RUNNING:
             case START_FAILED:
             case SHUTDOWN_ALREADY:
-                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
-                    + this.serviceState//
+                throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
+                    + this.serviceState
                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                     null);
             default:
@@ -606,42 +606,42 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         // consumerGroup
         if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
             throw new MQClientException(
-                "consumerGroup is null" //
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                "consumerGroup is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
 
         // consumerGroup
         if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
             throw new MQClientException(
-                "consumerGroup can not equal "//
-                    + MixAll.DEFAULT_CONSUMER_GROUP //
-                    + ", please specify another one."//
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                "consumerGroup can not equal "
+                    + MixAll.DEFAULT_CONSUMER_GROUP
+                    + ", please specify another one."
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
 
         // messageModel
         if (null == this.defaultMQPullConsumer.getMessageModel()) {
             throw new MQClientException(
-                "messageModel is null" //
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                "messageModel is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
 
         // allocateMessageQueueStrategy
         if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
             throw new MQClientException(
-                "allocateMessageQueueStrategy is null" //
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                "allocateMessageQueueStrategy is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
 
         // allocateMessageQueueStrategy
         if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
             throw new MQClientException(
-                "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
                 null);
         }
     }
@@ -651,7 +651,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             Set<String> registerTopics = this.defaultMQPullConsumer.getRegisterTopics();
             if (registerTopics != null) {
                 for (final String topic : registerTopics) {
-                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
+                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
                         topic, SubscriptionData.SUB_ALL);
                     this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                 }