You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:48 UTC

[41/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
index 146770a..2971b6c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -6,38 +6,34 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.pagecache;
 
-import org.apache.rocketmq.store.QueryMessageResult;
 import io.netty.channel.FileRegion;
 import io.netty.util.AbstractReferenceCounted;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.List;
-
+import org.apache.rocketmq.store.QueryMessageResult;
 
 public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion {
     private final ByteBuffer byteBufferHeader;
     private final QueryMessageResult queryMessageResult;
     private long transfered; // the bytes which was transfered already
 
-
     public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
         this.byteBufferHeader = byteBufferHeader;
         this.queryMessageResult = queryMessageResult;
     }
 
-
     @Override
     public long position() {
         int pos = byteBufferHeader.position();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 601e2f3..bac941d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 /**
@@ -20,12 +20,16 @@
  */
 package org.apache.rocketmq.broker.plugin;
 
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import org.apache.rocketmq.store.*;
-
 import java.util.HashMap;
 import java.util.Set;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.QueryMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
 
 public abstract class AbstractPluginMessageStore implements MessageStore {
     protected MessageStore next = null;
@@ -83,7 +87,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
 
     @Override
     public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
-                                       int maxMsgNums, SubscriptionData subscriptionData) {
+        int maxMsgNums, SubscriptionData subscriptionData) {
         return next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
     }
 
@@ -174,7 +178,7 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
 
     @Override
     public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
-                                           long end) {
+        long end) {
         return next.queryMessage(topic, key, maxNum, begin, end);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
index 42793ae..294bf8c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStoreFactory.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 /**
@@ -20,14 +20,13 @@
  */
 package org.apache.rocketmq.broker.plugin;
 
-import org.apache.rocketmq.store.MessageStore;
-
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import org.apache.rocketmq.store.MessageStore;
 
 public final class MessageStoreFactory {
     public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore)
-            throws IOException {
+        throws IOException {
         String plugin = context.getBrokerConfig().getMessageStorePlugIn();
         if (plugin != null && plugin.trim().length() != 0) {
             String[] pluginClasses = plugin.split(",");
@@ -35,12 +34,12 @@ public final class MessageStoreFactory {
                 String pluginClass = pluginClasses[i];
                 try {
                     @SuppressWarnings("unchecked")
-                    Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>) Class.forName(pluginClass);
+                    Class<AbstractPluginMessageStore> clazz = (Class<AbstractPluginMessageStore>)Class.forName(pluginClass);
                     Constructor<AbstractPluginMessageStore> construct = clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
                     messageStore = construct.newInstance(context, messageStore);
                 } catch (Throwable e) {
                     throw new RuntimeException(String.format(
-                            "Initialize plugin's class %s not found!", pluginClass), e);
+                        "Initialize plugin's class %s not found!", pluginClass), e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
index 32af402..fcab1e6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/MessageStorePluginContext.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 /**
@@ -32,8 +32,8 @@ public class MessageStorePluginContext {
     private BrokerConfig brokerConfig;
 
     public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
-                                     BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
-                                     BrokerConfig brokerConfig) {
+        BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
+        BrokerConfig brokerConfig) {
         super();
         this.messageStoreConfig = messageStoreConfig;
         this.brokerStatsManager = brokerStatsManager;
@@ -57,5 +57,4 @@ public class MessageStorePluginContext {
         return brokerConfig;
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index f04e86c..75e5766 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@ -6,16 +6,22 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.ChannelHandlerContext;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
 import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
@@ -42,17 +48,9 @@ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
-import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-
 public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
     protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -62,16 +60,15 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     protected final SocketAddress storeHost;
     private List<SendMessageHook> sendMessageHookList;
 
-
     public AbstractSendMessageProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
         this.storeHost =
-                new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
-                        .getNettyServerConfig().getListenPort());
+            new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController
+                .getNettyServerConfig().getListenPort());
     }
 
     protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
-                                                 SendMessageRequestHeader requestHeader) {
+        SendMessageRequestHeader requestHeader) {
         if (!this.hasSendMessageHook()) {
             return null;
         }
@@ -91,7 +88,6 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         properties.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
         requestHeader.setProperties(MessageDecoder.messageProperties2String(properties));
 
-
         if (uniqueKey == null) {
             uniqueKey = "";
         }
@@ -104,7 +100,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx,
-                                                  final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
+        final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
         int queueIdInt = requestHeader.getQueueId();
         if (queueIdInt < 0) {
             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
@@ -120,10 +116,10 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         msgInner.setBody(body);
         msgInner.setFlag(requestHeader.getFlag());
         MessageAccessor.setProperties(msgInner,
-                MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+            MessageDecoder.string2messageProperties(requestHeader.getProperties()));
         msgInner.setPropertiesString(requestHeader.getProperties());
         msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
-                msgInner.getTags()));
+            msgInner.getTags()));
 
         msgInner.setQueueId(queueIdInt);
         msgInner.setSysFlag(sysFlag);
@@ -131,7 +127,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         msgInner.setBornHost(ctx.channel().remoteAddress());
         msgInner.setStoreHost(this.getStoreHost());
         msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader
-                .getReconsumeTimes());
+            .getReconsumeTimes());
         return msgInner;
     }
 
@@ -140,8 +136,8 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
-                                              final SendMessageRequestHeader requestHeader, RemotingCommand request,
-                                              final RemotingCommand response) {
+        final SendMessageRequestHeader requestHeader, RemotingCommand request,
+        final RemotingCommand response) {
         if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
             log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
             response.setCode(ResponseCode.MESSAGE_ILLEGAL);
@@ -149,13 +145,13 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         }
         if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) {
             log.warn("putMessage message properties length too long "
-                    + requestHeader.getProperties().length());
+                + requestHeader.getProperties().length());
             response.setCode(ResponseCode.MESSAGE_ILLEGAL);
             return response;
         }
         if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) {
             log.warn(" topic {}  msg body size {}  from {}", requestHeader.getTopic(),
-                    request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
+                request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
             response.setRemark("msg body must be less 64KB");
             response.setCode(ResponseCode.MESSAGE_ILLEGAL);
             return response;
@@ -164,12 +160,12 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
-                                       final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
+        final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
         if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
-                && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
+            && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
-                    + "] sending message is forbidden");
+                + "] sending message is forbidden");
             return response;
         }
         if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
@@ -181,7 +177,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         }
 
         TopicConfig topicConfig =
-                this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+            this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
         if (null == topicConfig) {
             int topicSysFlag = 0;
             if (requestHeader.isUnitMode()) {
@@ -193,26 +189,26 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
             }
 
             log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: "
-                    + ctx.channel().remoteAddress());
+                + ctx.channel().remoteAddress());
             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
-                    requestHeader.getTopic(), //
-                    requestHeader.getDefaultTopic(), //
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
-                    requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
+                requestHeader.getTopic(), //
+                requestHeader.getDefaultTopic(), //
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
+                requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
 
             if (null == topicConfig) {
                 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                     topicConfig =
-                            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
-                                    requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
-                                    topicSysFlag);
+                        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
+                            requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
+                            topicSysFlag);
                 }
             }
 
             if (null == topicConfig) {
                 response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                 response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
-                        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                 return response;
             }
         }
@@ -221,9 +217,9 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
         int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
         if (queueIdInt >= idValid) {
             String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s",
-                    queueIdInt,
-                    topicConfig.toString(),
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                queueIdInt,
+                topicConfig.toString(),
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
             log.warn(errorInfo);
             response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -239,7 +235,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
-                              final RemotingCommand response) {
+        final RemotingCommand response) {
         if (!request.isOnewayRPC()) {
             try {
                 ctx.writeAndFlush(response);
@@ -252,7 +248,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
-                                             SendMessageContext context) {
+        SendMessageContext context) {
         if (hasSendMessageHook()) {
             for (SendMessageHook hook : this.sendMessageHookList) {
                 try {
@@ -280,20 +276,20 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
     }
 
     protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
 
         SendMessageRequestHeaderV2 requestHeaderV2 = null;
         SendMessageRequestHeader requestHeader = null;
         switch (request.getCode()) {
             case RequestCode.SEND_MESSAGE_V2:
                 requestHeaderV2 =
-                        (SendMessageRequestHeaderV2) request
-                                .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+                    (SendMessageRequestHeaderV2)request
+                        .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
             case RequestCode.SEND_MESSAGE:
                 if (null == requestHeaderV2) {
                     requestHeader =
-                            (SendMessageRequestHeader) request
-                                    .decodeCommandCustomHeader(SendMessageRequestHeader.class);
+                        (SendMessageRequestHeader)request
+                            .decodeCommandCustomHeader(SendMessageRequestHeader.class);
                 } else {
                     requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
                 }
@@ -309,7 +305,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces
                 try {
                     if (response != null) {
                         final SendMessageResponseHeader responseHeader =
-                                (SendMessageResponseHeader) response.readCustomHeader();
+                            (SendMessageResponseHeader)response.readCustomHeader();
                         context.setMsgId(responseHeader.getMsgId());
                         context.setQueueId(responseHeader.getQueueId());
                         context.setQueueOffset(responseHeader.getQueueOffset());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index d2d4bc7..722bec2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -16,6 +16,19 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.UnsupportedEncodingException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -33,8 +46,48 @@ import org.apache.rocketmq.common.message.MessageId;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.*;
-import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsItem;
+import org.apache.rocketmq.common.protocol.body.Connection;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
+import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
+import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerRequestHeader;
 import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterFilterServerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
@@ -50,17 +103,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.UnsupportedEncodingException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class AdminBrokerProcessor implements NettyRequestProcessor {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -157,10 +202,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final CreateTopicRequestHeader requestHeader =
-                (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
+            (CreateTopicRequestHeader)request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
         log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
-
         if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
             String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
             log.warn(errorMsg);
@@ -193,13 +237,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         DeleteTopicRequestHeader requestHeader =
-                (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
+            (DeleteTopicRequestHeader)request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
 
         log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
         this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
         this.brokerController.getMessageStore()
-                .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
+            .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
 
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
@@ -274,7 +318,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand getBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(GetBrokerConfigResponseHeader.class);
-        final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader) response.readCustomHeader();
+        final GetBrokerConfigResponseHeader responseHeader = (GetBrokerConfigResponseHeader)response.readCustomHeader();
 
         String content = this.brokerController.getConfiguration().getAllConfigsFormatString();
         if (content != null && content.length() > 0) {
@@ -298,12 +342,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
-        final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader) response.readCustomHeader();
+        final SearchOffsetResponseHeader responseHeader = (SearchOffsetResponseHeader)response.readCustomHeader();
         final SearchOffsetRequestHeader requestHeader =
-                (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
+            (SearchOffsetRequestHeader)request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
 
         long offset = this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(), requestHeader.getQueueId(),
-                requestHeader.getTimestamp());
+            requestHeader.getTimestamp());
 
         responseHeader.setOffset(offset);
 
@@ -314,9 +358,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
-        final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
+        final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader)response.readCustomHeader();
         final GetMaxOffsetRequestHeader requestHeader =
-                (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
+            (GetMaxOffsetRequestHeader)request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
 
         long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
 
@@ -329,9 +373,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
-        final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
+        final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader)response.readCustomHeader();
         final GetMinOffsetRequestHeader requestHeader =
-                (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
+            (GetMinOffsetRequestHeader)request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
 
         long offset = this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(), requestHeader.getQueueId());
 
@@ -343,12 +387,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     private RemotingCommand getEarliestMsgStoretime(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(GetEarliestMsgStoretimeResponseHeader.class);
-        final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader) response.readCustomHeader();
+        final GetEarliestMsgStoretimeResponseHeader responseHeader = (GetEarliestMsgStoretimeResponseHeader)response.readCustomHeader();
         final GetEarliestMsgStoretimeRequestHeader requestHeader =
-                (GetEarliestMsgStoretimeRequestHeader) request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
+            (GetEarliestMsgStoretimeRequestHeader)request.decodeCommandCustomHeader(GetEarliestMsgStoretimeRequestHeader.class);
 
         long timestamp =
-                this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId());
+            this.brokerController.getMessageStore().getEarliestMessageTime(requestHeader.getTopic(), requestHeader.getQueueId());
 
         responseHeader.setTimestamp(timestamp);
         response.setCode(ResponseCode.SUCCESS);
@@ -375,9 +419,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
 
         Set<MessageQueue> lockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(//
-                requestBody.getConsumerGroup(), //
-                requestBody.getMqSet(), //
-                requestBody.getClientId());
+            requestBody.getConsumerGroup(), //
+            requestBody.getMqSet(), //
+            requestBody.getClientId());
 
         LockBatchResponseBody responseBody = new LockBatchResponseBody();
         responseBody.setLockOKMQSet(lockOKMQSet);
@@ -393,9 +437,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         UnlockBatchRequestBody requestBody = UnlockBatchRequestBody.decode(request.getBody(), UnlockBatchRequestBody.class);
 
         this.brokerController.getRebalanceLockManager().unlockBatch(//
-                requestBody.getConsumerGroup(), //
-                requestBody.getMqSet(), //
-                requestBody.getClientId());
+            requestBody.getConsumerGroup(), //
+            requestBody.getMqSet(), //
+            requestBody.getClientId());
 
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
@@ -403,7 +447,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         log.info("updateAndCreateSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -447,7 +491,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         DeleteSubscriptionGroupRequestHeader requestHeader =
-                (DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
+            (DeleteSubscriptionGroupRequestHeader)request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
 
         log.info("deleteSubscriptionGroup called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
 
@@ -461,7 +505,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand getTopicStatsInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final GetTopicStatsInfoRequestHeader requestHeader =
-                (GetTopicStatsInfoRequestHeader) request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
+            (GetTopicStatsInfoRequestHeader)request.decodeCommandCustomHeader(GetTopicStatsInfoRequestHeader.class);
 
         final String topic = requestHeader.getTopic();
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -509,10 +553,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand getConsumerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final GetConsumerConnectionListRequestHeader requestHeader =
-                (GetConsumerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
+            (GetConsumerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetConsumerConnectionListRequestHeader.class);
 
         ConsumerGroupInfo consumerGroupInfo =
-                this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+            this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
         if (consumerGroupInfo != null) {
             ConsumerConnection bodydata = new ConsumerConnection();
             bodydata.setConsumeFromWhere(consumerGroupInfo.getConsumeFromWhere());
@@ -548,11 +592,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final GetProducerConnectionListRequestHeader requestHeader =
-                (GetProducerConnectionListRequestHeader) request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
+            (GetProducerConnectionListRequestHeader)request.decodeCommandCustomHeader(GetProducerConnectionListRequestHeader.class);
 
         ProducerConnection bodydata = new ProducerConnection();
         HashMap<Channel, ClientChannelInfo> channelInfoHashMap =
-                this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
+            this.brokerController.getProducerManager().getGroupChannelTable().get(requestHeader.getProducerGroup());
         if (channelInfoHashMap != null) {
             Iterator<Map.Entry<Channel, ClientChannelInfo>> it = channelInfoHashMap.entrySet().iterator();
             while (it.hasNext()) {
@@ -581,7 +625,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand getConsumeStats(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final GetConsumeStatsRequestHeader requestHeader =
-                (GetConsumeStatsRequestHeader) request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
+            (GetConsumeStatsRequestHeader)request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
 
         ConsumeStats consumeStats = new ConsumeStats();
 
@@ -604,10 +648,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
              */
             {
                 SubscriptionData findSubscriptionData =
-                        this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
+                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
 
                 if (null == findSubscriptionData //
-                        && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
+                    && this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
                     log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", requestHeader.getConsumerGroup(), topic);
                     continue;
                 }
@@ -626,16 +670,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                     brokerOffset = 0;
 
                 long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
-                        requestHeader.getConsumerGroup(), //
-                        topic, //
-                        i);
+                    requestHeader.getConsumerGroup(), //
+                    topic, //
+                    i);
                 if (consumerOffset < 0)
                     consumerOffset = 0;
 
                 offsetWrapper.setBrokerOffset(brokerOffset);
                 offsetWrapper.setConsumerOffset(consumerOffset);
 
-
                 long timeOffset = consumerOffset - 1;
                 if (timeOffset >= 0) {
                     long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
@@ -690,7 +733,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
-        String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode();
+        String content = ((DefaultMessageStore)this.brokerController.getMessageStore()).getScheduleMessageService().encode();
         if (content != null && content.length() > 0) {
             try {
                 response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
@@ -716,10 +759,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final ResetOffsetRequestHeader requestHeader =
-                (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
+            (ResetOffsetRequestHeader)request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class);
         log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}",
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
-                requestHeader.getTimestamp(), requestHeader.isForce());
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(),
+            requestHeader.getTimestamp(), requestHeader.isForce());
         boolean isC = false;
         LanguageCode language = request.getLanguage();
         switch (language) {
@@ -728,25 +771,24 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                 break;
         }
         return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(),
-                requestHeader.getTimestamp(), requestHeader.isForce(), isC);
+            requestHeader.getTimestamp(), requestHeader.isForce(), isC);
     }
 
     public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final GetConsumerStatusRequestHeader requestHeader =
-                (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
+            (GetConsumerStatusRequestHeader)request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class);
 
         log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}",
