You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:42 UTC

[35/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
index 3cc2fdf..9fd1b34 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java
@@ -6,36 +6,32 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.remoting.RPCHook;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
 public class MQClientManager {
     private static MQClientManager instance = new MQClientManager();
     private AtomicInteger factoryIndexGenerator = new AtomicInteger();
     private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
-            new ConcurrentHashMap<String, MQClientInstance>();
-
+        new ConcurrentHashMap<String, MQClientInstance>();
 
     private MQClientManager() {
 
     }
 
-
     public static MQClientManager getInstance() {
         return instance;
     }
@@ -49,8 +45,8 @@ public class MQClientManager {
         MQClientInstance instance = this.factoryTable.get(clientId);
         if (null == instance) {
             instance =
-                    new MQClientInstance(clientConfig.cloneClientConfig(),
-                            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
+                new MQClientInstance(clientConfig.cloneClientConfig(),
+                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
             MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
             if (prev != null) {
                 instance = prev;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index e02bd4f..e7a6ca3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -16,6 +16,19 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -35,10 +48,6 @@ import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.slf4j.Logger;
 
-import java.util.*;
-import java.util.concurrent.*;
-
-
 public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
     private static final Logger log = ClientLogger.getLog();
     private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
@@ -51,9 +60,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     private final ScheduledExecutorService scheduledExecutorService;
     private final ScheduledExecutorService cleanExpireMsgExecutors;
 
-
     public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
-                                             MessageListenerConcurrently messageListener) {
+        MessageListenerConcurrently messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
         this.messageListener = messageListener;
 
@@ -62,18 +70,17 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
         this.consumeExecutor = new ThreadPoolExecutor(//
-                this.defaultMQPushConsumer.getConsumeThreadMin(), //
-                this.defaultMQPushConsumer.getConsumeThreadMax(), //
-                1000 * 60, //
-                TimeUnit.MILLISECONDS, //
-                this.consumeRequestQueue, //
-                new ThreadFactoryImpl("ConsumeMessageThread_"));
+            this.defaultMQPushConsumer.getConsumeThreadMin(), //
+            this.defaultMQPushConsumer.getConsumeThreadMax(), //
+            1000 * 60, //
+            TimeUnit.MILLISECONDS, //
+            this.consumeRequestQueue, //
+            new ThreadFactoryImpl("ConsumeMessageThread_"));
 
         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
         this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
     }
 
-
     public void start() {
         this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
 
@@ -85,7 +92,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
     }
 
-
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
         this.consumeExecutor.shutdown();
@@ -95,8 +101,8 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     @Override
     public void updateCorePoolSize(int corePoolSize) {
         if (corePoolSize > 0 //
-                && corePoolSize <= Short.MAX_VALUE //
-                && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
+            && corePoolSize <= Short.MAX_VALUE //
+            && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
             this.consumeExecutor.setCorePoolSize(corePoolSize);
         }
     }
@@ -180,10 +186,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
 
             log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
-                    RemotingHelper.exceptionSimpleDesc(e), //
-                    ConsumeMessageConcurrentlyService.this.consumerGroup, //
-                    msgs, //
-                    mq), e);
+                RemotingHelper.exceptionSimpleDesc(e), //
+                ConsumeMessageConcurrentlyService.this.consumerGroup, //
+                msgs, //
+                mq), e);
         }
 
         result.setSpentTimeMills(System.currentTimeMillis() - beginTime);
