You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:46 UTC

[39/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
index 0ff589d..38d885e 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/api/SendMessageTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -36,10 +38,9 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
 
-public class SendMessageTest extends BrokerTestHarness{
+public class SendMessageTest extends BrokerTestHarness {
 
     MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
     String topic = "UnitTestTopic";
@@ -60,7 +61,7 @@ public class SendMessageTest extends BrokerTestHarness{
     }
 
     @Test
-    public void testSendSingle() throws Exception{
+    public void testSendSingle() throws Exception {
         Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
         SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
         requestHeader.setProducerGroup("abc");
@@ -74,7 +75,7 @@ public class SendMessageTest extends BrokerTestHarness{
         requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
 
         SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
-                CommunicationMode.SYNC, new SendMessageContext(), null);
+            CommunicationMode.SYNC, new SendMessageContext(), null);
         assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index d6be5fb..89813fc 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -25,7 +27,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-
 public class ConsumerOffsetManagerTest extends BrokerTestHarness {
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
index ab9ab6f..2f85dbc 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: TopicConfigManagerTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -25,8 +27,9 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 public class TopicConfigManagerTest extends BrokerTestHarness {
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index e4a8c36..0d22d7d 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE module PUBLIC
-        "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
-        "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
 <!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
@@ -32,8 +32,8 @@
     </module>
 
     <module name="RegexpSingleline">
-        <property name="format" value="System\.out\.println" />
-        <property name="message" value="Prohibit invoking System.out.println in source code !" />
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
     </module>
 
     <module name="RegexpSingleline">

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index 86d38cf..9c18ebd 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -15,7 +15,7 @@
    limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 8afca13..9c7a0cd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client;
 
@@ -20,7 +20,6 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 
-
 /**
  * Client Common configuration
  *
@@ -123,78 +122,64 @@ public class ClientConfig {
         return clientCallbackExecutorThreads;
     }
 
-
     public void setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) {
         this.clientCallbackExecutorThreads = clientCallbackExecutorThreads;
     }
 
-
     public int getPollNameServerInteval() {
         return pollNameServerInteval;
     }
 
-
     public void setPollNameServerInteval(int pollNameServerInteval) {
         this.pollNameServerInteval = pollNameServerInteval;
     }
 
-
     public int getHeartbeatBrokerInterval() {
         return heartbeatBrokerInterval;
     }
 
-
     public void setHeartbeatBrokerInterval(int heartbeatBrokerInterval) {
         this.heartbeatBrokerInterval = heartbeatBrokerInterval;
     }
 
-
     public int getPersistConsumerOffsetInterval() {
         return persistConsumerOffsetInterval;
     }
 
-
     public void setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) {
         this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
     }
 
-
     public String getUnitName() {
         return unitName;
     }
 
-
     public void setUnitName(String unitName) {
         this.unitName = unitName;
     }
 
-
     public boolean isUnitMode() {
         return unitMode;
     }
 
-
     public void setUnitMode(boolean unitMode) {
         this.unitMode = unitMode;
     }
 
-
     public boolean isVipChannelEnabled() {
         return vipChannelEnabled;
     }
 
-
     public void setVipChannelEnabled(final boolean vipChannelEnabled) {
         this.vipChannelEnabled = vipChannelEnabled;
     }
 
-
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
-                + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
-                + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
-                + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
-                + vipChannelEnabled + "]";
+            + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInteval=" + pollNameServerInteval
+            + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
+            + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+            + vipChannelEnabled + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
index 6596855..7697520 100644
--- a/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
+++ b/client/src/main/java/org/apache/rocketmq/client/MQAdmin.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client;
 
@@ -22,7 +22,6 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-
 /**
  * Base interface for MQ management
  *
@@ -41,8 +40,7 @@ public interface MQAdmin {
      * @throws MQClientException
      */
     void createTopic(final String key, final String newTopic, final int queueNum)
-            throws MQClientException;
-
+        throws MQClientException;
 
     /**
      * Creates an topic
@@ -59,8 +57,7 @@ public interface MQAdmin {
      * @throws MQClientException
      */
     void createTopic(String key, String newTopic, int queueNum, int topicSysFlag)
-            throws MQClientException;
-
+        throws MQClientException;
 
     /**
      * Gets the message queue offset according to some time in milliseconds<br>
@@ -77,7 +74,6 @@ public interface MQAdmin {
      */
     long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
 
-
     /**
      * Gets the max offset
      *
@@ -90,7 +86,6 @@ public interface MQAdmin {
      */
     long maxOffset(final MessageQueue mq) throws MQClientException;
 
-
     /**
      * Gets the minimum offset
      *
@@ -103,7 +98,6 @@ public interface MQAdmin {
      */
     long minOffset(final MessageQueue mq) throws MQClientException;
 
-
     /**
      * Gets the earliest stored message time
      *
@@ -116,7 +110,6 @@ public interface MQAdmin {
      */
     long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
 
-
     /**
      * Query message according tto message id
      *
@@ -131,8 +124,7 @@ public interface MQAdmin {
      * @throws MQClientException
      */
     MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException,
-            InterruptedException, MQClientException;
-
+        InterruptedException, MQClientException;
 
     /**
      * Query messages
@@ -154,8 +146,8 @@ public interface MQAdmin {
      * @throws InterruptedException
      */
     QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
-                             final long end) throws MQClientException, InterruptedException;
-    
+        final long end) throws MQClientException, InterruptedException;
+
     /**
 
      * @param topic
@@ -166,7 +158,6 @@ public interface MQAdmin {
      * @throws InterruptedException
      * @throws MQClientException
      */
-    MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;        
+    MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
 
-    
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
index b4ddb08..937e846 100644
--- a/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
+++ b/client/src/main/java/org/apache/rocketmq/client/MQHelper.java
@@ -16,47 +16,39 @@
  */
 package org.apache.rocketmq.client;
 
+import java.util.Set;
+import java.util.TreeSet;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.slf4j.Logger;
 
-import java.util.Set;
-import java.util.TreeSet;
-
-
 public class MQHelper {
     public static void resetOffsetByTimestamp(
-            final MessageModel messageModel,
-            final String consumerGroup,
-            final String topic,
-            final long timestamp) throws Exception {
+        final MessageModel messageModel,
+        final String consumerGroup,
+        final String topic,
+        final long timestamp) throws Exception {
         resetOffsetByTimestamp(messageModel, "DEFAULT", consumerGroup, topic, timestamp);
     }
 
     /**
      * Reset consumer topic offset according to time
      *
-     * @param messageModel
-     *         which model
-     * @param instanceName
-     *         which instance
-     * @param consumerGroup
-     *         consumer group
-     * @param topic
-     *         topic
-     * @param timestamp
-     *         time
-     *
+     * @param messageModel which model
+     * @param instanceName which instance
+     * @param consumerGroup consumer group
+     * @param topic topic
+     * @param timestamp time
      * @throws Exception
      */
     public static void resetOffsetByTimestamp(
-            final MessageModel messageModel,
-            final String instanceName,
-            final String consumerGroup,
-            final String topic,
-            final long timestamp) throws Exception {
+        final MessageModel messageModel,
+        final String instanceName,
+        final String consumerGroup,
+        final String topic,
+        final long timestamp) throws Exception {
         final Logger log = ClientLogger.getLog();
 
         DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerGroup);
@@ -74,7 +66,7 @@ public class MQHelper {
                     if (offset >= 0) {
                         consumer.updateConsumeOffset(mq, offset);
                         log.info("resetOffsetByTimestamp updateConsumeOffset success, {} {} {}",
-                                consumerGroup, offset, mq);
+                            consumerGroup, offset, mq);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
index af3649b..7b1cc01 100644
--- a/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/QueryResult.java
@@ -6,45 +6,39 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client;
 
-import org.apache.rocketmq.common.message.MessageExt;
-
 import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageExt;
 
 public class QueryResult {
     private final long indexLastUpdateTimestamp;
     private final List<MessageExt> messageList;
 
-
     public QueryResult(long indexLastUpdateTimestamp, List<MessageExt> messageList) {
         this.indexLastUpdateTimestamp = indexLastUpdateTimestamp;
         this.messageList = messageList;
     }
 
-
     public long getIndexLastUpdateTimestamp() {
         return indexLastUpdateTimestamp;
     }
 
-
     public List<MessageExt> getMessageList() {
         return messageList;
     }
 
-
     @Override
     public String toString() {
         return "QueryResult [indexLastUpdateTimestamp=" + indexLastUpdateTimestamp + ", messageList="
-                + messageList + "]";
+            + messageList + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/Validators.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/Validators.java b/client/src/main/java/org/apache/rocketmq/client/Validators.java
index 92fc53b..fa9e4e6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/Validators.java
+++ b/client/src/main/java/org/apache/rocketmq/client/Validators.java
@@ -6,17 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.client;
 
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.MixAll;
@@ -24,10 +26,6 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-
 /**
  * Common Validator
  *
@@ -65,8 +63,8 @@ public class Validators {
         }
         if (!regularExpressionMatcher(group, PATTERN)) {
             throw new MQClientException(String.format(
-                    "the specified group[%s] contains illegal characters, allowing only %s", group,
-                    VALID_PATTERN_STR), null);
+                "the specified group[%s] contains illegal characters, allowing only %s", group,
+                VALID_PATTERN_STR), null);
         }
         if (group.length() > CHARACTER_MAX_LENGTH) {
             throw new MQClientException("the specified group is longer than group max length 255.", null);
@@ -97,7 +95,7 @@ public class Validators {
      * @throws MQClientException
      */
     public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
-            throws MQClientException {
+        throws MQClientException {
         if (null == msg) {
             throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
         }
@@ -114,7 +112,7 @@ public class Validators {
 
         if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
             throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
-                    "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
+                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
         }
     }
 
@@ -132,8 +130,8 @@ public class Validators {
 
         if (!regularExpressionMatcher(topic, PATTERN)) {
             throw new MQClientException(String.format(
-                    "the specified topic[%s] contains illegal characters, allowing only %s", topic,
-                    VALID_PATTERN_STR), null);
+                "the specified topic[%s] contains illegal characters, allowing only %s", topic,
+                VALID_PATTERN_STR), null);
         }
 
         if (topic.length() > CHARACTER_MAX_LENGTH) {
@@ -143,7 +141,7 @@ public class Validators {
         //whether the same with system reserved keyword
         if (topic.equals(MixAll.DEFAULT_TOPIC)) {
             throw new MQClientException(
-                    String.format("the topic[%s] is conflict with default topic.", topic), null);
+                String.format("the topic[%s] is conflict with default topic.", topic), null);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
index bc4ca6c..913d4f2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/admin/MQAdminExtInner.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.admin;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
index 360cfdf..391f1d1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ThreadLocalIndex.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.client.common;
@@ -22,6 +22,7 @@ import java.util.Random;
 public class ThreadLocalIndex {
     private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
     private final Random random = new Random();
+
     public ThreadLocalIndex(int value) {
 
     }
@@ -30,7 +31,8 @@ public class ThreadLocalIndex {
         Integer index = this.threadLocalIndex.get();
         if (null == index) {
             index = Math.abs(random.nextInt());
-            if (index < 0) index = 0;
+            if (index < 0)
+                index = 0;
             this.threadLocalIndex.set(index);
         }
 
@@ -45,7 +47,7 @@ public class ThreadLocalIndex {
     @Override
     public String toString() {
         return "ThreadLocalIndex{" +
-                "threadLocalIndex=" + threadLocalIndex.get() +
-                '}';
+            "threadLocalIndex=" + threadLocalIndex.get() +
+            '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
index 81a71e4..ca692d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
@@ -16,39 +16,30 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import org.apache.rocketmq.common.message.MessageQueue;
-
 import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * Strategy Algorithm for message allocating between consumers
- *
  */
 public interface AllocateMessageQueueStrategy {
 
     /**
      * Allocating by consumer id
      *
-     * @param consumerGroup
-     *         current consumer group
-     * @param currentCID
-     *         current consumer id
-     * @param mqAll
-     *         message queue set in current topic
-     * @param cidAll
-     *         consumer set in current consumer group
-     *
+     * @param consumerGroup current consumer group
+     * @param currentCID current consumer id
+     * @param mqAll message queue set in current topic
+     * @param cidAll consumer set in current consumer group
      * @return The allocate result of given strategy
      */
     List<MessageQueue> allocate(
-            final String consumerGroup,
-            final String currentCID,
-            final List<MessageQueue> mqAll,
-            final List<String> cidAll
+        final String consumerGroup,
+        final String currentCID,
+        final List<MessageQueue> mqAll,
+        final List<String> cidAll
     );
 
-
     /**
      * Algorithm name
      *

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 156b3d0..8eb1258 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -6,16 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
@@ -31,10 +33,6 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-import java.util.HashSet;
-import java.util.Set;
-
-
 /**
  * Default pulling consumer
  *
@@ -88,23 +86,19 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
 
     private int maxReconsumeTimes = 16;
 
-
     public DefaultMQPullConsumer() {
         this(MixAll.DEFAULT_CONSUMER_GROUP, null);
     }
 
-
     public DefaultMQPullConsumer(final String consumerGroup, RPCHook rpcHook) {
         this.consumerGroup = consumerGroup;
         defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
     }
 
-
     public DefaultMQPullConsumer(final String consumerGroup) {
         this(consumerGroup, null);
     }
 
-
     public DefaultMQPullConsumer(RPCHook rpcHook) {
         this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook);
     }
@@ -114,141 +108,116 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
         createTopic(key, newTopic, queueNum, 0);
     }
 
-
     @Override
     public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
         this.defaultMQPullConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
     }
 
-
     @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
     }
 
-
     @Override
     public long maxOffset(MessageQueue mq) throws MQClientException {
         return this.defaultMQPullConsumerImpl.maxOffset(mq);
     }
 
-
     @Override
     public long minOffset(MessageQueue mq) throws MQClientException {
         return this.defaultMQPullConsumerImpl.minOffset(mq);
     }
 
-
     @Override
     public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
         return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(mq);
     }
 
-
     @Override
     public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
-            InterruptedException, MQClientException {
+        InterruptedException, MQClientException {
         return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
     }
 
-
     @Override
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         return this.defaultMQPullConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
     }
 
-
     public AllocateMessageQueueStrategy getAllocateMessageQueueStrategy() {
         return allocateMessageQueueStrategy;
     }
 
-
     public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
     }
 
-
     public long getBrokerSuspendMaxTimeMillis() {
         return brokerSuspendMaxTimeMillis;
     }
 
-
     public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
         this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
     }
 
-
     public String getConsumerGroup() {
         return consumerGroup;
     }
 
-
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
 
-
     public long getConsumerPullTimeoutMillis() {
         return consumerPullTimeoutMillis;
     }
 
-
     public void setConsumerPullTimeoutMillis(long consumerPullTimeoutMillis) {
         this.consumerPullTimeoutMillis = consumerPullTimeoutMillis;
     }
 
-
     public long getConsumerTimeoutMillisWhenSuspend() {
         return consumerTimeoutMillisWhenSuspend;
     }
 
-
     public void setConsumerTimeoutMillisWhenSuspend(long consumerTimeoutMillisWhenSuspend) {
         this.consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
     }
 
-
     public MessageModel getMessageModel() {
         return messageModel;
     }
 
-
     public void setMessageModel(MessageModel messageModel) {
         this.messageModel = messageModel;
     }
 
-
     public MessageQueueListener getMessageQueueListener() {
         return messageQueueListener;
     }
 
-
     public void setMessageQueueListener(MessageQueueListener messageQueueListener) {
         this.messageQueueListener = messageQueueListener;
     }
 
-
     public Set<String> getRegisterTopics() {
         return registerTopics;
     }
 
-
     public void setRegisterTopics(Set<String> registerTopics) {
         this.registerTopics = registerTopics;
     }
 
-
     @Override
     public void sendMessageBack(MessageExt msg, int delayLevel)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
     }
 
-
     @Override
     public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
     }
 
@@ -279,37 +248,37 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
 
     @Override
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums);
     }
 
     @Override
     public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
     }
 
     @Override
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback);
     }
 
     @Override
     public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
     }
 
     @Override
     public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
-            throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
         return this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums);
     }
 
     @Override
     public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
-            throws MQClientException, RemotingException, InterruptedException {
+        throws MQClientException, RemotingException, InterruptedException {
         this.defaultMQPullConsumerImpl.pullBlockIfNotFound(mq, subExpression, offset, maxNums, pullCallback);
     }
 
@@ -341,7 +310,7 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
 
     @Override
     public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
     }
 
@@ -349,32 +318,26 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
         return offsetStore;
     }
 
-
     public void setOffsetStore(OffsetStore offsetStore) {
         this.offsetStore = offsetStore;
     }
 
-
     public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
         return defaultMQPullConsumerImpl;
     }
 
-
     public boolean isUnitMode() {
         return unitMode;
     }
 
-
     public void setUnitMode(boolean isUnitMode) {
         this.unitMode = isUnitMode;
     }
 
-
     public int getMaxReconsumeTimes() {
         return maxReconsumeTimes;
     }
 
-
     public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
         this.maxReconsumeTimes = maxReconsumeTimes;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 228e075..fcb3e64 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -16,6 +16,9 @@
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -36,14 +39,8 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-
 /**
  * Wrapped push consumer.in fact,it works as remarkable as the pull consumer
- *
  */
 public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
     protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
@@ -133,24 +130,20 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     private long suspendCurrentQueueTimeMillis = 1000;
     private long consumeTimeout = 15;
 
-
     public DefaultMQPushConsumer() {
         this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely());
     }
 
-
     public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.consumerGroup = consumerGroup;
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
     }
 
-
     public DefaultMQPushConsumer(RPCHook rpcHook) {
         this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely());
     }
 
-
     public DefaultMQPushConsumer(final String consumerGroup) {
         this(consumerGroup, null, new AllocateMessageQueueAveragely());
     }
@@ -160,46 +153,39 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         createTopic(key, newTopic, queueNum, 0);
     }
 
-
     @Override
     public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
         this.defaultMQPushConsumerImpl.createTopic(key, newTopic, queueNum, topicSysFlag);
     }
 
-
     @Override
     public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
         return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
     }
 
-
     @Override
     public long maxOffset(MessageQueue mq) throws MQClientException {
         return this.defaultMQPushConsumerImpl.maxOffset(mq);
     }
 
-
     @Override
     public long minOffset(MessageQueue mq) throws MQClientException {
         return this.defaultMQPushConsumerImpl.minOffset(mq);
     }
 
-
     @Override
     public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
         return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(mq);
     }
 
-
     @Override
     public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
     }
 
-
     @Override
     public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
-            throws MQClientException, InterruptedException {
+        throws MQClientException, InterruptedException {
         return this.defaultMQPushConsumerImpl.queryMessage(topic, key, maxNum, begin, end);
     }
 
@@ -218,169 +204,137 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         return allocateMessageQueueStrategy;
     }
 
-
     public void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
         this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
     }
 
-
     public int getConsumeConcurrentlyMaxSpan() {
         return consumeConcurrentlyMaxSpan;
     }
 
-
     public void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan) {
         this.consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
     }
 
-
     public ConsumeFromWhere getConsumeFromWhere() {
         return consumeFromWhere;
     }
 
-
     public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
         this.consumeFromWhere = consumeFromWhere;
     }
 
-
     public int getConsumeMessageBatchMaxSize() {
         return consumeMessageBatchMaxSize;
     }
 
-
     public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
         this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
     }
 
-
     public String getConsumerGroup() {
         return consumerGroup;
     }
 
-
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
 
-
     public int getConsumeThreadMax() {
         return consumeThreadMax;
     }
 
-
     public void setConsumeThreadMax(int consumeThreadMax) {
         this.consumeThreadMax = consumeThreadMax;
     }
 
-
     public int getConsumeThreadMin() {
         return consumeThreadMin;
     }
 
-
     public void setConsumeThreadMin(int consumeThreadMin) {
         this.consumeThreadMin = consumeThreadMin;
     }
 
-
     public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
         return defaultMQPushConsumerImpl;
     }
 
-
     public MessageListener getMessageListener() {
         return messageListener;
     }
 
-
     public void setMessageListener(MessageListener messageListener) {
         this.messageListener = messageListener;
     }
 
-
     public MessageModel getMessageModel() {
         return messageModel;
     }
 
-
     public void setMessageModel(MessageModel messageModel) {
         this.messageModel = messageModel;
     }
 
-
     public int getPullBatchSize() {
         return pullBatchSize;
     }
 
-
     public void setPullBatchSize(int pullBatchSize) {
         this.pullBatchSize = pullBatchSize;
     }
 
-
     public long getPullInterval() {
         return pullInterval;
     }
 
-
     public void setPullInterval(long pullInterval) {
         this.pullInterval = pullInterval;
     }
 
-
     public int getPullThresholdForQueue() {
         return pullThresholdForQueue;
     }
 
-
     public void setPullThresholdForQueue(int pullThresholdForQueue) {
         this.pullThresholdForQueue = pullThresholdForQueue;
     }
 
-
     public Map<String, String> getSubscription() {
         return subscription;
     }
 
-
     public void setSubscription(Map<String, String> subscription) {
         this.subscription = subscription;
     }
 
-
     @Override
     public void sendMessageBack(MessageExt msg, int delayLevel)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, null);
     }
 