-                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
+            RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup());
 
         return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(),
-                requestHeader.getClientAddr());
+            requestHeader.getClientAddr());
     }
 
     private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         QueryTopicConsumeByWhoRequestHeader requestHeader =
-                (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
-
+            (QueryTopicConsumeByWhoRequestHeader)request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
 
         HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic());
 
@@ -767,9 +809,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     private RemotingCommand registerFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterFilterServerResponseHeader.class);
-        final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader) response.readCustomHeader();
+        final RegisterFilterServerResponseHeader responseHeader = (RegisterFilterServerResponseHeader)response.readCustomHeader();
         final RegisterFilterServerRequestHeader requestHeader =
-                (RegisterFilterServerRequestHeader) request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
+            (RegisterFilterServerRequestHeader)request.decodeCommandCustomHeader(RegisterFilterServerRequestHeader.class);
 
         this.brokerController.getFilterServerManager().registerFilterServer(ctx.channel(), requestHeader.getFilterServerAddr());
 
@@ -784,7 +826,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     private RemotingCommand queryConsumeTimeSpan(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         QueryConsumeTimeSpanRequestHeader requestHeader =
-                (QueryConsumeTimeSpanRequestHeader) request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
+            (QueryConsumeTimeSpanRequestHeader)request.decodeCommandCustomHeader(QueryConsumeTimeSpanRequestHeader.class);
 
         final String topic = requestHeader.getTopic();
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -812,7 +854,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
             long consumeTime;
             long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(
-                    requestHeader.getGroup(), topic, i);
+                requestHeader.getGroup(), topic, i);
             if (consumerOffset > 0) {
                 consumeTime = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, consumerOffset - 1);
             } else {
@@ -837,7 +879,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand getSystemTopicListFromBroker(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
 
         Set<String> topics = this.brokerController.getTopicConfigManager().getSystemTopic();
@@ -874,28 +916,28 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
      */
     private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final GetConsumerRunningInfoRequestHeader requestHeader =
-                (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+            (GetConsumerRunningInfoRequestHeader)request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
 
         return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(),
-                requestHeader.getClientId());
+            requestHeader.getClientId());
     }
 
     private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         QueryCorrectionOffsetHeader requestHeader =