@@ -195,10 +201,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
     @Override
     public void submitConsumeRequest(//
-                                     final List<MessageExt> msgs, //
-                                     final ProcessQueue processQueue, //
-                                     final MessageQueue messageQueue, //
-                                     final boolean dispatchToConsume) {
+        final List<MessageExt> msgs, //
+        final ProcessQueue processQueue, //
+        final MessageQueue messageQueue, //
+        final boolean dispatchToConsume) {
         final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
         if (msgs.size() <= consumeBatchSize) {
             ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
@@ -244,7 +250,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
 
     private void cleanExpireMsg() {
         Iterator<Map.Entry<MessageQueue, ProcessQueue>> it =
-                this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
+            this.defaultMQPushConsumerImpl.getRebalanceImpl().getProcessQueueTable().entrySet().iterator();
         while (it.hasNext()) {
             Map.Entry<MessageQueue, ProcessQueue> next = it.next();
             ProcessQueue pq = next.getValue();
@@ -253,9 +259,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     }
 
     public void processConsumeResult(//
-                                     final ConsumeConcurrentlyStatus status, //
-                                     final ConsumeConcurrentlyContext context, //
-                                     final ConsumeRequest consumeRequest//
+        final ConsumeConcurrentlyStatus status, //
+        final ConsumeConcurrentlyContext context, //
+        final ConsumeRequest consumeRequest//
     ) {
         int ackIndex = context.getAckIndex();
 
@@ -275,7 +281,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             case RECONSUME_LATER:
                 ackIndex = -1;
                 this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
-                        consumeRequest.getMsgs().size());
+                    consumeRequest.getMsgs().size());
                 break;
             default:
                 break;
@@ -333,9 +339,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
     }
 
     private void submitConsumeRequestLater(//
-                                           final List<MessageExt> msgs, //
-                                           final ProcessQueue processQueue, //
-                                           final MessageQueue messageQueue//
+        final List<MessageExt> msgs, //
+        final ProcessQueue processQueue, //
+        final MessageQueue messageQueue//
     ) {
 
         this.scheduledExecutorService.schedule(new Runnable() {
@@ -364,7 +370,6 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
         private final ProcessQueue processQueue;
         private final MessageQueue messageQueue;
 
-
         public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
             this.msgs = msgs;
             this.processQueue = processQueue;
@@ -414,10 +419,10 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
                 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
             } catch (Throwable e) {
                 log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
-                        RemotingHelper.exceptionSimpleDesc(e), //
-                        ConsumeMessageConcurrentlyService.this.consumerGroup,
-                        msgs,
-                        messageQueue);
+                    RemotingHelper.exceptionSimpleDesc(e), //
+                    ConsumeMessageConcurrentlyService.this.consumerGroup,
+                    msgs,
+                    messageQueue);
                 hasException = true;
             }
             long consumeRT = System.currentTimeMillis() - beginTimestamp;
@@ -437,9 +442,9 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
             if (null == status) {
                 log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
-                        ConsumeMessageConcurrentlyService.this.consumerGroup,
-                        msgs,
-                        messageQueue);
+                    ConsumeMessageConcurrentlyService.this.consumerGroup,
+                    msgs,
+                    messageQueue);
                 status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
             }
 
@@ -450,7 +455,7 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             }
 
             ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
-                    .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+                .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
 
             if (!processQueue.isDropped()) {
                 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
@@ -463,6 +468,5 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
             return messageQueue;
         }
 
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index f6a1e4d..3def223 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -16,35 +16,42 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
 import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.hook.ConsumeMessageContext;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.stat.ConsumerStatsManager;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.CMResult;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
-import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.*;
-
-
 public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     private static final Logger log = ClientLogger.getLog();
     private final static long MAX_TIME_CONSUME_CONTINUOUSLY =
-            Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
+        Long.parseLong(System.getProperty("rocketmq.client.maxTimeConsumeContinuously", "60000"));
     private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
     private final DefaultMQPushConsumer defaultMQPushConsumer;
     private final MessageListenerOrderly messageListener;
@@ -55,7 +62,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     private final ScheduledExecutorService scheduledExecutorService;
     private volatile boolean stopped = false;
 
-
     public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerOrderly messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
         this.messageListener = messageListener;
@@ -65,17 +71,16 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
 
         this.consumeExecutor = new ThreadPoolExecutor(//
-                this.defaultMQPushConsumer.getConsumeThreadMin(), //
-                this.defaultMQPushConsumer.getConsumeThreadMax(), //
-                1000 * 60, //
-                TimeUnit.MILLISECONDS, //
-                this.consumeRequestQueue, //
-                new ThreadFactoryImpl("ConsumeMessageThread_"));
+            this.defaultMQPushConsumer.getConsumeThreadMin(), //
+            this.defaultMQPushConsumer.getConsumeThreadMax(), //
+            1000 * 60, //
+            TimeUnit.MILLISECONDS, //
+            this.consumeRequestQueue, //
+            new ThreadFactoryImpl("ConsumeMessageThread_"));
 
         this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
     }
 
-
     public void start() {
         if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -87,7 +92,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         }
     }
 
-
     public void shutdown() {
         this.stopped = true;
         this.scheduledExecutorService.shutdown();
@@ -97,7 +101,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         }
     }
 
-
     public synchronized void unlockAllMQ() {
         this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
     }