-
     @Override
     public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, brokerName);
     }
 
-
     @Override
     public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
         return this.defaultMQPushConsumerImpl.fetchSubscribeMessageQueues(topic);
     }
 
-
     @Override
     public void start() throws MQClientException {
         this.defaultMQPushConsumerImpl.start();
     }
 
-
     @Override
     public void shutdown() {
         this.defaultMQPushConsumerImpl.shutdown();
     }
 
-
     @Override
     @Deprecated
     public void registerMessageListener(MessageListener messageListener) {
@@ -388,127 +342,104 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
     }
 
-
     @Override
     public void registerMessageListener(MessageListenerConcurrently messageListener) {
         this.messageListener = messageListener;
         this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
     }
 
-
     @Override
     public void registerMessageListener(MessageListenerOrderly messageListener) {
         this.messageListener = messageListener;
         this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
     }
 
-
     @Override
     public void subscribe(String topic, String subExpression) throws MQClientException {
         this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
     }
 
-
     @Override
     public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {
         this.defaultMQPushConsumerImpl.subscribe(topic, fullClassName, filterClassSource);
     }
 
-
     @Override
     public void unsubscribe(String topic) {
         this.defaultMQPushConsumerImpl.unsubscribe(topic);
     }
 
-
     @Override
     public void updateCorePoolSize(int corePoolSize) {
         this.defaultMQPushConsumerImpl.updateCorePoolSize(corePoolSize);
     }
 