-                (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
+            (QueryCorrectionOffsetHeader)request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);
 
         Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
-                .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
+            .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());
 
         Map<Integer, Long> compareOffset =
-                this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());
+            this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());
 
         if (compareOffset != null && !compareOffset.isEmpty()) {
             for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
                 Integer queueId = entry.getKey();
                 correctionOffset.put(queueId,
-                        correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
+                    correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
             }
         }
 
@@ -908,8 +950,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request
-                .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
+        final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader)request
+            .decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);
 
         request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());
         SelectMappedBufferResult selectMappedBufferResult = null;
@@ -928,13 +970,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         }
 
         return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),
-                requestHeader.getClientId());
+            requestHeader.getClientId());
     }
 
     private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         CloneGroupOffsetRequestHeader requestHeader =
-                (CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
+            (CloneGroupOffsetRequestHeader)request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);
 
         Set<String> topics;
         if (UtilAll.isBlank(requestHeader.getTopic())) {
@@ -957,16 +999,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             if (!requestHeader.isOffline()) {
 
                 SubscriptionData findSubscriptionData =
-                        this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
+                    this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);
                 if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0
-                        && findSubscriptionData == null) {
+                    && findSubscriptionData == null) {
                     log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);
                     continue;
                 }
             }
 
             this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),