@@ -105,8 +108,8 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     @Override
     public void updateCorePoolSize(int corePoolSize) {
         if (corePoolSize > 0 //
-                && corePoolSize <= Short.MAX_VALUE //
-                && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
+            && corePoolSize <= Short.MAX_VALUE //
+            && corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax()) {
             this.consumeExecutor.setCorePoolSize(corePoolSize);
         }
     }
@@ -169,10 +172,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             result.setRemark(RemotingHelper.exceptionSimpleDesc(e));
 
             log.warn(String.format("consumeMessageDirectly exception: %s Group: %s Msgs: %s MQ: %s", //
-                    RemotingHelper.exceptionSimpleDesc(e), //
-                    ConsumeMessageOrderlyService.this.consumerGroup, //
-                    msgs, //
-                    mq), e);
+                RemotingHelper.exceptionSimpleDesc(e), //
+                ConsumeMessageOrderlyService.this.consumerGroup, //
+                msgs, //
+                mq), e);
         }
 
         result.setAutoCommit(context.isAutoCommit());
@@ -185,10 +188,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
 
     @Override
     public void submitConsumeRequest(//
-                                     final List<MessageExt> msgs, //
-                                     final ProcessQueue processQueue, //
-                                     final MessageQueue messageQueue, //
-                                     final boolean dispathToConsume) {
+        final List<MessageExt> msgs, //
+        final ProcessQueue processQueue, //
+        final MessageQueue messageQueue, //
+        final boolean dispathToConsume) {
         if (dispathToConsume) {
             ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
             this.consumeExecutor.submit(consumeRequest);
@@ -224,9 +227,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     }
 
     private void submitConsumeRequestLater(//
-                                           final ProcessQueue processQueue, //
-                                           final MessageQueue messageQueue, //
-                                           final long suspendTimeMillis//
+        final ProcessQueue processQueue, //
+        final MessageQueue messageQueue, //
+        final long suspendTimeMillis//
     ) {
         long timeMillis = suspendTimeMillis;
         if (timeMillis == -1) {
@@ -249,10 +252,10 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
     }
 
     public boolean processConsumeResult(//
-                                        final List<MessageExt> msgs, //
-                                        final ConsumeOrderlyStatus status, //
-                                        final ConsumeOrderlyContext context, //
-                                        final ConsumeRequest consumeRequest//
+        final List<MessageExt> msgs, //
+        final ConsumeOrderlyStatus status, //
+        final ConsumeOrderlyContext context, //
+        final ConsumeRequest consumeRequest//
     ) {
         boolean continueConsume = true;
         long commitOffset = -1L;
@@ -261,7 +264,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                 case COMMIT:
                 case ROLLBACK:
                     log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
-                            consumeRequest.getMessageQueue());
+                        consumeRequest.getMessageQueue());
                 case SUCCESS:
                     commitOffset = consumeRequest.getProcessQueue().commit();
                     this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
@@ -271,9 +274,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                     if (checkReconsumeTimes(msgs)) {
                         consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                         this.submitConsumeRequestLater(//
-                                consumeRequest.getProcessQueue(), //
-                                consumeRequest.getMessageQueue(), //
-                                context.getSuspendCurrentQueueTimeMillis());
+                            consumeRequest.getProcessQueue(), //
+                            consumeRequest.getMessageQueue(), //
+                            context.getSuspendCurrentQueueTimeMillis());
                         continueConsume = false;
                     } else {
                         commitOffset = consumeRequest.getProcessQueue().commit();
@@ -293,9 +296,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                 case ROLLBACK:
                     consumeRequest.getProcessQueue().rollback();
                     this.submitConsumeRequestLater(//
-                            consumeRequest.getProcessQueue(), //
-                            consumeRequest.getMessageQueue(), //
-                            context.getSuspendCurrentQueueTimeMillis());
+                        consumeRequest.getProcessQueue(), //
+                        consumeRequest.getMessageQueue(), //
+                        context.getSuspendCurrentQueueTimeMillis());
                     continueConsume = false;
                     break;
                 case SUSPEND_CURRENT_QUEUE_A_MOMENT:
@@ -303,9 +306,9 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                     if (checkReconsumeTimes(msgs)) {
                         consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                         this.submitConsumeRequestLater(//
-                                consumeRequest.getProcessQueue(), //
-                                consumeRequest.getMessageQueue(), //
-                                context.getSuspendCurrentQueueTimeMillis());
+                            consumeRequest.getProcessQueue(), //
+                            consumeRequest.getMessageQueue(), //
+                            context.getSuspendCurrentQueueTimeMillis());
                         continueConsume = false;
                     }
                     break;