-
     @Override
     public void suspend() {
         this.defaultMQPushConsumerImpl.suspend();
     }
 
-
     @Override
     public void resume() {
         this.defaultMQPushConsumerImpl.resume();
     }
 
-
     public OffsetStore getOffsetStore() {
         return offsetStore;
     }
 
-
     public void setOffsetStore(OffsetStore offsetStore) {
         this.offsetStore = offsetStore;
     }
 
-
     public String getConsumeTimestamp() {
         return consumeTimestamp;
     }
 
-
     public void setConsumeTimestamp(String consumeTimestamp) {
         this.consumeTimestamp = consumeTimestamp;
     }
 
-
     public boolean isPostSubscriptionWhenPull() {
         return postSubscriptionWhenPull;
     }
 
-
     public void setPostSubscriptionWhenPull(boolean postSubscriptionWhenPull) {
         this.postSubscriptionWhenPull = postSubscriptionWhenPull;
     }
 
-
     public boolean isUnitMode() {
         return unitMode;
     }
 
-
     public void setUnitMode(boolean isUnitMode) {
         this.unitMode = isUnitMode;
     }
 
-
     public long getAdjustThreadPoolNumsThreshold() {
         return adjustThreadPoolNumsThreshold;
     }
 
-
     public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
         this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
     }
 