-                    requestHeader.getTopic());
+                requestHeader.getTopic());
         }
 
         response.setCode(ResponseCode.SUCCESS);
@@ -976,9 +1018,9 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
 
     private RemotingCommand ViewBrokerStatsData(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final ViewBrokerStatsDataRequestHeader requestHeader =
-                (ViewBrokerStatsDataRequestHeader) request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
+            (ViewBrokerStatsDataRequestHeader)request.decodeCommandCustomHeader(ViewBrokerStatsDataRequestHeader.class);
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        DefaultMessageStore messageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
+        DefaultMessageStore messageStore = (DefaultMessageStore)this.brokerController.getMessageStore();
 
         StatsItem statsItem = messageStore.getBrokerStatsManager().getStatsItem(requestHeader.getStatsName(), requestHeader.getStatsKey());
         if (null == statsItem) {
@@ -998,7 +1040,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             brokerStatsData.setStatsMinute(it);
         }
 
-
         {
             BrokerStatsItem it = new BrokerStatsItem();
             StatsSnapshot ss = statsItem.getStatsDataInHour();
@@ -1008,7 +1049,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
             brokerStatsData.setStatsHour(it);
         }
 
-
         {
             BrokerStatsItem it = new BrokerStatsItem();
             StatsSnapshot ss = statsItem.getStatsDataInDay();
@@ -1025,16 +1065,16 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         GetConsumeStatsInBrokerHeader requestHeader =
-                (GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
+            (GetConsumeStatsInBrokerHeader)request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);
         boolean isOrder = requestHeader.isOrder();
         ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroups =
-                brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
+            brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();
 
         List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList =
-                new ArrayList<Map<String, List<ConsumeStats>>>();
+            new ArrayList<Map<String, List<ConsumeStats>>>();
 
         long totalDiff = 0L;
 
@@ -1060,7 +1100,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                     SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
 
                     if (null == findSubscriptionData //
-                            && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
+                        && this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {
                         log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);
                         continue;
                     }
@@ -1076,16 +1116,15 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
                     if (brokerOffset < 0)
                         brokerOffset = 0;
                     long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(//
-                            group, //
-                            topic, //
-                            i);
+                        group, //
+                        topic, //
+                        i);
                     if (consumerOffset < 0)
                         consumerOffset = 0;
 
                     offsetWrapper.setBrokerOffset(brokerOffset);
                     offsetWrapper.setConsumerOffset(consumerOffset);
 
-
                     long timeOffset = consumerOffset - 1;
                     if (timeOffset >= 0) {
                         long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);
@@ -1120,23 +1159,23 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         runtimeInfo.put("brokerVersion", String.valueOf(MQVersion.CURRENT_VERSION));
 
         runtimeInfo.put("msgPutTotalYesterdayMorning",
-                String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
+            String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalYesterdayMorning()));
         runtimeInfo.put("msgPutTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayMorning()));
         runtimeInfo.put("msgPutTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgPutTotalTodayNow()));
 
         runtimeInfo.put("msgGetTotalYesterdayMorning",
-                String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
+            String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalYesterdayMorning()));
         runtimeInfo.put("msgGetTotalTodayMorning", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayMorning()));
         runtimeInfo.put("msgGetTotalTodayNow", String.valueOf(this.brokerController.getBrokerStats().getMsgGetTotalTodayNow()));
 
         runtimeInfo.put("sendThreadPoolQueueSize", String.valueOf(this.brokerController.getSendThreadPoolQueue().size()));
 
         runtimeInfo.put("sendThreadPoolQueueCapacity",
-                String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
+            String.valueOf(this.brokerController.getBrokerConfig().getSendThreadPoolQueueCapacity()));
 
         runtimeInfo.put("pullThreadPoolQueueSize", String.valueOf(this.brokerController.getPullThreadPoolQueue().size()));
         runtimeInfo.put("pullThreadPoolQueueCapacity",
-                String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
+            String.valueOf(this.brokerController.getBrokerConfig().getPullThreadPoolQueueCapacity()));
 
         runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
         runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