@@ -379,7 +382,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
         private final ProcessQueue processQueue;
         private final MessageQueue messageQueue;
 
-
         public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
             this.processQueue = processQueue;
             this.messageQueue = messageQueue;
@@ -403,7 +405,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
             synchronized (objLock) {
                 if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
-                        || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
+                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                     final long beginTime = System.currentTimeMillis();
                     for (boolean continueConsume = true; continueConsume; ) {
                         if (this.processQueue.isDropped()) {
@@ -412,14 +414,14 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                         }
 
                         if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
-                                && !this.processQueue.isLocked()) {
+                            && !this.processQueue.isLocked()) {
                             log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                             ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                             break;
                         }
 
                         if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
-                                && this.processQueue.isLockExpired()) {
+                            && this.processQueue.isLockExpired()) {
                             log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                             ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                             break;
@@ -432,7 +434,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                         }
 
                         final int consumeBatchSize =
-                                ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
+                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
 
                         List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                         if (!msgs.isEmpty()) {
@@ -444,7 +446,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                             if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                 consumeMessageContext = new ConsumeMessageContext();
                                 consumeMessageContext
-                                        .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
+                                    .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                                 consumeMessageContext.setMq(messageQueue);
                                 consumeMessageContext.setMsgList(msgs);
                                 consumeMessageContext.setSuccess(false);
@@ -460,29 +462,29 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                                 this.processQueue.getLockConsume().lock();
                                 if (this.processQueue.isDropped()) {
                                     log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
-                                            this.messageQueue);
+                                        this.messageQueue);
                                     break;
                                 }
 
                                 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                             } catch (Throwable e) {
                                 log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", //
-                                        RemotingHelper.exceptionSimpleDesc(e), //
-                                        ConsumeMessageOrderlyService.this.consumerGroup, //
-                                        msgs, //
-                                        messageQueue);
+                                    RemotingHelper.exceptionSimpleDesc(e), //
+                                    ConsumeMessageOrderlyService.this.consumerGroup, //
+                                    msgs, //
+                                    messageQueue);
                                 hasException = true;
                             } finally {
                                 this.processQueue.getLockConsume().unlock();
                             }
 
                             if (null == status //
-                                    || ConsumeOrderlyStatus.ROLLBACK == status//
-                                    || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
+                                || ConsumeOrderlyStatus.ROLLBACK == status//
+                                || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                                 log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}", //
-                                        ConsumeMessageOrderlyService.this.consumerGroup, //
-                                        msgs, //
-                                        messageQueue);
+                                    ConsumeMessageOrderlyService.this.consumerGroup, //
+                                    msgs, //
+                                    messageQueue);
                             }
 
                             long consumeRT = System.currentTimeMillis() - beginTimestamp;
@@ -507,12 +509,12 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                             if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                                 consumeMessageContext.setStatus(status.toString());
                                 consumeMessageContext
-                                        .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
+                                    .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                                 ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                             }
 
                             ConsumeMessageOrderlyService.this.getConsumerStatsManager()
-                                    .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
+                                .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
 
                             continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                         } else {
@@ -530,7 +532,6 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             }
         }
 
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 3dc768c..a59ab98 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -6,48 +6,39 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.List;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
 
-import java.util.List;
-
-
 public interface ConsumeMessageService {
     void start();
 
-
     void shutdown();
 
-
     void updateCorePoolSize(int corePoolSize);
 
-
     void incCorePoolSize();
 
-
     void decCorePoolSize();
 
-
     int getCorePoolSize();
 
-
     ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);
 
-
     void submitConsumeRequest(//
-                              final List<MessageExt> msgs, //
-                              final ProcessQueue processQueue, //
-                              final MessageQueue messageQueue, //
-                              final boolean dispathToConsume);
+        final List<MessageExt> msgs, //
+        final ProcessQueue processQueue, //
+        final MessageQueue messageQueue, //
+        final boolean dispathToConsume);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index f216533..7c1b4d6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -41,7 +48,11 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.filter.FilterAPI;
 import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -52,10 +63,6 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.slf4j.Logger;
 
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private final Logger log = ClientLogger.getLog();
     private final DefaultMQPullConsumer defaultMQPullConsumer;
