You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:15:16 UTC

[85/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4241c0e..c22c515 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -16,6 +16,17 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -44,7 +55,11 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatus;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
@@ -60,11 +75,6 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     /**
      * Delay some time when exception occur
@@ -98,7 +108,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private long flowControlTimes1 = 0;
     private long flowControlTimes2 = 0;
 
-
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
         this.rpcHook = rpcHook;
@@ -214,8 +223,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
             if ((flowControlTimes1++ % 1000) == 0) {
                 log.warn(
-                        "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
-                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+                    "the consumer message buffer is full, so do flow control, minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
+                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
             }
             return;
         }
@@ -225,9 +234,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                 if ((flowControlTimes2++ % 1000) == 0) {
                     log.warn(
-                            "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
-                            processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
-                            pullRequest, flowControlTimes2);
+                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
+                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
+                        pullRequest, flowControlTimes2);
                 }
                 return;
             }
@@ -237,10 +246,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                     boolean brokerBusy = offset < pullRequest.getNextOffset();
                     log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
-                            pullRequest, offset, brokerBusy);
+                        pullRequest, offset, brokerBusy);
                     if (brokerBusy) {
                         log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
-                                pullRequest, offset);
+                            pullRequest, offset);
                     }
 
                     pullRequest.setLockedFirst(true);
@@ -267,7 +276,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             public void onSuccess(PullResult pullResult) {
                 if (pullResult != null) {
                     pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
-                            subscriptionData);
+                        subscriptionData);
 
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
@@ -275,7 +284,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                             long pullRT = System.currentTimeMillis() - beginTimestamp;
                             DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
-                                    pullRequest.getMessageQueue().getTopic(), pullRT);
+                                pullRequest.getMessageQueue().getTopic(), pullRT);
 
                             long firstMsgOffset = Long.MAX_VALUE;
                             if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
@@ -284,30 +293,30 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                                 firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
 
                                 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
-                                        pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
+                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
 
                                 boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
-                                        pullResult.getMsgFoundList(), //
-                                        processQueue, //
-                                        pullRequest.getMessageQueue(), //
-                                        dispathToConsume);
+                                    pullResult.getMsgFoundList(), //
+                                    processQueue, //
+                                    pullRequest.getMessageQueue(), //
+                                    dispathToConsume);
 
                                 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                     DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
-                                            DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
+                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                 } else {
                                     DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                 }
                             }
 
                             if (pullResult.getNextBeginOffset() < prevRequestOffset//
-                                    || firstMsgOffset < prevRequestOffset) {
+                                || firstMsgOffset < prevRequestOffset) {
                                 log.warn(
-                                        "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
-                                        pullResult.getNextBeginOffset(), //
-                                        firstMsgOffset, //
-                                        prevRequestOffset);
+                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
+                                    pullResult.getNextBeginOffset(), //
+                                    firstMsgOffset, //
+                                    prevRequestOffset);
                             }
 
                             break;
@@ -327,7 +336,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                             break;
                         case OFFSET_ILLEGAL:
                             log.warn("the pull request offset illegal, {} {}", //
-                                    pullRequest.toString(), pullResult.toString());
+                                pullRequest.toString(), pullResult.toString());
                             pullRequest.setNextOffset(pullResult.getNextBeginOffset());
 
                             pullRequest.getProcessQueue().setDropped(true);
@@ -337,7 +346,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                                 public void run() {
                                     try {
                                         DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
-                                                pullRequest.getNextOffset(), false);
+                                            pullRequest.getNextOffset(), false);
 
                                         DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
 
@@ -356,7 +365,6 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                 }
             }
 
-
             @Override
             public void onException(Throwable e) {
                 if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
@@ -388,24 +396,24 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         }
 
         int sysFlag = PullSysFlag.buildSysFlag(//
-                commitOffsetEnable, // commitOffset
-                true, // suspend
-                subExpression != null, // subscription
-                classFilter // class filter
+            commitOffsetEnable, // commitOffset
+            true, // suspend
+            subExpression != null, // subscription
+            classFilter // class filter
         );
         try {
             this.pullAPIWrapper.pullKernelImpl(//
-                    pullRequest.getMessageQueue(), // 1
-                    subExpression, // 2
-                    subscriptionData.getSubVersion(), // 3
-                    pullRequest.getNextOffset(), // 4
-                    this.defaultMQPushConsumer.getPullBatchSize(), // 5
-                    sysFlag, // 6
-                    commitOffsetValue, // 7
-                    BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
-                    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
-                    CommunicationMode.ASYNC, // 10
-                    pullCallback// 11
+                pullRequest.getMessageQueue(), // 1
+                subExpression, // 2
+                subscriptionData.getSubVersion(), // 3
+                pullRequest.getNextOffset(), // 4
+                this.defaultMQPushConsumer.getPullBatchSize(), // 5
+                sysFlag, // 6
+                commitOffsetValue, // 7
+                BROKER_SUSPEND_MAX_TIME_MILLIS, // 8
+                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 9
+                CommunicationMode.ASYNC, // 10
+                pullCallback// 11
             );
         } catch (Exception e) {
             log.error("pullKernelImpl exception", e);
@@ -416,9 +424,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     private void makeSureStateOK() throws MQClientException {
         if (this.serviceState != ServiceState.RUNNING) {
             throw new MQClientException("The consumer service state not OK, "//
-                    + this.serviceState//
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                    null);
+                + this.serviceState//
+                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                null);
         }
     }
 
@@ -453,16 +461,15 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
     }
 
     public MessageExt queryMessageByUniqKey(String topic, String uniqKey) throws MQClientException,
-            InterruptedException {
+        InterruptedException {
         return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
     }
 
-
     public void registerMessageListener(MessageListener messageListener) {
         this.messageListenerInner = messageListener;
     }
@@ -474,12 +481,12 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
             String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
-                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
             this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
-                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
+                this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
 
@@ -532,7 +539,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         switch (this.serviceState) {
             case CREATE_JUST:
                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
-                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
+                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                 this.serviceState = ServiceState.START_FAILED;
 
                 this.checkConfig();
@@ -551,8 +558,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
 
                 this.pullAPIWrapper = new PullAPIWrapper(
-                        mQClientFactory,
-                        this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
+                    mQClientFactory,
+                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
 
                 if (this.defaultMQPushConsumer.getOffsetStore() != null) {
@@ -574,11 +581,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                     this.consumeOrderly = true;
                     this.consumeMessageService =
-                            new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
+                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
                 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                     this.consumeOrderly = false;
                     this.consumeMessageService =
-                            new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
+                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
                 }
 
                 this.consumeMessageService.start();
@@ -588,8 +595,8 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     this.serviceState = ServiceState.CREATE_JUST;
                     this.consumeMessageService.shutdown();
                     throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
-                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
-                            null);
+                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+                        null);
                 }
 
                 mQClientFactory.start();
@@ -600,9 +607,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             case START_FAILED:
             case SHUTDOWN_ALREADY:
                 throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
-                        + this.serviceState//
-                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                        null);
+                    + this.serviceState//
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                    null);
             default:
                 break;
         }
@@ -619,133 +626,133 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
 
         if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
             throw new MQClientException(
-                    "consumerGroup is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumerGroup is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
             throw new MQClientException(
-                    "consumerGroup can not equal "
-                            + MixAll.DEFAULT_CONSUMER_GROUP
-                            + ", please specify another one."
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumerGroup can not equal "
+                    + MixAll.DEFAULT_CONSUMER_GROUP
+                    + ", please specify another one."
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         if (null == this.defaultMQPushConsumer.getMessageModel()) {
             throw new MQClientException(
-                    "messageModel is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "messageModel is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
             throw new MQClientException(
-                    "consumeFromWhere is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumeFromWhere is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         Date dt = UtilAll.parseDate(this.defaultMQPushConsumer.getConsumeTimestamp(), UtilAll.YYYY_MMDD_HHMMSS);
         if (null == dt) {
             throw new MQClientException(
-                    "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumeTimestamp is invalid, YYYY_MMDD_HHMMSS"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // allocateMessageQueueStrategy
         if (null == this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()) {
             throw new MQClientException(
-                    "allocateMessageQueueStrategy is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "allocateMessageQueueStrategy is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // subscription
         if (null == this.defaultMQPushConsumer.getSubscription()) {
             throw new MQClientException(
-                    "subscription is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "subscription is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // messageListener
         if (null == this.defaultMQPushConsumer.getMessageListener()) {
             throw new MQClientException(
-                    "messageListener is null"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "messageListener is null"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         boolean orderly = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerOrderly;
         boolean concurrently = this.defaultMQPushConsumer.getMessageListener() instanceof MessageListenerConcurrently;
         if (!orderly && !concurrently) {
             throw new MQClientException(
-                    "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // consumeThreadMin
         if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
-                || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
-                || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+            || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
+            || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
             throw new MQClientException(
-                    "consumeThreadMin Out of range [1, 1000]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumeThreadMin Out of range [1, 1000]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // consumeThreadMax
         if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
             throw new MQClientException(
-                    "consumeThreadMax Out of range [1, 1000]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumeThreadMax Out of range [1, 1000]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // consumeConcurrentlyMaxSpan
         if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
-                || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
+            || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
             throw new MQClientException(
-                    "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumeConcurrentlyMaxSpan Out of range [1, 65535]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // pullThresholdForQueue
         if (this.defaultMQPushConsumer.getPullThresholdForQueue() < 1 || this.defaultMQPushConsumer.getPullThresholdForQueue() > 65535) {
             throw new MQClientException(
-                    "pullThresholdForQueue Out of range [1, 65535]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "pullThresholdForQueue Out of range [1, 65535]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // pullInterval
         if (this.defaultMQPushConsumer.getPullInterval() < 0 || this.defaultMQPushConsumer.getPullInterval() > 65535) {
             throw new MQClientException(
-                    "pullInterval Out of range [0, 65535]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "pullInterval Out of range [0, 65535]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // consumeMessageBatchMaxSize
         if (this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() < 1
-                || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
+            || this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize() > 1024) {
             throw new MQClientException(
-                    "consumeMessageBatchMaxSize Out of range [1, 1024]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "consumeMessageBatchMaxSize Out of range [1, 1024]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
 
         // pullBatchSize
         if (this.defaultMQPushConsumer.getPullBatchSize() < 1 || this.defaultMQPushConsumer.getPullBatchSize() > 1024) {
             throw new MQClientException(
-                    "pullBatchSize Out of range [1, 1024]"
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
-                    null);
+                "pullBatchSize Out of range [1, 1024]"
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+                null);
         }
     }
 
@@ -757,7 +764,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     final String topic = entry.getKey();
                     final String subString = entry.getValue();
                     SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                            topic, subString);
+                        topic, subString);
                     this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                 }
             }
@@ -772,7 +779,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                 case CLUSTERING:
                     final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                     SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                            retryTopic, SubscriptionData.SUB_ALL);
+                        retryTopic, SubscriptionData.SUB_ALL);
                     this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                     break;
                 default:
@@ -804,7 +811,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     public void subscribe(String topic, String subExpression) throws MQClientException {
         try {
             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                    topic, subExpression);
+                topic, subExpression);
             this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
             if (this.mQClientFactory != null) {
                 this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
@@ -817,7 +824,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
         try {
             SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
-                    topic, "*");
+                topic, "*");
             subscriptionData.setSubString(fullClassName);
             subscriptionData.setClassFilterMode(true);
             subscriptionData.setFilterClassSource(filterClassSource);
@@ -865,7 +872,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     public void resetOffsetByTimeStamp(long timeStamp)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         for (String topic : rebalanceImpl.getSubscriptionInner().keySet()) {
             Set<MessageQueue> mqs = rebalanceImpl.getTopicSubscribeInfoTable().get(topic);
             Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
@@ -1017,9 +1024,9 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         long computeAccTotal = this.computeAccumulationTotal();
         long adjustThreadPoolNumsThreshold = this.defaultMQPushConsumer.getAdjustThreadPoolNumsThreshold();
 
-        long incThreshold = (long) (adjustThreadPoolNumsThreshold * 1.0);
+        long incThreshold = (long)(adjustThreadPoolNumsThreshold * 1.0);
 
-        long decThreshold = (long) (adjustThreadPoolNumsThreshold * 0.8);
+        long decThreshold = (long)(adjustThreadPoolNumsThreshold * 0.8);
 
         if (computeAccTotal >= incThreshold) {
             this.consumeMessageService.incCorePoolSize();
@@ -1044,7 +1051,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic)
-            throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
+        throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
         List<QueueTimeSpan> queueTimeSpan = new ArrayList<QueueTimeSpan>();
         TopicRouteData routeData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, 3000);
         for (BrokerData brokerData : routeData.getBrokerDatas()) {
@@ -1055,12 +1062,10 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
         return queueTimeSpan;
     }
 
-
     public ConsumeMessageService getConsumeMessageService() {
         return consumeMessageService;
     }
 
-
     public void setConsumeMessageService(ConsumeMessageService consumeMessageService) {
         this.consumeMessageService = consumeMessageService;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
index b1a2a25..ce4f2b9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -6,16 +6,17 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.Set;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
@@ -23,9 +24,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
-import java.util.Set;
-
-
 /**
  * Consumer inner interface
  *
@@ -33,33 +31,23 @@ import java.util.Set;
 public interface MQConsumerInner {
     String groupName();
 
-
     MessageModel messageModel();
 
-
     ConsumeType consumeType();
 
-
     ConsumeFromWhere consumeFromWhere();
 
-
     Set<SubscriptionData> subscriptions();
 
-
     void doRebalance();
 
-
     void persistConsumerOffset();
 
-
     void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue> info);
 
-
     boolean isSubscribeTopicNeedUpdate(final String topic);
 
-
     boolean isUnitMode();
 
-
     ConsumerRunningInfo consumerRunningInfo();
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
index 1e573c3..47ae2b0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MessageQueueLock.java
@@ -6,20 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import org.apache.rocketmq.common.message.MessageQueue;
-
 import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * Message lock,strictly ensure the single queue only one thread at a time consuming
@@ -27,8 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class MessageQueueLock {
     private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
-            new ConcurrentHashMap<MessageQueue, Object>();
-
+        new ConcurrentHashMap<MessageQueue, Object>();
 
     public Object fetchLockObject(final MessageQueue mq) {
         Object objLock = this.mqLockTable.get(mq);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index f361f1f..2d17703 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -6,24 +6,16 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
-import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
-import org.slf4j.Logger;
-
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -33,7 +25,13 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.slf4j.Logger;
 
 /**
  * Queue consumption snapshot
@@ -41,7 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class ProcessQueue {
     public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
-            Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
+        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
     public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
     private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
     private final Logger log = ClientLogger.getLog();
@@ -65,7 +63,6 @@ public class ProcessQueue {
         return result;
     }
 
-
     public boolean isPullExpired() {
         boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME;
         return result;
@@ -80,7 +77,7 @@ public class ProcessQueue {
         if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
             return;
         }
-        
+
         int loop = msgTreeMap.size() < 16 ? msgTreeMap.size() : 16;
         for (int i = 0; i < loop; i++) {
             MessageExt msg = null;
@@ -126,7 +123,6 @@ public class ProcessQueue {
         }
     }
 
-
     public boolean putMessage(final List<MessageExt> msgs) {
         boolean dispatchToConsume = false;
         try {
@@ -167,7 +163,6 @@ public class ProcessQueue {
         return dispatchToConsume;
     }
 
-
     public long getMaxSpan() {
         try {
             this.lockTreeMap.readLock().lockInterruptibly();
@@ -185,7 +180,6 @@ public class ProcessQueue {
         return 0;
     }
 
-
     public long removeMessage(final List<MessageExt> msgs) {
         long result = -1;
         final long now = System.currentTimeMillis();
@@ -218,22 +212,18 @@ public class ProcessQueue {
         return result;
     }
 
-
     public TreeMap<Long, MessageExt> getMsgTreeMap() {
         return msgTreeMap;
     }
 
-
     public AtomicLong getMsgCount() {
         return msgCount;
     }
 
-
     public boolean isDropped() {
         return dropped;
     }
 
-
     public void setDropped(boolean dropped) {
         this.dropped = dropped;
     }
@@ -260,7 +250,6 @@ public class ProcessQueue {
         }
     }
 
-
     public long commit() {
         try {
             this.lockTreeMap.writeLock().lockInterruptibly();
@@ -281,7 +270,6 @@ public class ProcessQueue {
         return -1;
     }
 
-
     public void makeMessageToCosumeAgain(List<MessageExt> msgs) {
         try {
             this.lockTreeMap.writeLock().lockInterruptibly();
@@ -298,7 +286,6 @@ public class ProcessQueue {
         }
     }
 
-
     public List<MessageExt> takeMessags(final int batchSize) {
         List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
         final long now = System.currentTimeMillis();
@@ -331,7 +318,6 @@ public class ProcessQueue {
         return result;
     }
 
-
     public boolean hasTempMessage() {
         try {
             this.lockTreeMap.readLock().lockInterruptibly();
@@ -346,7 +332,6 @@ public class ProcessQueue {
         return true;
     }
 
-
     public void clear() {
         try {
             this.lockTreeMap.writeLock().lockInterruptibly();
@@ -363,52 +348,42 @@ public class ProcessQueue {
         }
     }
 
-
     public long getLastLockTimestamp() {
         return lastLockTimestamp;
     }
 
-
     public void setLastLockTimestamp(long lastLockTimestamp) {
         this.lastLockTimestamp = lastLockTimestamp;
     }
 
-
     public Lock getLockConsume() {
         return lockConsume;
     }
 
-
     public long getLastPullTimestamp() {
         return lastPullTimestamp;
     }
 
-
     public void setLastPullTimestamp(long lastPullTimestamp) {
         this.lastPullTimestamp = lastPullTimestamp;
     }
 
-
     public long getMsgAccCnt() {
         return msgAccCnt;
     }
 
-
     public void setMsgAccCnt(long msgAccCnt) {
         this.msgAccCnt = msgAccCnt;
     }
 
-
     public long getTryUnlockTimes() {
         return this.tryUnlockTimes.get();
     }
 
-
     public void incTryUnlockTimes() {
         this.tryUnlockTimes.incrementAndGet();
     }
 
-
     public void fillProcessQueueInfo(final ProcessQueueInfo info) {
         try {
             this.lockTreeMap.readLock().lockInterruptibly();
@@ -438,12 +413,10 @@ public class ProcessQueue {
         }
     }
 
-
     public long getLastConsumeTimestamp() {
         return lastConsumeTimestamp;
     }
 
-
     public void setLastConsumeTimestamp(long lastConsumeTimestamp) {
         this.lastConsumeTimestamp = lastConsumeTimestamp;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
index 59c9b1c..d358175 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java
@@ -16,6 +16,12 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -28,7 +34,11 @@ import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
@@ -36,21 +46,13 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-
 public class PullAPIWrapper {
     private final Logger log = ClientLogger.getLog();
     private final MQClientInstance mQClientFactory;
     private final String consumerGroup;
     private final boolean unitMode;
     private ConcurrentHashMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
-            new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
+        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
     private volatile boolean connectBrokerByUser = false;
     private volatile long defaultBrokerId = MixAll.MASTER_ID;
     private Random random = new Random(System.currentTimeMillis());
@@ -63,8 +65,8 @@ public class PullAPIWrapper {
     }
 
     public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
-                                        final SubscriptionData subscriptionData) {
-        PullResultExt pullResultExt = (PullResultExt) pullResult;
+        final SubscriptionData subscriptionData) {
+        PullResultExt pullResultExt = (PullResultExt)pullResult;
 
         this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
         if (PullStatus.FOUND == pullResult.getPullStatus()) {
@@ -92,9 +94,9 @@ public class PullAPIWrapper {
 
             for (MessageExt msg : msgListFilterAgain) {
                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
-                        Long.toString(pullResult.getMinOffset()));
+                    Long.toString(pullResult.getMinOffset()));
                 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
-                        Long.toString(pullResult.getMaxOffset()));
+                    Long.toString(pullResult.getMaxOffset()));
             }
 
             pullResultExt.setMsgFoundList(msgListFilterAgain);
@@ -131,26 +133,26 @@ public class PullAPIWrapper {
     }
 
     public PullResult pullKernelImpl(
-            final MessageQueue mq,
-            final String subExpression,
-            final long subVersion,
-            final long offset,
-            final int maxNums,
-            final int sysFlag,
-            final long commitOffset,
-            final long brokerSuspendMaxTimeMillis,
-            final long timeoutMillis,
-            final CommunicationMode communicationMode,
-            final PullCallback pullCallback
+        final MessageQueue mq,
+        final String subExpression,
+        final long subVersion,
+        final long offset,
+        final int maxNums,
+        final int sysFlag,
+        final long commitOffset,
+        final long brokerSuspendMaxTimeMillis,
+        final long timeoutMillis,
+        final CommunicationMode communicationMode,
+        final PullCallback pullCallback
     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         FindBrokerResult findBrokerResult =
-                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
-                        this.recalculatePullFromWhichNode(mq), false);
+            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+                this.recalculatePullFromWhichNode(mq), false);
         if (null == findBrokerResult) {
             this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
             findBrokerResult =
-                    this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
-                            this.recalculatePullFromWhichNode(mq), false);
+                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
+                    this.recalculatePullFromWhichNode(mq), false);
         }
 
         if (findBrokerResult != null) {
@@ -178,11 +180,11 @@ public class PullAPIWrapper {
             }
 
             PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
-                    brokerAddr,
-                    requestHeader,
-                    timeoutMillis,
-                    communicationMode,
-                    pullCallback);
+                brokerAddr,
+                requestHeader,
+                timeoutMillis,
+                communicationMode,
+                pullCallback);
 
             return pullResult;
         }
@@ -204,7 +206,7 @@ public class PullAPIWrapper {
     }
 
     private String computPullFromWhichFilterServer(final String topic, final String brokerAddr)
-            throws MQClientException {
+        throws MQClientException {
         ConcurrentHashMap<String, TopicRouteData> topicRouteTable = this.mQClientFactory.getTopicRouteTable();
         if (topicRouteTable != null) {
             TopicRouteData topicRouteData = topicRouteTable.get(topic);
@@ -216,13 +218,18 @@ public class PullAPIWrapper {
         }
 
         throw new MQClientException("Find Filter Server Failed, Broker Addr: " + brokerAddr + " topic: "
-                + topic, null);
+            + topic, null);
     }
 
     public boolean isConnectBrokerByUser() {
         return connectBrokerByUser;
     }
 
+    public void setConnectBrokerByUser(boolean connectBrokerByUser) {
+        this.connectBrokerByUser = connectBrokerByUser;
+
+    }
+
     public int randomNum() {
         int value = random.nextInt();
         if (value < 0) {
@@ -233,11 +240,6 @@ public class PullAPIWrapper {
         return value;
     }
 
-    public void setConnectBrokerByUser(boolean connectBrokerByUser) {
-        this.connectBrokerByUser = connectBrokerByUser;
-
-    }
-
     public void registerFilterMessageHook(ArrayList<FilterMessageHook> filterMessageHookList) {
         this.filterMessageHookList = filterMessageHookList;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
index 4634c24..55e3d59 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullMessageService.java
@@ -6,35 +6,37 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ServiceThread;
 import org.slf4j.Logger;
 
-import java.util.concurrent.*;
-
-
 public class PullMessageService extends ServiceThread {
     private final Logger log = ClientLogger.getLog();
     private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
     private final MQClientInstance mQClientFactory;
     private final ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable r) {
-                    return new Thread(r, "PullMessageServiceScheduledThread");
-                }
-            });
+        .newSingleThreadScheduledExecutor(new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                return new Thread(r, "PullMessageServiceScheduledThread");
+            }
+        });
 
     public PullMessageService(MQClientInstance mQClientFactory) {
         this.mQClientFactory = mQClientFactory;
@@ -69,14 +71,13 @@ public class PullMessageService extends ServiceThread {
     private void pullMessage(final PullRequest pullRequest) {
         final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
         if (consumer != null) {
-            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
+            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl)consumer;
             impl.pullMessage(pullRequest);
         } else {
             log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
         }
     }
 
-
     @Override
     public void run() {
         log.info(this.getServiceName() + " service started");
@@ -96,11 +97,9 @@ public class PullMessageService extends ServiceThread {
         log.info(this.getServiceName() + " service end");
     }
 
-
     @Override
     public String getServiceName() {
         return PullMessageService.class.getSimpleName();
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
index ccc624b..4850313 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullRequest.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 
-
 public class PullRequest {
     private String consumerGroup;
     private MessageQueue messageQueue;
@@ -38,27 +37,22 @@ public class PullRequest {
         return consumerGroup;
     }
 
-
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
 
-
     public MessageQueue getMessageQueue() {
         return messageQueue;
     }
 
-
     public void setMessageQueue(MessageQueue messageQueue) {
         this.messageQueue = messageQueue;
     }
 
-
     public long getNextOffset() {
         return nextOffset;
     }
 
-
     public void setNextOffset(long nextOffset) {
         this.nextOffset = nextOffset;
     }
@@ -80,7 +74,7 @@ public class PullRequest {
             return false;
         if (getClass() != obj.getClass())
             return false;
-        PullRequest other = (PullRequest) obj;
+        PullRequest other = (PullRequest)obj;
         if (consumerGroup == null) {
             if (other.consumerGroup != null)
                 return false;
@@ -97,14 +91,13 @@ public class PullRequest {
     @Override
     public String toString() {
         return "PullRequest [consumerGroup=" + consumerGroup + ", messageQueue=" + messageQueue
-                + ", nextOffset=" + nextOffset + "]";
+            + ", nextOffset=" + nextOffset + "]";
     }
 
     public ProcessQueue getProcessQueue() {
         return processQueue;
     }
 
-
     public void setProcessQueue(ProcessQueue processQueue) {
         this.processQueue = processQueue;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
index d248603..c43c9a9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullResultExt.java
@@ -6,46 +6,40 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.List;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.message.MessageExt;
 
-import java.util.List;
-
-
 public class PullResultExt extends PullResult {
     private final long suggestWhichBrokerId;
     private byte[] messageBinary;
 
-
     public PullResultExt(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
-                         List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
+        List<MessageExt> msgFoundList, final long suggestWhichBrokerId, final byte[] messageBinary) {
         super(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList);
         this.suggestWhichBrokerId = suggestWhichBrokerId;
         this.messageBinary = messageBinary;
     }
 
-
     public byte[] getMessageBinary() {
         return messageBinary;
     }
 
-
     public void setMessageBinary(byte[] messageBinary) {
         this.messageBinary = messageBinary;
     }
 
-
     public long getSuggestWhichBrokerId() {
         return suggestWhichBrokerId;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 53d775f..91bfd1a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -16,6 +16,16 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -29,30 +39,23 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.slf4j.Logger;
 
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 /**
  * Base class for rebalance algorithm
- *
  */
 public abstract class RebalanceImpl {
     protected static final Logger log = ClientLogger.getLog();
     protected final ConcurrentHashMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
     protected final ConcurrentHashMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =
-            new ConcurrentHashMap<String, Set<MessageQueue>>();
+        new ConcurrentHashMap<String, Set<MessageQueue>>();
     protected final ConcurrentHashMap<String /* topic */, SubscriptionData> subscriptionInner =
-            new ConcurrentHashMap<String, SubscriptionData>();
+        new ConcurrentHashMap<String, SubscriptionData>();
     protected String consumerGroup;
     protected MessageModel messageModel;
     protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
     protected MQClientInstance mQClientFactory;
 
-
     public RebalanceImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-                         MQClientInstance mQClientFactory) {
+        MQClientInstance mQClientFactory) {
         this.consumerGroup = consumerGroup;
         this.messageModel = messageModel;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
@@ -70,9 +73,9 @@ public abstract class RebalanceImpl {
             try {
                 this.mQClientFactory.getMQClientAPIImpl().unlockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000, oneway);
                 log.warn("unlock messageQueue. group:{}, clientId:{}, mq:{}", //
-                        this.consumerGroup, //
-                        this.mQClientFactory.getClientId(), //
-                        mq);
+                    this.consumerGroup, //
+                    this.mQClientFactory.getClientId(), //
+                    mq);
             } catch (Exception e) {
                 log.error("unlockBatchMQ exception, " + mq, e);
             }
@@ -138,7 +141,7 @@ public abstract class RebalanceImpl {
 
             try {
                 Set<MessageQueue> lockedMq =
-                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
+                    this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                 for (MessageQueue mmqq : lockedMq) {
                     ProcessQueue processQueue = this.processQueueTable.get(mmqq);
                     if (processQueue != null) {
@@ -149,9 +152,9 @@ public abstract class RebalanceImpl {
 
                 boolean lockOK = lockedMq.contains(mq);
                 log.info("the message queue lock {}, {} {}",
-                        lockOK ? "OK" : "Failed",
-                        this.consumerGroup,
-                        mq);
+                    lockOK ? "OK" : "Failed",
+                    this.consumerGroup,
+                    mq);
                 return lockOK;
             } catch (Exception e) {
                 log.error("lockBatchMQ exception, " + mq, e);
@@ -182,7 +185,7 @@ public abstract class RebalanceImpl {
 
                 try {
                     Set<MessageQueue> lockOKMQSet =
-                            this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
+                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
 
                     for (MessageQueue mq : lockOKMQSet) {
                         ProcessQueue processQueue = this.processQueueTable.get(mq);
@@ -242,10 +245,10 @@ public abstract class RebalanceImpl {
                     if (changed) {
                         this.messageQueueChanged(topic, mqSet, mqSet);
                         log.info("messageQueueChanged {} {} {} {}", //
-                                consumerGroup, //
-                                topic, //
-                                mqSet, //
-                                mqSet);
+                            consumerGroup, //
+                            topic, //
+                            mqSet, //
+                            mqSet);
                     }
                 } else {
                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
@@ -277,13 +280,13 @@ public abstract class RebalanceImpl {
                     List<MessageQueue> allocateResult = null;
                     try {
                         allocateResult = strategy.allocate(//
-                                this.consumerGroup, //
-                                this.mQClientFactory.getClientId(), //
-                                mqAll, //
-                                cidAll);
+                            this.consumerGroup, //
+                            this.mQClientFactory.getClientId(), //
+                            mqAll, //
+                            cidAll);
                     } catch (Throwable e) {
                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
-                                e);
+                            e);
                         return;
                     }
 
@@ -295,9 +298,9 @@ public abstract class RebalanceImpl {
                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                     if (changed) {
                         log.info(
-                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
-                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
-                                allocateResultSet.size(), allocateResultSet);
+                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
+                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
+                            allocateResultSet.size(), allocateResultSet);
                         this.messageQueueChanged(topic, mqSet, allocateResultSet);
                     }
                 }
@@ -350,7 +353,7 @@ public abstract class RebalanceImpl {
                                 it.remove();
                                 changed = true;
                                 log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
-                                        consumerGroup, mq);
+                                    consumerGroup, mq);
                             }
                             break;
                         default:
@@ -422,52 +425,42 @@ public abstract class RebalanceImpl {
         return processQueueTable;
     }
 
-
     public ConcurrentHashMap<String, Set<MessageQueue>> getTopicSubscribeInfoTable() {
         return topicSubscribeInfoTable;
     }
 
-
     public String getConsumerGroup() {
         return consumerGroup;
     }
 
-
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
 
-
     public MessageModel getMessageModel() {
         return messageModel;
     }
 
-
     public void setMessageModel(MessageModel messageModel) {
         this.messageModel = messageModel;
     }
 
-
     public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
         return allocateMessageQueueStrategy;
     }
 
-
     public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
     }
 
-
     public MQClientInstance getmQClientFactory() {
         return mQClientFactory;
     }
 
-
     public void setmQClientFactory(MQClientInstance mQClientFactory) {
         this.mQClientFactory = mQClientFactory;
     }
 
-
     public void destroy() {
         Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
         while (it.hasNext()) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index d37090d..1130943 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -6,16 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.List;
+import java.util.Set;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
@@ -23,21 +25,15 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
-import java.util.List;
-import java.util.Set;
-
-
 public class RebalancePullImpl extends RebalanceImpl {
     private final DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
 
-
     public RebalancePullImpl(DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
         this(null, null, null, null, defaultMQPullConsumerImpl);
     }
 
-
     public RebalancePullImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-                             MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
+        MQClientInstance mQClientFactory, DefaultMQPullConsumerImpl defaultMQPullConsumerImpl) {
         super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
         this.defaultMQPullConsumerImpl = defaultMQPullConsumerImpl;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 4d0d47f..707b9a1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -6,16 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
@@ -28,23 +31,16 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-
 public class RebalancePushImpl extends RebalanceImpl {
     private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills", "20000"));
     private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
 
-
     public RebalancePushImpl(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
         this(null, null, null, null, defaultMQPushConsumerImpl);
     }
 
-
     public RebalancePushImpl(String consumerGroup, MessageModel messageModel, AllocateMessageQueueStrategy allocateMessageQueueStrategy,
-                             MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
+        MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
         super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
     }
@@ -58,7 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl {
         this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
         this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
         if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
-                && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
+            && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
             try {
                 if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
                     try {
@@ -68,8 +64,8 @@ public class RebalancePushImpl extends RebalanceImpl {
                     }
                 } else {
                     log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}", //
-                            mq, //
-                            pq.getTryUnlockTimes());
+                        mq, //
+                        pq.getTryUnlockTimes());
 
                     pq.incTryUnlockTimes();
                 }
@@ -164,7 +160,7 @@ public class RebalancePushImpl extends RebalanceImpl {
                     } else {
                         try {
                             long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
-                                    UtilAll.YYYY_MMDD_HHMMSS).getTime();
+                                UtilAll.YYYY_MMDD_HHMMSS).getTime();
                             result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                         } catch (MQClientException e) {
                             result = -1;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
index 5b5ab2a..985129e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
@@ -21,15 +21,14 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.ServiceThread;
 import org.slf4j.Logger;
 
-
 /**
  * Rebalance Service
  *
  */
 public class RebalanceService extends ServiceThread {
     private static long waitInterval =
-            Long.parseLong(System.getProperty(
-                    "rocketmq.client.rebalance.waitInterval", "20000"));
+        Long.parseLong(System.getProperty(
+            "rocketmq.client.rebalance.waitInterval", "20000"));
     private final Logger log = ClientLogger.getLog();
     private final MQClientInstance mqClientFactory;
 
@@ -49,7 +48,6 @@ public class RebalanceService extends ServiceThread {
         log.info(this.getServiceName() + " service end");
     }
 
-
     @Override
     public String getServiceName() {
         return RebalanceService.class.getSimpleName();