@@ -1146,7 +1185,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         runtimeInfo.put("earliestMessageTimeStamp", String.valueOf(this.brokerController.getMessageStore().getEarliestMessageTime()));
         runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
         if (this.brokerController.getMessageStore() instanceof DefaultMessageStore) {
-            DefaultMessageStore defaultMessageStore = (DefaultMessageStore) this.brokerController.getMessageStore();
+            DefaultMessageStore defaultMessageStore = (DefaultMessageStore)this.brokerController.getMessageStore();
             runtimeInfo.put("remainTransientStoreBufferNumbs", String.valueOf(defaultMessageStore.remainTransientStoreBufferNumbs()));
             if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                 runtimeInfo.put("remainHowManyDataToCommit", MixAll.humanReadableByteCount(defaultMessageStore.getCommitLog().remainHowManyDataToCommit(), false));
@@ -1163,10 +1202,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand callConsumer(//
-                                         final int requestCode, //
-                                         final RemotingCommand request, //
-                                         final String consumerGroup, //
-                                         final String clientId) throws RemotingCommandException {
+        final int requestCode, //
+        final RemotingCommand request, //
+        final String consumerGroup, //
+        final String clientId) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);
 
@@ -1179,8 +1218,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", //
-                    clientId, //
-                    MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
+                clientId, //
+                MQVersion.getVersionDesc(clientChannelInfo.getVersion())));
             return response;
         }
 