@@ -69,7 +76,6 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private OffsetStore offsetStore;
     private RebalanceImpl rebalanceImpl = new RebalancePullImpl(this);
 
-
     public DefaultMQPullConsumerImpl(final DefaultMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) {
         this.defaultMQPullConsumer = defaultMQPullConsumer;
         this.rpcHook = rpcHook;
@@ -92,9 +98,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     private void makeSureStateOK() throws MQClientException {
         if (this.serviceState != ServiceState.RUNNING) {
             throw new MQClientException("The consumer service state not OK, "//
-                    + this.serviceState//
-                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                    null);
+                + this.serviceState//
+                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                null);
         }
     }
 
@@ -146,17 +152,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     }
 
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return pull(mq, subExpression, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
     }
 
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout);
     }
 
     private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         this.makeSureStateOK();
 
         if (null == mq) {
@@ -179,7 +185,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         SubscriptionData subscriptionData;
         try {
             subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
-                    mq.getTopic(), subExpression);
+                mq.getTopic(), subExpression);
         } catch (Exception e) {
             throw new MQClientException("parse subscription error", e);
         }
@@ -187,17 +193,17 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
         PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(//
-                mq, // 1
-                subscriptionData.getSubString(), // 2
-                0L, // 3
-                offset, // 4
-                maxNums, // 5
-                sysFlag, // 6
-                0, // 7
-                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
-                timeoutMillis, // 9
-                CommunicationMode.SYNC, // 10
-                null// 11
+            mq, // 1
+            subscriptionData.getSubString(), // 2
+            0L, // 3
+            offset, // 4
+            maxNums, // 5
+            sysFlag, // 6
+            0, // 7
+            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
+            timeoutMillis, // 9
+            CommunicationMode.SYNC, // 10
+            null// 11
         );
         this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
         if (!this.consumeMessageHookList.isEmpty()) {
@@ -219,7 +225,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         if (!this.rebalanceImpl.getSubscriptionInner().containsKey(topic)) {
             try {
                 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
-                        topic, SubscriptionData.SUB_ALL);
+                    topic, SubscriptionData.SUB_ALL);
                 this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData);
             } catch (Exception e) {
             }
@@ -357,23 +363,23 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     }
 
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         pull(mq, subExpression, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
     }
 
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout);
     }
 
     private void pullAsyncImpl(//
-                               final MessageQueue mq, //
-                               final String subExpression, //
-                               final long offset, //
-                               final int maxNums, //
-                               final PullCallback pullCallback, //
-                               final boolean block, //
-                               final long timeout) throws MQClientException, RemotingException, InterruptedException {
+        final MessageQueue mq, //
+        final String subExpression, //
+        final long offset, //
+        final int maxNums, //
+        final PullCallback pullCallback, //
+        final boolean block, //
+        final long timeout) throws MQClientException, RemotingException, InterruptedException {
         this.makeSureStateOK();
 
         if (null == mq) {
@@ -400,7 +406,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             final SubscriptionData subscriptionData;
             try {
                 subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
-                        mq.getTopic(), subExpression);
+                    mq.getTopic(), subExpression);
             } catch (Exception e) {
                 throw new MQClientException("parse subscription error", e);
             }
@@ -408,36 +414,36 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
 
             this.pullAPIWrapper.pullKernelImpl(//
-                    mq, // 1
-                    subscriptionData.getSubString(), // 2
-                    0L, // 3
-                    offset, // 4
-                    maxNums, // 5
-                    sysFlag, // 6
-                    0, // 7
-                    this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
-                    timeoutMillis, // 9
-                    CommunicationMode.ASYNC, // 10
-                    new PullCallback() {
-
-                        @Override
-                        public void onSuccess(PullResult pullResult) {
-                            pullCallback
-                                    .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
-                        }
-
-                        @Override
-                        public void onException(Throwable e) {
-                            pullCallback.onException(e);
-                        }
-                    });
+                mq, // 1
+                subscriptionData.getSubString(), // 2
+                0L, // 3
+                offset, // 4
+                maxNums, // 5
+                sysFlag, // 6
+                0, // 7
+                this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), // 8
+                timeoutMillis, // 9
+                CommunicationMode.ASYNC, // 10
+                new PullCallback() {
+
+                    @Override
+                    public void onSuccess(PullResult pullResult) {
+                        pullCallback
+                            .onSuccess(DefaultMQPullConsumerImpl.this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData));
+                    }
+
+                    @Override
+                    public void onException(Throwable e) {
+                        pullCallback.onException(e);
+                    }
+                });
         } catch (MQBrokerException e) {
             throw new MQClientException("pullAsync unknow exception", e);
         }
     }
 
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
@@ -446,19 +452,19 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     }
 
     public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true,