-
     public int getMaxReconsumeTimes() {
         return maxReconsumeTimes;
     }
 
-
     public void setMaxReconsumeTimes(final int maxReconsumeTimes) {
         this.maxReconsumeTimes = maxReconsumeTimes;
     }
 
-
     public long getSuspendCurrentQueueTimeMillis() {
         return suspendCurrentQueueTimeMillis;
     }
 
-
     public void setSuspendCurrentQueueTimeMillis(final long suspendCurrentQueueTimeMillis) {
         this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
     }
 
-
     public long getConsumeTimeout() {
         return consumeTimeout;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
index 9d9c72b..343a0a2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -6,16 +6,17 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.Set;
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -23,9 +24,6 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-import java.util.Set;
-
-
 /**
  * Message queue consumer interface
  *
@@ -44,8 +42,7 @@ public interface MQConsumer extends MQAdmin {
      */
     @Deprecated
     void sendMessageBack(final MessageExt msg, final int delayLevel) throws RemotingException,
-            MQBrokerException, InterruptedException, MQClientException;
-
+        MQBrokerException, InterruptedException, MQClientException;
 
     /**
      * If consuming failure,message will be send back to the broker,and delay consuming some time
@@ -60,8 +57,7 @@ public interface MQConsumer extends MQAdmin {
      * @throws MQClientException
      */
     void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
-
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
 
     /**
      * Fetch message queues from consumer cache according to the topic

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
index 2335e3d..d199f8a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java
@@ -6,25 +6,23 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
+import java.util.Set;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
-import java.util.Set;
-
-
 /**
  * Pulling consumer interface
  *
@@ -37,13 +35,11 @@ public interface MQPullConsumer extends MQConsumer {
      */
     void start() throws MQClientException;
 
-
     /**
      * Shutdown the consumer
      */
     void shutdown();
 
-
     /**
      * Register the message queue listener
      *
@@ -52,7 +48,6 @@ public interface MQPullConsumer extends MQConsumer {
      */
     void registerMessageQueueListener(final String topic, final MessageQueueListener listener);
 
-
     /**
      * Pulling the messages,not blocking
      *
@@ -74,9 +69,8 @@ public interface MQPullConsumer extends MQConsumer {
      * @throws RemotingException
      */
     PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
-                    final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
-            InterruptedException;
-
+        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
 
     /**
      * Pulling the messages in the specified timeout
@@ -95,9 +89,8 @@ public interface MQPullConsumer extends MQConsumer {
      * @throws InterruptedException
      */
     PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
-                    final int maxNums, final long timeout) throws MQClientException, RemotingException,
-            MQBrokerException, InterruptedException;
-
+        final int maxNums, final long timeout) throws MQClientException, RemotingException,
+        MQBrokerException, InterruptedException;
 
     /**
      * Pulling the messages in a async. way
@@ -113,8 +106,8 @@ public interface MQPullConsumer extends MQConsumer {
      * @throws InterruptedException
      */
     void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