@@ -1193,12 +1232,12 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         } catch (RemotingTimeoutException e) {
             response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);
             response
-                    .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
+                .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
             return response;
         } catch (Exception e) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(
-                    String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
+                String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));
             return response;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index 62de995..717afaf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.common.MixAll;
@@ -34,11 +35,9 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ClientManageProcessor implements NettyRequestProcessor {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -49,7 +48,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         switch (request.getCode()) {
             case RequestCode.HEART_BEAT:
                 return this.heartBeat(ctx, request);
@@ -70,16 +69,16 @@ public class ClientManageProcessor implements NettyRequestProcessor {
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-                ctx.channel(),
-                heartbeatData.getClientID(),
-                request.getLanguage(),
-                request.getVersion()
+            ctx.channel(),
+            heartbeatData.getClientID(),
+            request.getLanguage(),
+            request.getVersion()
         );
 
         for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
             SubscriptionGroupConfig subscriptionGroupConfig =
-                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
-                            data.getGroupName());
+                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
+                    data.getGroupName());
             boolean isNotifyConsumerIdsChangedEnable = true;
             if (null != subscriptionGroupConfig) {
                 isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
@@ -89,32 +88,32 @@ public class ClientManageProcessor implements NettyRequestProcessor {
                 }
                 String newTopic = MixAll.getRetryTopic(data.getGroupName());
                 this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
-                        newTopic,
-                        subscriptionGroupConfig.getRetryQueueNums(),
-                        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
+                    newTopic,
+                    subscriptionGroupConfig.getRetryQueueNums(),
+                    PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
             }
 
             boolean changed = this.brokerController.getConsumerManager().registerConsumer(
-                    data.getGroupName(),
-                    clientChannelInfo,
-                    data.getConsumeType(),
-                    data.getMessageModel(),
-                    data.getConsumeFromWhere(),
-                    data.getSubscriptionDataSet(),
-                    isNotifyConsumerIdsChangedEnable
+                data.getGroupName(),
+                clientChannelInfo,
+                data.getConsumeType(),
+                data.getMessageModel(),
+                data.getConsumeFromWhere(),
+                data.getSubscriptionDataSet(),
+                isNotifyConsumerIdsChangedEnable
             );
 
             if (changed) {
                 log.info("registerConsumer info changed {} {}",
-                        data.toString(),
-                        RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+                    data.toString(),
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                 );
             }
         }
 
         for (ProducerData data : heartbeatData.getProducerDataSet()) {
             this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
-                    clientChannelInfo);
+                clientChannelInfo);
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
@@ -122,18 +121,18 @@ public class ClientManageProcessor implements NettyRequestProcessor {
     }
 
     public RemotingCommand unregisterClient(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response =
-                RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
+            RemotingCommand.createResponseCommand(UnregisterClientResponseHeader.class);
         final UnregisterClientRequestHeader requestHeader =
-                (UnregisterClientRequestHeader) request
-                        .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
+            (UnregisterClientRequestHeader)request
+                .decodeCommandCustomHeader(UnregisterClientRequestHeader.class);
 
         ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-                ctx.channel(),
-                requestHeader.getClientID(),
-                request.getLanguage(),
-                request.getVersion());
+            ctx.channel(),
+            requestHeader.getClientID(),
+            request.getLanguage(),
+            request.getVersion());
         {
             final String group = requestHeader.getProducerGroup();
             if (group != null) {
@@ -145,7 +144,7 @@ public class ClientManageProcessor implements NettyRequestProcessor {
             final String group = requestHeader.getConsumerGroup();
             if (group != null) {
                 SubscriptionGroupConfig subscriptionGroupConfig =
-                        this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
+                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(group);
                 boolean isNotifyConsumerIdsChangedEnable = true;
                 if (null != subscriptionGroupConfig) {
                     isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index c0c43e0..d2e6d7d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -16,36 +16,39 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.ChannelHandlerContext;
+import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
-import org.apache.rocketmq.common.protocol.header.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
-
 public class ConsumerManageProcessor implements NettyRequestProcessor {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
     private final BrokerController brokerController;
 
-
     public ConsumerManageProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         switch (request.getCode()) {
             case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                 return this.getConsumerListByGroup(ctx, request);
@@ -64,18 +67,17 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
         return false;
     }
 
-
     public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response =
-                RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
+            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
         final GetConsumerListByGroupRequestHeader requestHeader =
-                (GetConsumerListByGroupRequestHeader) request
-                        .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
+            (GetConsumerListByGroupRequestHeader)request
+                .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);
 
         ConsumerGroupInfo consumerGroupInfo =
-                this.brokerController.getConsumerManager().getConsumerGroupInfo(
-                        requestHeader.getConsumerGroup());
+            this.brokerController.getConsumerManager().getConsumerGroupInfo(
+                requestHeader.getConsumerGroup());
         if (consumerGroupInfo != null) {
             List<String> clientIds = consumerGroupInfo.getAllClientId();
             if (!clientIds.isEmpty()) {
@@ -87,11 +89,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
                 return response;
             } else {
                 log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
-                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
             }
         } else {
             log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
         }
 
         response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -100,34 +102,32 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response =
-                RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+            RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
         final UpdateConsumerOffsetRequestHeader requestHeader =
-                (UpdateConsumerOffsetRequestHeader) request
-                        .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
+            (UpdateConsumerOffsetRequestHeader)request
+                .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
         this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
-                requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+            requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
     }
 
-
     private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response =
-                RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
+            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
         final QueryConsumerOffsetResponseHeader responseHeader =
-                (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
+            (QueryConsumerOffsetResponseHeader)response.readCustomHeader();
         final QueryConsumerOffsetRequestHeader requestHeader =
-                (QueryConsumerOffsetRequestHeader) request
-                        .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
+            (QueryConsumerOffsetRequestHeader)request
+                .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);
 
         long offset =
-                this.brokerController.getConsumerOffsetManager().queryOffset(
-                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
-
+            this.brokerController.getConsumerOffsetManager().queryOffset(
+                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
 
         if (offset >= 0) {
             responseHeader.setOffset(offset);
@@ -135,11 +135,11 @@ public class ConsumerManageProcessor implements NettyRequestProcessor {
             response.setRemark(null);
         } else {
             long minOffset =
-                    this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
-                            requestHeader.getQueueId());
+                this.brokerController.getMessageStore().getMinOffsetInQuque(requestHeader.getTopic(),
+                    requestHeader.getQueueId());
             if (minOffset <= 0
-                    && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
-                    requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
+                && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
+                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
                 responseHeader.setOffset(0L);
                 response.setCode(ResponseCode.SUCCESS);
                 response.setRemark(null);