-                this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
+            this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
     }
 
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         this.makeSureStateOK();
         return this.mQClientFactory.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end);
     }
 
     public MessageExt queryMessageByUniqKey(String topic, String uniqKey)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         this.makeSureStateOK();
         return this.mQClientFactory.getMQAdminImpl().queryMessageByUniqKey(topic, uniqKey);
     }
@@ -469,27 +475,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     }
 
     public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         sendMessageBack(msg, delayLevel, brokerName, this.defaultMQPullConsumer.getConsumerGroup());
     }
 
     public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
-            MQBrokerException, InterruptedException, MQClientException {
+        MQBrokerException, InterruptedException, MQClientException {
         this.offsetStore.updateConsumeOffsetToBroker(mq, offset, isOneway);
     }
 
     public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName, String consumerGroup)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         try {
             String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
-                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
+                : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
 
             if (UtilAll.isBlank(consumerGroup)) {
                 consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
             }
 
             this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000,
-                    this.defaultMQPullConsumer.getMaxReconsumeTimes());
+                this.defaultMQPullConsumer.getMaxReconsumeTimes());
         } catch (Exception e) {
             log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
 
@@ -545,8 +551,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
 
                 this.pullAPIWrapper = new PullAPIWrapper(//
-                        mQClientFactory, //
-                        this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
+                    mQClientFactory, //
+                    this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
 
                 if (this.defaultMQPullConsumer.getOffsetStore() != null) {
@@ -571,8 +577,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
                     this.serviceState = ServiceState.CREATE_JUST;
 
                     throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
-                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
-                            null);
+                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
+                        null);
                 }
 
                 mQClientFactory.start();
@@ -583,9 +589,9 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             case START_FAILED:
             case SHUTDOWN_ALREADY:
                 throw new MQClientException("The PullConsumer service state not OK, maybe started once, "//
-                        + this.serviceState//
-                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
-                        null);
+                    + this.serviceState//
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
+                    null);
             default:
                 break;
         }
@@ -598,43 +604,43 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         // consumerGroup
         if (null == this.defaultMQPullConsumer.getConsumerGroup()) {
             throw new MQClientException(
-                    "consumerGroup is null" //
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
-                    null);
+                "consumerGroup is null" //
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                null);
         }
 
         // consumerGroup
         if (this.defaultMQPullConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
             throw new MQClientException(
-                    "consumerGroup can not equal "//
-                            + MixAll.DEFAULT_CONSUMER_GROUP //
-                            + ", please specify another one."//
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
-                    null);
+                "consumerGroup can not equal "//
+                    + MixAll.DEFAULT_CONSUMER_GROUP //
+                    + ", please specify another one."//
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                null);
         }
 
         // messageModel
         if (null == this.defaultMQPullConsumer.getMessageModel()) {
             throw new MQClientException(
-                    "messageModel is null" //
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
-                    null);
+                "messageModel is null" //
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                null);
         }
 
         // allocateMessageQueueStrategy
         if (null == this.defaultMQPullConsumer.getAllocateMessageQueueStrategy()) {
             throw new MQClientException(
-                    "allocateMessageQueueStrategy is null" //
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
-                    null);
+                "allocateMessageQueueStrategy is null" //
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                null);
         }
 
         // allocateMessageQueueStrategy
         if (this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() < this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis()) {
             throw new MQClientException(
-                    "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
-                            + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
-                    null);
+                "Long polling mode, the consumer consumerTimeoutMillisWhenSuspend must greater than brokerSuspendMaxTimeMillis" //
+                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), //
+                null);
         }
     }
 
@@ -644,7 +650,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
             if (registerTopics != null) {
                 for (final String topic : registerTopics) {
                     SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), //
-                            topic, SubscriptionData.SUB_ALL);
+                        topic, SubscriptionData.SUB_ALL);
                     this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                 }
             }
@@ -696,7 +702,6 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
         return consumerStartTimestamp;
     }
 
-
     public RebalanceImpl getRebalanceImpl() {
         return rebalanceImpl;
     }