-              final PullCallback pullCallback) throws MQClientException, RemotingException,
-            InterruptedException;
+        final PullCallback pullCallback) throws MQClientException, RemotingException,
+        InterruptedException;
 
     /**
      * Pulling the messages in a async. way
@@ -131,9 +124,8 @@ public interface MQPullConsumer extends MQConsumer {
      * @throws InterruptedException
      */
     void pull(final MessageQueue mq, final String subExpression, final long offset, final int maxNums,
-              final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
-            InterruptedException;
-
+        final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
+        InterruptedException;
 
     /**
      * Pulling the messages,if no message arrival,blocking some time
@@ -151,9 +143,8 @@ public interface MQPullConsumer extends MQConsumer {
      * @throws InterruptedException
      */
     PullResult pullBlockIfNotFound(final MessageQueue mq, final String subExpression,
-                                   final long offset, final int maxNums) throws MQClientException, RemotingException,
-            MQBrokerException, InterruptedException;
-
+        final long offset, final int maxNums) throws MQClientException, RemotingException,
+        MQBrokerException, InterruptedException;
 
     /**
      * Pulling the messages through callback function,if no message arrival,blocking.
@@ -169,9 +160,8 @@ public interface MQPullConsumer extends MQConsumer {
      * @throws InterruptedException
      */
     void pullBlockIfNotFound(final MessageQueue mq, final String subExpression, final long offset,
-                             final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
-            InterruptedException;
-
+        final int maxNums, final PullCallback pullCallback) throws MQClientException, RemotingException,
+        InterruptedException;
 
     /**
      * Update the offset
@@ -183,7 +173,6 @@ public interface MQPullConsumer extends MQConsumer {
      */
     void updateConsumeOffset(final MessageQueue mq, final long offset) throws MQClientException;
 
