You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:49 UTC

[42/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 0f9954b..3047faf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -6,23 +6,17 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.client;
 
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.remoting.common.RemotingHelper;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
 import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -30,7 +24,11 @@ import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ProducerManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -38,16 +36,14 @@ public class ProducerManager {
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private final Lock groupChannelLock = new ReentrantLock();
     private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
-            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-
+        new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
 
     public ProducerManager() {
     }
 
-
     public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
         HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
-                new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
         try {
             if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
@@ -62,13 +58,12 @@ public class ProducerManager {
         return newGroupChannelTable;
     }
 
-
     public void scanNotActiveChannel() {
         try {
             if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
                     for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
-                            .entrySet()) {
+                        .entrySet()) {
                         final String group = entry.getKey();
                         final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
 
@@ -82,8 +77,8 @@ public class ProducerManager {
                             if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                                 it.remove();
                                 log.warn(
-                                        "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
-                                        RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+                                    "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+                                    RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
                                 RemotingUtil.closeChannel(info.getChannel());
                             }
                         }
@@ -99,23 +94,22 @@ public class ProducerManager {
         }
     }
 
-
     public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
         if (channel != null) {
             try {
                 if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                     try {
                         for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
-                                .entrySet()) {
+                            .entrySet()) {
                             final String group = entry.getKey();
                             final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
-                                    entry.getValue();
+                                entry.getValue();
                             final ClientChannelInfo clientChannelInfo =
-                                    clientChannelInfoTable.remove(channel);
+                                clientChannelInfoTable.remove(channel);
                             if (clientChannelInfo != null) {
                                 log.info(
-                                        "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
-                                        clientChannelInfo.toString(), remoteAddr, group);
+                                    "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
+                                    clientChannelInfo.toString(), remoteAddr, group);
                             }
 
                         }
@@ -131,7 +125,6 @@ public class ProducerManager {
         }
     }
 
-
     public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
         try {
             ClientChannelInfo clientChannelInfoFound = null;
@@ -148,7 +141,7 @@ public class ProducerManager {
                     if (null == clientChannelInfoFound) {
                         channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
                         log.info("new producer connected, group: {} channel: {}", group,
-                                clientChannelInfo.toString());
+                            clientChannelInfo.toString());
                     }
                 } finally {
                     this.groupChannelLock.unlock();
@@ -165,7 +158,6 @@ public class ProducerManager {
         }
     }
 
-
     public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
         try {
             if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
@@ -175,7 +167,7 @@ public class ProducerManager {
                         ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
                         if (old != null) {
                             log.info("unregister a producer[{}] from groupChannelTable {}", group,
-                                    clientChannelInfo.toString());
+                                clientChannelInfo.toString());
                         }
 
                         if (channelTable.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 40eff81..f6d7955 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -16,6 +16,16 @@
  */
 package org.apache.rocketmq.broker.client.net;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.FileRegion;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -40,21 +50,9 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.FileRegion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class Broker2Client {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -64,17 +62,17 @@ public class Broker2Client {
     }
 
     public void checkProducerTransactionState(
-            final Channel channel,
-            final CheckTransactionStateRequestHeader requestHeader,
-            final SelectMappedBufferResult selectMappedBufferResult) {
+        final Channel channel,
+        final CheckTransactionStateRequestHeader requestHeader,
+        final SelectMappedBufferResult selectMappedBufferResult) {
         RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+            RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
         request.markOnewayRPC();
 
         try {
             FileRegion fileRegion =
-                    new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
-                            selectMappedBufferResult);
+                new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
+                    selectMappedBufferResult);
             channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                 @Override
                 public void operationComplete(ChannelFuture future) throws Exception {
@@ -91,14 +89,14 @@ public class Broker2Client {
     }
 
     public RemotingCommand callClient(final Channel channel,
-                                      final RemotingCommand request
+        final RemotingCommand request
     ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
         return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
     }
 
     public void notifyConsumerIdsChanged(
-            final Channel channel,
-            final String consumerGroup) {
+        final Channel channel,
+        final String consumerGroup) {
         if (null == consumerGroup) {
             log.error("notifyConsumerIdsChanged consumerGroup is null");
             return;
@@ -107,7 +105,7 @@ public class Broker2Client {
         NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+            RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
 
         try {
             this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
@@ -121,7 +119,7 @@ public class Broker2Client {
     }
 
     public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
-                                       boolean isC) {
+        boolean isC) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -141,7 +139,7 @@ public class Broker2Client {
             mq.setQueueId(i);
 
             long consumerOffset =
-                    this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
+                this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
             if (-1 == consumerOffset) {
                 response.setCode(ResponseCode.SYSTEM_ERROR);
                 response.setRemark(String.format("THe consumer group <%s> not exist", group));
@@ -173,7 +171,7 @@ public class Broker2Client {
         requestHeader.setGroup(group);
         requestHeader.setTimestamp(timeStamp);
         RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
+            RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
         if (isC) {
             // c++ language
             ResetOffsetBodyForC body = new ResetOffsetBodyForC();
@@ -188,37 +186,37 @@ public class Broker2Client {
         }
 
         ConsumerGroupInfo consumerGroupInfo =
-                this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+            this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
 
         if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
             ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
-                    consumerGroupInfo.getChannelInfoTable();
+                consumerGroupInfo.getChannelInfoTable();
             for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
                 int version = entry.getValue().getVersion();
                 if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                     try {
                         this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                         log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
-                                topic, group, entry.getValue().getClientId());
+                            topic, group, entry.getValue().getClientId());
                     } catch (Exception e) {
                         log.error("[reset-offset] reset offset exception. topic={}, group={}",
-                                new Object[]{topic, group}, e);
+                            new Object[] {topic, group}, e);
                     }
                 } else {
                     response.setCode(ResponseCode.SYSTEM_ERROR);
                     response.setRemark("the client does not support this feature. version="
-                            + MQVersion.getVersionDesc(version));
+                        + MQVersion.getVersionDesc(version));
                     log.warn("[reset-offset] the client does not support this feature. version={}",
-                            RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+                        RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                     return response;
                 }
             }
         } else {
             String errorInfo =
-                    String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
-                            requestHeader.getGroup(),
-                            requestHeader.getTopic(),
-                            requestHeader.getTimestamp());
+                String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
+                    requestHeader.getGroup(),
+                    requestHeader.getTopic(),
+                    requestHeader.getTimestamp());
             log.error(errorInfo);
             response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
             response.setRemark(errorInfo);
@@ -236,7 +234,7 @@ public class Broker2Client {
         for (Entry<MessageQueue, Long> entry : table.entrySet()) {
             MessageQueue mq = entry.getKey();
             MessageQueueForC tmp =
-                    new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
+                new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
             list.add(tmp);
         }
         return list;
@@ -249,13 +247,13 @@ public class Broker2Client {
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
         RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
-                        requestHeader);
+            RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
+                requestHeader);
 
         Map<String, Map<MessageQueue, Long>> consumerStatusTable =
-                new HashMap<String, Map<MessageQueue, Long>>();
+            new HashMap<String, Map<MessageQueue, Long>>();
         ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
-                this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
+            this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
         if (null == channelInfoTable || channelInfoTable.isEmpty()) {
             result.setCode(ResponseCode.SYSTEM_ERROR);
             result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
@@ -268,26 +266,26 @@ public class Broker2Client {
             if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                 result.setCode(ResponseCode.SYSTEM_ERROR);
                 result.setRemark("the client does not support this feature. version="
-                        + MQVersion.getVersionDesc(version));
+                    + MQVersion.getVersionDesc(version));
                 log.warn("[get-consumer-status] the client does not support this feature. version={}",
-                        RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                 return result;
             } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
                 try {
                     RemotingCommand response =
-                            this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
+                        this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
                     assert response != null;
                     switch (response.getCode()) {
                         case ResponseCode.SUCCESS: {
                             if (response.getBody() != null) {
                                 GetConsumerStatusBody body =
-                                        GetConsumerStatusBody.decode(response.getBody(),
-                                                GetConsumerStatusBody.class);
+                                    GetConsumerStatusBody.decode(response.getBody(),
+                                        GetConsumerStatusBody.class);
 
                                 consumerStatusTable.put(clientId, body.getMessageQueueTable());
                                 log.info(
-                                        "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
-                                        topic, group, clientId);
+                                    "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
+                                    topic, group, clientId);
                             }
                         }
                         default:
@@ -295,8 +293,8 @@ public class Broker2Client {
                     }
                 } catch (Exception e) {
                     log.error(
-                            "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
-                            new Object[]{topic, group}, e);
+                        "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
+                        new Object[] {topic, group}, e);
                 }
 
                 if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
index 82ca014..88c044a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -16,25 +16,23 @@
  */
 package org.apache.rocketmq.broker.client.rebalance;
 
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RebalanceLockManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
     private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
-            "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
+        "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
     private final Lock lock = new ReentrantLock();
     private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
-            new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
+        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
 
     public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
 
@@ -54,9 +52,9 @@ public class RebalanceLockManager {
                         lockEntry.setClientId(clientId);
                         groupValue.put(mq, lockEntry);
                         log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
-                                group, //
-                                clientId, //
-                                mq);
+                            group, //
+                            clientId, //
+                            mq);
                     }
 
                     if (lockEntry.isLocked(clientId)) {
@@ -66,26 +64,24 @@ public class RebalanceLockManager {
 
                     String oldClientId = lockEntry.getClientId();
 
-
                     if (lockEntry.isExpired()) {
                         lockEntry.setClientId(clientId);
                         lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                         log.warn(
-                                "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
-                                group, //
-                                oldClientId, //
-                                clientId, //
-                                mq);
-                        return true;
-                    }
-
-
-                    log.warn(
-                            "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+                            "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
                             group, //
                             oldClientId, //
                             clientId, //
                             mq);
+                        return true;
+                    }
+
+                    log.warn(
+                        "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+                        group, //
+                        oldClientId, //
+                        clientId, //
+                        mq);
                     return false;
                 } finally {
                     this.lock.unlock();
@@ -118,11 +114,10 @@ public class RebalanceLockManager {
     }
 
     public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
-                                          final String clientId) {
+        final String clientId) {
         Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
         Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
 
-
         for (MessageQueue mq : mqs) {
             if (this.isLocked(group, mq, clientId)) {
                 lockedMqs.add(mq);
@@ -141,7 +136,6 @@ public class RebalanceLockManager {
                         this.mqLockTable.put(group, groupValue);
                     }
 
-
                     for (MessageQueue mq : notLockedMqs) {
                         LockEntry lockEntry = groupValue.get(mq);
                         if (null == lockEntry) {
@@ -149,13 +143,12 @@ public class RebalanceLockManager {
                             lockEntry.setClientId(clientId);
                             groupValue.put(mq, lockEntry);
                             log.info(
-                                    "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
-                                    group, //
-                                    clientId, //
-                                    mq);
+                                "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
+                                group, //
+                                clientId, //
+                                mq);
                         }
 
-
                         if (lockEntry.isLocked(clientId)) {
                             lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                             lockedMqs.add(mq);
@@ -164,27 +157,25 @@ public class RebalanceLockManager {
 
                         String oldClientId = lockEntry.getClientId();
 
-
                         if (lockEntry.isExpired()) {
                             lockEntry.setClientId(clientId);
                             lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                             log.warn(
-                                    "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
-                                    group, //
-                                    oldClientId, //
-                                    clientId, //
-                                    mq);
+                                "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
+                                group, //
+                                oldClientId, //
+                                clientId, //
+                                mq);
                             lockedMqs.add(mq);
                             continue;
                         }
 
-
                         log.warn(
-                                "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
-                                group, //
-                                oldClientId, //
-                                clientId, //
-                                mq);
+                            "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+                            group, //
+                            oldClientId, //
+                            clientId, //
+                            mq);
                     }
                 } finally {
                     this.lock.unlock();
@@ -209,27 +200,27 @@ public class RebalanceLockManager {
                             if (lockEntry.getClientId().equals(clientId)) {
                                 groupValue.remove(mq);
                                 log.info("unlockBatch, Group: {} {} {}",
-                                        group,
-                                        mq,
-                                        clientId);
+                                    group,
+                                    mq,
+                                    clientId);
                             } else {
                                 log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
-                                        lockEntry.getClientId(),
-                                        group,
-                                        mq,
-                                        clientId);
-                            }
-                        } else {
-                            log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
+                                    lockEntry.getClientId(),
                                     group,
                                     mq,
                                     clientId);
+                            }
+                        } else {
+                            log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
+                                group,
+                                mq,
+                                clientId);
                         }
                     }
                 } else {
                     log.warn("unlockBatch, group not exist, Group: {} {}",
-                            group,
-                            clientId);
+                        group,
+                        clientId);
                 }
             } finally {
                 this.lock.unlock();
@@ -243,22 +234,18 @@ public class RebalanceLockManager {
         private String clientId;
         private volatile long lastUpdateTimestamp = System.currentTimeMillis();
 
-
         public String getClientId() {
             return clientId;
         }
 
-
         public void setClientId(String clientId) {
             this.clientId = clientId;
         }
 
-
         public long getLastUpdateTimestamp() {
             return lastUpdateTimestamp;
         }
 
-
         public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
             this.lastUpdateTimestamp = lastUpdateTimestamp;
         }
@@ -270,7 +257,7 @@ public class RebalanceLockManager {
 
         public boolean isExpired() {
             boolean expired =
-                    (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
+                (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
 
             return expired;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
index 5b86d99..c1b67ae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -17,15 +17,7 @@
 
 package org.apache.rocketmq.broker.filtersrv;
 
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.BrokerStartup;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.remoting.common.RemotingUtil;
 import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -34,18 +26,24 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerStartup;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FilterServerManager {
 
     public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
-            new ConcurrentHashMap<Channel, FilterServerInfo>(16);
+        new ConcurrentHashMap<Channel, FilterServerInfo>(16);
     private final BrokerController brokerController;
 
     private ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
+        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
 
     public FilterServerManager(final BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -67,7 +65,7 @@ public class FilterServerManager {
 
     public void createFilterServer() {
         int more =
-                this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
+            this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
         String cmd = this.buildStartCommand();
         for (int i = 0; i < more; i++) {
             FilterServerUtil.callShell(cmd, log);
@@ -86,12 +84,12 @@ public class FilterServerManager {
 
         if (RemotingUtil.isWindowsPlatform()) {
             return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
-                    this.brokerController.getBrokerConfig().getRocketmqHome(),
-                    config);
+                this.brokerController.getBrokerConfig().getRocketmqHome(),
+                config);
         } else {
             return String.format("sh %s/bin/startfsrv.sh %s",
-                    this.brokerController.getBrokerConfig().getRocketmqHome(),
-                    config);
+                this.brokerController.getBrokerConfig().getRocketmqHome(),
+                config);
         }
     }
 
@@ -134,7 +132,7 @@ public class FilterServerManager {
         FilterServerInfo old = this.filterServerTable.remove(channel);
         if (old != null) {
             log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
-                    remoteAddr);
+                remoteAddr);
         }
     }
 
@@ -152,22 +150,18 @@ public class FilterServerManager {
         private String filterServerAddr;
         private long lastUpdateTimestamp;
 
-
         public String getFilterServerAddr() {
             return filterServerAddr;
         }
 
-
         public void setFilterServerAddr(String filterServerAddr) {
             this.filterServerAddr = filterServerAddr;
         }
 
-
         public long getLastUpdateTimestamp() {
             return lastUpdateTimestamp;
         }
 
-
         public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
             this.lastUpdateTimestamp = lastUpdateTimestamp;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
index de4cc37..9edfbe4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java
@@ -6,20 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker.filtersrv;
 
 import org.slf4j.Logger;
 
-
 public class FilterServerUtil {
     public static void callShell(final String shellString, final Logger log) {
         Process process = null;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 5359368..f616e33 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -6,16 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.latency;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -24,21 +27,27 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
 public class BrokerFastFailure {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "BrokerFastFailureScheduledThread"));
+        "BrokerFastFailureScheduledThread"));
     private final BrokerController brokerController;
 
     public BrokerFastFailure(final BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
+    public static RequestTask castRunnable(final Runnable runnable) {
+        try {
+            FutureTaskExt object = (FutureTaskExt)runnable;
+            return (RequestTask)object.getRunnable();
+        } catch (Throwable e) {
+            log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
+        }
+
+        return null;
+    }
+
     public void start() {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
@@ -95,17 +104,6 @@ public class BrokerFastFailure {
         }
     }
 
-    public static RequestTask castRunnable(final Runnable runnable) {
-        try {
-            FutureTaskExt object = (FutureTaskExt) runnable;
-            return (RequestTask) object.getRunnable();
-        } catch (Throwable e) {
-            log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
-        }
-
-        return null;
-    }
-
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
index 8c4c5e8..5a4ad2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
@@ -17,22 +17,31 @@
 
 package org.apache.rocketmq.broker.latency;
 
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) {
+    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+        final BlockingQueue<Runnable> workQueue) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
     }
 
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
+    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+        final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
     }
 
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
+    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+        final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
     }
 
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
+    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit,
+        final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
index e261b40..0ee02ad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java
@@ -6,37 +6,33 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.longpolling;
 
 import java.util.ArrayList;
 import java.util.List;
 
-
 public class ManyPullRequest {
     private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>();
 
-
     public synchronized void addPullRequest(final PullRequest pullRequest) {
         this.pullRequestList.add(pullRequest);
     }
 
-
     public synchronized void addPullRequest(final List<PullRequest> many) {
         this.pullRequestList.addAll(many);
     }
 
-
     public synchronized List<PullRequest> cloneListAndClear() {
         if (!this.pullRequestList.isEmpty()) {
-            List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone();
+            List<PullRequest> result = (ArrayList<PullRequest>)this.pullRequestList.clone();
             this.pullRequestList.clear();
             return result;
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
index f953c1e..97e54f9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -6,29 +6,26 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker.longpolling;
 
 import org.apache.rocketmq.store.MessageArrivingListener;
 
-
 public class NotifyMessageArrivingListener implements MessageArrivingListener {
     private final PullRequestHoldService pullRequestHoldService;
 
-
     public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
         this.pullRequestHoldService = pullRequestHoldService;
     }
 
-
     @Override
     public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
         this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
index 40716f8..1cceec8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequest.java
@@ -6,20 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.longpolling;
 
+import io.netty.channel.Channel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.Channel;
-
 
 public class PullRequest {
     private final RemotingCommand requestCommand;
@@ -29,9 +28,8 @@ public class PullRequest {
     private final long pullFromThisOffset;
     private final SubscriptionData subscriptionData;
 
-
     public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
-                       long pullFromThisOffset, SubscriptionData subscriptionData) {
+        long pullFromThisOffset, SubscriptionData subscriptionData) {
         this.requestCommand = requestCommand;
         this.clientChannel = clientChannel;
         this.timeoutMillis = timeoutMillis;
@@ -40,27 +38,22 @@ public class PullRequest {
         this.subscriptionData = subscriptionData;
     }
 
-
     public RemotingCommand getRequestCommand() {
         return requestCommand;
     }
 
-
     public Channel getClientChannel() {
         return clientChannel;
     }
 
-
     public long getTimeoutMillis() {
         return timeoutMillis;
     }
 
-
     public long getSuspendTimestamp() {
         return suspendTimestamp;
     }
 
-
     public long getPullFromThisOffset() {
         return pullFromThisOffset;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index 5182664..0e5be9b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -16,6 +16,9 @@
  */
 package org.apache.rocketmq.broker.longpolling;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.SystemClock;
@@ -25,11 +28,6 @@ import org.apache.rocketmq.store.MessageFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class PullRequestHoldService extends ServiceThread {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final String TOPIC_QUEUEID_SEPARATOR = "@";
@@ -37,8 +35,7 @@ public class PullRequestHoldService extends ServiceThread {
     private final SystemClock systemClock = new SystemClock();
     private final MessageFilter messageFilter = new DefaultMessageFilter();
     private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
-            new ConcurrentHashMap<String, ManyPullRequest>(1024);
-
+        new ConcurrentHashMap<String, ManyPullRequest>(1024);
 
     public PullRequestHoldService(final BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -135,7 +132,7 @@ public class PullRequestHoldService extends ServiceThread {
                         if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
                             try {
                                 this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
-                                        request.getRequestCommand());
+                                    request.getRequestCommand());
                             } catch (Throwable e) {
                                 log.error("execute request when wakeup failed.", e);
                             }
@@ -146,14 +143,13 @@ public class PullRequestHoldService extends ServiceThread {
                     if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                         try {
                             this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
-                                    request.getRequestCommand());
+                                request.getRequestCommand());
                         } catch (Throwable e) {
                             log.error("execute request when wakeup failed.", e);
                         }
                         continue;
                     }
 
-
                     replayList.add(request);
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
index 3a167fa..cc2f218 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageContext.java
@@ -6,20 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.mqtrace;
 
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
 import java.util.Map;
-
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
 public class ConsumeMessageContext {
     private String consumerGroup;
@@ -38,102 +36,82 @@ public class ConsumeMessageContext {
     private int commercialRcvTimes;
     private int commercialRcvSize;
 
-
     public String getConsumerGroup() {
         return consumerGroup;
     }
 
-
     public void setConsumerGroup(String consumerGroup) {
         this.consumerGroup = consumerGroup;
     }
 
-
     public String getTopic() {
         return topic;
     }
 
-
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
-
     public Integer getQueueId() {
         return queueId;
     }
 
-
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
 
-
     public String getClientHost() {
         return clientHost;
     }
 
-
     public void setClientHost(String clientHost) {
         this.clientHost = clientHost;
     }
 
-
     public String getStoreHost() {
         return storeHost;
     }
 
-
     public void setStoreHost(String storeHost) {
         this.storeHost = storeHost;
     }
 
-
     public Map<String, Long> getMessageIds() {
         return messageIds;
     }
 
-
     public void setMessageIds(Map<String, Long> messageIds) {
         this.messageIds = messageIds;
     }
 
-
     public boolean isSuccess() {
         return success;
     }
 
-
     public void setSuccess(boolean success) {
         this.success = success;
     }
 
-
     public String getStatus() {
         return status;
     }
 
-
     public void setStatus(String status) {
         this.status = status;
     }
 
-
     public Object getMqTraceContext() {
         return mqTraceContext;
     }
 
-
     public void setMqTraceContext(Object mqTraceContext) {
         this.mqTraceContext = mqTraceContext;
     }
 
-
     public int getBodyLength() {
         return bodyLength;
     }
 
-
     public void setBodyLength(int bodyLength) {
         this.bodyLength = bodyLength;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
index c4b7f36..7e724a0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/ConsumeMessageHook.java
@@ -6,22 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.mqtrace;
 
 public interface ConsumeMessageHook {
     String hookName();
 
-
     void consumeMessageBefore(final ConsumeMessageContext context);
 
-
     void consumeMessageAfter(final ConsumeMessageContext context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
index ca8121d..8ad7919 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageContext.java
@@ -6,22 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.mqtrace;
 
+import java.util.Properties;
 import org.apache.rocketmq.common.message.MessageType;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-import java.util.Properties;
-
-
 public class SendMessageContext {
     private String producerGroup;
     private String topic;
@@ -92,137 +90,110 @@ public class SendMessageContext {
         return producerGroup;
     }
 
-
     public void setProducerGroup(String producerGroup) {
         this.producerGroup = producerGroup;
     }
 
-
     public String getTopic() {
         return topic;
     }
 
-
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
-
     public String getMsgId() {
         return msgId;
     }
 
-
     public void setMsgId(String msgId) {
         this.msgId = msgId;
     }
 
-
     public String getOriginMsgId() {
         return originMsgId;
     }
 
-
     public void setOriginMsgId(String originMsgId) {
         this.originMsgId = originMsgId;
     }
 
-
     public Integer getQueueId() {
         return queueId;
     }
 
-
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
 
-
     public Long getQueueOffset() {
         return queueOffset;
     }
 
-
     public void setQueueOffset(Long queueOffset) {
         this.queueOffset = queueOffset;
     }
 
-
     public String getBrokerAddr() {
         return brokerAddr;
     }
 
-
     public void setBrokerAddr(String brokerAddr) {
         this.brokerAddr = brokerAddr;
     }
 
-
     public String getBornHost() {
         return bornHost;
     }
 
-
     public void setBornHost(String bornHost) {
         this.bornHost = bornHost;
     }
 
-
     public int getBodyLength() {
         return bodyLength;
     }
 
-
     public void setBodyLength(int bodyLength) {
         this.bodyLength = bodyLength;
     }
 
-
     public int getCode() {
         return code;
     }
 
-
     public void setCode(int code) {
         this.code = code;
     }
 
-
     public String getErrorMsg() {
         return errorMsg;
     }
 
-
     public void setErrorMsg(String errorMsg) {
         this.errorMsg = errorMsg;
     }
 
-
     public String getMsgProps() {
         return msgProps;
     }
 
-
     public void setMsgProps(String msgProps) {
         this.msgProps = msgProps;
     }
 
-
     public Object getMqTraceContext() {
         return mqTraceContext;
     }
 
-
     public void setMqTraceContext(Object mqTraceContext) {
         this.mqTraceContext = mqTraceContext;
     }
 
-
     public Properties getExtProps() {
         return extProps;
     }
 
-
     public void setExtProps(Properties extProps) {
         this.extProps = extProps;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
index 84cbdcb..a84d899 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/mqtrace/SendMessageHook.java
@@ -6,22 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.mqtrace;
 
 public interface SendMessageHook {
     public String hookName();
 
-
     public void sendMessageBefore(final SendMessageContext context);
 
-
     public void sendMessageAfter(final SendMessageContext context);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 06ceb36..9346067 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -6,16 +6,23 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.offset;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -25,30 +32,22 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class ConsumerOffsetManager extends ConfigManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final String TOPIC_GROUP_SEPARATOR = "@";
 
     private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
-            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
+        new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
 
     private transient BrokerController brokerController;
 
-
     public ConsumerOffsetManager() {
     }
 
-
     public ConsumerOffsetManager(BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
-
     public void scanUnsubscribedTopic() {
         Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
         while (it.hasNext()) {
@@ -60,7 +59,7 @@ public class ConsumerOffsetManager extends ConfigManager {
                 String group = arrays[1];
 
                 if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
-                        && this.offsetBehindMuchThanData(topic, next.getValue())) {
+                    && this.offsetBehindMuchThanData(topic, next.getValue())) {
                     it.remove();
                     log.warn("remove topic offset, {}", topicAtGroup);
                 }
@@ -68,7 +67,6 @@ public class ConsumerOffsetManager extends ConfigManager {
         }
     }
 
-
     private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) {
         Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
         boolean result = !table.isEmpty();
@@ -83,7 +81,6 @@ public class ConsumerOffsetManager extends ConfigManager {
         return result;
     }
 
-
     public Set<String> whichTopicByConsumer(final String group) {
         Set<String> topics = new HashSet<String>();
 
@@ -102,7 +99,6 @@ public class ConsumerOffsetManager extends ConfigManager {
         return topics;
     }
 
-
     public Set<String> whichGroupByTopic(final String topic) {
         Set<String> groups = new HashSet<String>();
 
@@ -121,7 +117,6 @@ public class ConsumerOffsetManager extends ConfigManager {
         return groups;
     }
 
-
     public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
         // topic@group
         String key = topic + TOPIC_GROUP_SEPARATOR + group;
@@ -182,12 +177,10 @@ public class ConsumerOffsetManager extends ConfigManager {
         return offsetTable;
     }
 
-
     public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
         this.offsetTable = offsetTable;
     }
 
-
     public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {
 
         Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
@@ -224,14 +217,12 @@ public class ConsumerOffsetManager extends ConfigManager {
         return queueMinOffset;
     }
 
-
     public Map<Integer, Long> queryOffset(final String group, final String topic) {
         // topic@group
         String key = topic + TOPIC_GROUP_SEPARATOR + group;
         return this.offsetTable.get(key);
     }
 
-
     public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
         ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
         if (offsets != null) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 3d969c4..25b333a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -16,6 +16,9 @@
  */
 package org.apache.rocketmq.broker.out;
 
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -23,24 +26,27 @@ import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.*;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-
-
 public class BrokerOuterAPI {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final RemotingClient remotingClient;
@@ -92,15 +98,15 @@ public class BrokerOuterAPI {
     }
 
     public RegisterBrokerResult registerBrokerAll(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final String haServerAddr,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final List<String> filterServerList,
-            final boolean oneway,
-            final int timeoutMills) {
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final String haServerAddr,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final List<String> filterServerList,
+        final boolean oneway,
+        final int timeoutMills) {
         RegisterBrokerResult registerBrokerResult = null;
 
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
@@ -108,7 +114,7 @@ public class BrokerOuterAPI {
             for (String namesrvAddr : nameServerAddressList) {
                 try {
                     RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                            haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
+                        haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                     if (result != null) {
                         registerBrokerResult = result;
                     }
@@ -124,18 +130,18 @@ public class BrokerOuterAPI {
     }
 
     private RegisterBrokerResult registerBroker(
-            final String namesrvAddr,
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final String haServerAddr,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final List<String> filterServerList,
-            final boolean oneway,
-            final int timeoutMills
+        final String namesrvAddr,
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId,
+        final String haServerAddr,
+        final TopicConfigSerializeWrapper topicConfigWrapper,
+        final List<String> filterServerList,
+        final boolean oneway,
+        final int timeoutMills
     ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException {
+        InterruptedException {
         RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
         requestHeader.setBrokerAddr(brokerAddr);
         requestHeader.setBrokerId(brokerId);
@@ -163,7 +169,7 @@ public class BrokerOuterAPI {
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 RegisterBrokerResponseHeader responseHeader =
-                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
+                    (RegisterBrokerResponseHeader)response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                 RegisterBrokerResult result = new RegisterBrokerResult();
                 result.setMasterAddr(responseHeader.getMasterAddr());
                 result.setHaServerAddr(responseHeader.getHaServerAddr());
@@ -181,10 +187,10 @@ public class BrokerOuterAPI {
     }
 
     public void unregisterBrokerAll(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId
     ) {
         List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
         if (nameServerAddressList != null) {
@@ -200,11 +206,11 @@ public class BrokerOuterAPI {
     }
 
     public void unregisterBroker(
-            final String namesrvAddr,
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId
+        final String namesrvAddr,
+        final String clusterName,
+        final String brokerAddr,
+        final String brokerName,
+        final long brokerId
     ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
         UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
         requestHeader.setBrokerAddr(brokerAddr);
@@ -227,7 +233,7 @@ public class BrokerOuterAPI {
     }
 
     public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException, MQBrokerException {
+        RemotingTimeoutException, InterruptedException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
@@ -244,7 +250,7 @@ public class BrokerOuterAPI {
     }
 
     public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
         assert response != null;
@@ -260,7 +266,7 @@ public class BrokerOuterAPI {
     }
 
     public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
-            RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
+        RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
         assert response != null;
@@ -276,7 +282,7 @@ public class BrokerOuterAPI {
     }
 
     public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
+        RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
         RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
         assert response != null;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
index e4c3045..1b1d5bb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -6,38 +6,34 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.pagecache;
 
-import org.apache.rocketmq.store.GetMessageResult;
 import io.netty.channel.FileRegion;
 import io.netty.util.AbstractReferenceCounted;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.List;
-
+import org.apache.rocketmq.store.GetMessageResult;
 
 public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion {
     private final ByteBuffer byteBufferHeader;
     private final GetMessageResult getMessageResult;
     private long transfered; // the bytes which was transfered already
 
-
     public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
         this.byteBufferHeader = byteBufferHeader;
         this.getMessageResult = getMessageResult;
     }
 
-
     @Override
     public long position() {
         int pos = byteBufferHeader.position();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
index 3f00ece..b4cf0da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -6,37 +6,33 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.pagecache;
 
-import org.apache.rocketmq.store.SelectMappedBufferResult;
 import io.netty.channel.FileRegion;
 import io.netty.util.AbstractReferenceCounted;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
-
+import org.apache.rocketmq.store.SelectMappedBufferResult;
 
 public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion {
     private final ByteBuffer byteBufferHeader;
     private final SelectMappedBufferResult selectMappedBufferResult;
     private long transfered; // the bytes which was transfered already
 
-
     public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
         this.byteBufferHeader = byteBufferHeader;
         this.selectMappedBufferResult = selectMappedBufferResult;
     }
 
-
     @Override
     public long position() {
         return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position();