-
     /**
      * Fetch the offset
      *
@@ -196,7 +185,6 @@ public interface MQPullConsumer extends MQConsumer {
      */
     long fetchConsumeOffset(final MessageQueue mq, final boolean fromStore) throws MQClientException;
 
-
     /**
      * Fetch the message queues according to the topic
      *
@@ -224,5 +212,5 @@ public interface MQPullConsumer extends MQConsumer {
      * @throws MQClientException
      */
     void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
+        throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
index da8ffb5..ec747e2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumerScheduleService.java
@@ -16,34 +16,31 @@
  */
 package org.apache.rocketmq.client.consumer;
 
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.slf4j.Logger;
-
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.slf4j.Logger;
 
 /**
  * Schedule service for pull consumer
- *
  */
 public class MQPullConsumerScheduleService {
     private final Logger log = ClientLogger.getLog();
     private final MessageQueueListener messageQueueListener = new MessageQueueListenerImpl();
     private final ConcurrentHashMap<MessageQueue, PullTaskImpl> taskTable =
-            new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
+        new ConcurrentHashMap<MessageQueue, PullTaskImpl>();
     private DefaultMQPullConsumer defaultMQPullConsumer;
     private int pullThreadNums = 20;
     private ConcurrentHashMap<String /* topic */, PullTaskCallback> callbackTable =
-            new ConcurrentHashMap<String, PullTaskCallback>();
+        new ConcurrentHashMap<String, PullTaskCallback>();
     private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
 
     public MQPullConsumerScheduleService(final String consumerGroup) {
@@ -76,8 +73,8 @@ public class MQPullConsumerScheduleService {
     public void start() throws MQClientException {
         final String group = this.defaultMQPullConsumer.getConsumerGroup();
         this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
-                this.pullThreadNums,
-                new ThreadFactoryImpl("PullMsgThread-" + group)
+            this.pullThreadNums,
+            new ThreadFactoryImpl("PullMsgThread-" + group)
         );
 
         this.defaultMQPullConsumer.setMessageQueueListener(this.messageQueueListener);
@@ -85,7 +82,7 @@ public class MQPullConsumerScheduleService {
         this.defaultMQPullConsumer.start();
 
         log.info("MQPullConsumerScheduleService start OK, {} {}",
-                this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
+            this.defaultMQPullConsumer.getConsumerGroup(), this.callbackTable);
     }
 
     public void registerPullTaskCallback(final String topic, final PullTaskCallback callback) {
@@ -139,7 +136,7 @@ public class MQPullConsumerScheduleService {
         @Override
         public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) {
             MessageModel messageModel =
-                    MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
+                MQPullConsumerScheduleService.this.defaultMQPullConsumer.getMessageModel();
             switch (messageModel) {
                 case BROADCASTING:
                     MQPullConsumerScheduleService.this.putTask(topic, mqAll);
@@ -157,18 +154,16 @@ public class MQPullConsumerScheduleService {
         private final MessageQueue messageQueue;
         private volatile boolean cancelled = false;
 
-
         public PullTaskImpl(final MessageQueue messageQueue) {
             this.messageQueue = messageQueue;
         }
 
-
         @Override
         public void run() {
             String topic = this.messageQueue.getTopic();
             if (!this.isCancelled()) {
                 PullTaskCallback pullTaskCallback =
-                        MQPullConsumerScheduleService.this.callbackTable.get(topic);
+                    MQPullConsumerScheduleService.this.callbackTable.get(topic);
                 if (pullTaskCallback != null) {
                     final PullTaskContext context = new PullTaskContext();
                     context.setPullConsumer(MQPullConsumerScheduleService.this.defaultMQPullConsumer);
@@ -181,7 +176,7 @@ public class MQPullConsumerScheduleService {
 
                     if (!this.isCancelled()) {
                         MQPullConsumerScheduleService.this.scheduledThreadPoolExecutor.schedule(this,
-                                context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
+                            context.getPullNextDelayTimeMillis(), TimeUnit.MILLISECONDS);
                     } else {
                         log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
                     }
@@ -193,17 +188,14 @@ public class MQPullConsumerScheduleService {
             }
         }
 
-
         public boolean isCancelled() {
             return cancelled;
         }
 
-
         public void setCancelled(boolean cancelled) {
             this.cancelled = cancelled;
         }
 
-
         public MessageQueue getMessageQueue() {
             return messageQueue;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
index b04956c..1b969bd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
@@ -21,7 +21,6 @@ import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQClientException;
 
-
 /**
  * Push consumer
  *
@@ -34,13 +33,11 @@ public interface MQPushConsumer extends MQConsumer {
      */
     void start() throws MQClientException;
 
-
     /**
      * Shutdown the consumer
      */
     void shutdown();
 
-
     /**
      * Register the message listener
      *
@@ -49,13 +46,10 @@ public interface MQPushConsumer extends MQConsumer {
     @Deprecated
     void registerMessageListener(MessageListener messageListener);
 
-
     void registerMessageListener(final MessageListenerConcurrently messageListener);
 
-
     void registerMessageListener(final MessageListenerOrderly messageListener);
 
-
     /**
      * Subscribe some topic
      *
@@ -69,7 +63,6 @@ public interface MQPushConsumer extends MQConsumer {
      */
     void subscribe(final String topic, final String subExpression) throws MQClientException;
 
-
     /**
      * Subscribe some topic
      *
@@ -85,7 +78,6 @@ public interface MQPushConsumer extends MQConsumer {
      */
     void subscribe(final String topic, final String fullClassName, final String filterClassSource) throws MQClientException;
 
-
     /**
      * Unsubscribe consumption some topic
      *
@@ -94,7 +86,6 @@ public interface MQPushConsumer extends MQConsumer {
      */
     void unsubscribe(final String topic);
 
-
     /**
      * Update the consumer thread pool size Dynamically
      *
@@ -102,13 +93,11 @@ public interface MQPushConsumer extends MQConsumer {
      */
     void updateCorePoolSize(int corePoolSize);
 
-
     /**
      * Suspend the consumption
      */
     void suspend();
 
-
     /**
      * Resume the consumption
      */

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
index 7a08348..0cc2dc4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MessageQueueListener.java
@@ -6,20 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
-import org.apache.rocketmq.common.message.MessageQueue;
-
 import java.util.Set;
-
+import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * A MessageQueueListener is implemented by the application and may be specified when a message queue changed
@@ -35,5 +33,5 @@ public interface MessageQueueListener {
      *         collection of queues,assigned to the current consumer
      */
     void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
-                             final Set<MessageQueue> mqDivided);
+        final Set<MessageQueue> mqDivided);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
index cf554c4..06e47d9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullCallback.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
index 1cb23ce..e494f74 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullResult.java
@@ -6,20 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
-import org.apache.rocketmq.common.message.MessageExt;
-
 import java.util.List;
-
+import org.apache.rocketmq.common.message.MessageExt;
 
 public class PullResult {
     private final PullStatus pullStatus;
@@ -28,9 +26,8 @@ public class PullResult {
     private final long maxOffset;
     private List<MessageExt> msgFoundList;
 
-
     public PullResult(PullStatus pullStatus, long nextBeginOffset, long minOffset, long maxOffset,
-                      List<MessageExt> msgFoundList) {
+        List<MessageExt> msgFoundList) {
         super();
         this.pullStatus = pullStatus;
         this.nextBeginOffset = nextBeginOffset;
@@ -39,41 +36,34 @@ public class PullResult {
         this.msgFoundList = msgFoundList;
     }
 
-
     public PullStatus getPullStatus() {
         return pullStatus;
     }
 
-
     public long getNextBeginOffset() {
         return nextBeginOffset;
     }
 
-
     public long getMinOffset() {
         return minOffset;
     }
 
-
     public long getMaxOffset() {
         return maxOffset;
     }
 
-
     public List<MessageExt> getMsgFoundList() {
         return msgFoundList;
     }
 
-
     public void setMsgFoundList(List<MessageExt> msgFoundList) {
         this.msgFoundList = msgFoundList;
     }
 
-
     @Override
     public String toString() {
         return "PullResult [pullStatus=" + pullStatus + ", nextBeginOffset=" + nextBeginOffset
-                + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList="
-                + (msgFoundList == null ? 0 : msgFoundList.size()) + "]";
+            + ", minOffset=" + minOffset + ", maxOffset=" + maxOffset + ", msgFoundList="
+            + (msgFoundList == null ? 0 : msgFoundList.size()) + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
index b2a3c8c..a400d90 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullStatus.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
index dc74bca..bc9a867 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskCallback.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 
-
 public interface PullTaskCallback {
     void doPullTask(final MessageQueue mq, final PullTaskContext context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
index ba66a1f..f0114ae 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/PullTaskContext.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.client.consumer;
 
@@ -22,22 +22,18 @@ public class PullTaskContext {
 
     private MQPullConsumer pullConsumer;
 
-
     public int getPullNextDelayTimeMillis() {
         return pullNextDelayTimeMillis;
     }
 
-
     public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis) {
         this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
     }
 
-
     public MQPullConsumer getPullConsumer() {
         return pullConsumer;
     }
 
-
     public void setPullConsumer(MQPullConsumer pullConsumer) {
         this.pullConsumer = pullConsumer;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
index 981ceaf..40ac6c1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/listener/ConsumeConcurrentlyContext.java
@@ -18,10 +18,8 @@ package org.apache.rocketmq.client.consumer.listener;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 
-
 /**
  * Consumer concurrent consumption context
- *
  */
 public class ConsumeConcurrentlyContext {
     private final MessageQueue messageQueue;
@@ -38,27 +36,22 @@ public class ConsumeConcurrentlyContext {
         this.messageQueue = messageQueue;
     }
 
-
     public int getDelayLevelWhenNextConsume() {
         return delayLevelWhenNextConsume;
     }
 
-
     public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
         this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
     }
 
-
     public MessageQueue getMessageQueue() {
         return messageQueue;
     }
 
-
     public int getAckIndex() {
         return ackIndex;
     }
 
-
     public void setAckIndex(int ackIndex) {
         this.ackIndex = ackIndex;
     }