You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:15:22 UTC

[91/99] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index c493c70..6a34a69 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.TopicFilterType;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -33,11 +34,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.PutMessageResult;
-import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class EndTransactionProcessor implements NettyRequestProcessor {
     private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -50,36 +49,35 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final EndTransactionRequestHeader requestHeader =
-                (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
-
+            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
 
         if (requestHeader.getFromTransactionCheck()) {
             switch (requestHeader.getCommitOrRollback()) {
                 case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                     LOGGER.warn("check producer[{}] transaction state, but it's pending status."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
                     return null;
                 }
 
                 case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                     LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
 
                     break;
                 }
 
                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                     LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
                     break;
                 }
                 default:
@@ -89,10 +87,10 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
             switch (requestHeader.getCommitOrRollback()) {
                 case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                     LOGGER.warn("the producer[{}] end transaction in sending message,  and it's pending status."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
                     return null;
                 }
 
@@ -102,10 +100,10 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
 
                 case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                     LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
+                            + "RequestHeader: {} Remark: {}",
+                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
+                        requestHeader.toString(),
+                        request.getRemark());
                     break;
                 }
                 default:
@@ -210,8 +208,8 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
         MessageAccessor.setProperties(msgInner, msgExt.getProperties());
 
         TopicFilterType topicFilterType =
-                (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
-                        : TopicFilterType.SINGLE_TAG;
+            (msgInner.getSysFlag() & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG ? TopicFilterType.MULTI_TAG
+                : TopicFilterType.SINGLE_TAG;
         long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
         msgInner.setTagsCode(tagsCodeValue);
         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
index 67e55a4..2a6482c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ForwardRequestProcessor.java
@@ -6,36 +6,33 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class ForwardRequestProcessor implements NettyRequestProcessor {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
     private final BrokerController brokerController;
 
-
     public ForwardRequestProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
-
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 041037f..7169b9c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
+import java.nio.ByteBuffer;
+import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
 import org.apache.rocketmq.broker.longpolling.PullRequest;
@@ -49,14 +56,9 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import io.netty.channel.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.List;
-
-
 public class PullMessageProcessor implements NettyRequestProcessor {
     private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -77,12 +79,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
-        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
+        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader();
         final PullMessageRequestHeader requestHeader =
-                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-
+            (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
 
         response.setOpaque(request.getOpaque());
 
@@ -90,24 +91,21 @@ public class PullMessageProcessor implements NettyRequestProcessor {
             LOG.debug("receive PullMessage request command, " + request);
         }
 
-
         if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
             return response;
         }
 
-
         SubscriptionGroupConfig subscriptionGroupConfig =
-                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
+            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
         if (null == subscriptionGroupConfig) {
             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
             response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
-                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
             return response;
         }
 
-
         if (!subscriptionGroupConfig.isConsumeEnable()) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
@@ -120,49 +118,45 @@ public class PullMessageProcessor implements NettyRequestProcessor {
 
         final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
 
-
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
         if (null == topicConfig) {
             LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
             response.setCode(ResponseCode.TOPIC_NOT_EXIST);
             response.setRemark(
-                    "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
+                "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
             return response;
         }
 
-
         if (!PermName.isReadable(topicConfig.getPerm())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
             return response;
         }
 
-
         if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
             String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
-                    + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
+                + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
             LOG.warn(errorInfo);
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark(errorInfo);
             return response;
         }
 
-
         SubscriptionData subscriptionData = null;
         if (hasSubscriptionFlag) {
             try {
                 subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                        requestHeader.getSubscription());
+                    requestHeader.getSubscription());
             } catch (Exception e) {
                 LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
-                        requestHeader.getConsumerGroup());
+                    requestHeader.getConsumerGroup());
                 response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
                 response.setRemark("parse the consumer's subscription failed");
                 return response;
             }
         } else {
             ConsumerGroupInfo consumerGroupInfo =
-                    this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
+                this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
             if (null == consumerGroupInfo) {
                 LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
                 response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
@@ -171,7 +165,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
             }
 
             if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
-                    && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
+                && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
                 return response;
@@ -185,10 +179,9 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 return response;
             }
 
-
             if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
                 LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
-                        subscriptionData.getSubString());
+                    subscriptionData.getSubString());
                 response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
                 response.setRemark("the consumer's subscription not latest");
                 return response;
@@ -196,15 +189,14 @@ public class PullMessageProcessor implements NettyRequestProcessor {
         }
 
         final GetMessageResult getMessageResult =
-                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
+            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
+                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
         if (getMessageResult != null) {
             response.setRemark(getMessageResult.getStatus().name());
             responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
             responseHeader.setMinOffset(getMessageResult.getMinOffset());
             responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
 
-
             if (getMessageResult.isSuggestPullingFromSlave()) {
                 responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
             } else {
@@ -250,11 +242,11 @@ public class PullMessageProcessor implements NettyRequestProcessor {
 
                         // XXX: warn and notify me
                         LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
-                                requestHeader.getQueueOffset(), //
-                                getMessageResult.getNextBeginOffset(), //
-                                requestHeader.getTopic(), //
-                                requestHeader.getQueueId(), //
-                                requestHeader.getConsumerGroup()//
+                            requestHeader.getQueueOffset(), //
+                            getMessageResult.getNextBeginOffset(), //
+                            requestHeader.getTopic(), //
+                            requestHeader.getQueueId(), //
+                            requestHeader.getConsumerGroup()//
                         );
                     } else {
                         response.setCode(ResponseCode.PULL_NOT_FOUND);
@@ -270,7 +262,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                     // XXX: warn and notify me
                     LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
-                            + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
+                        + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
                     break;
                 case OFFSET_OVERFLOW_ONE:
                     response.setCode(ResponseCode.PULL_NOT_FOUND);
@@ -278,8 +270,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 case OFFSET_TOO_SMALL:
                     response.setCode(ResponseCode.PULL_OFFSET_MOVED);
                     LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
-                            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
-                            getMessageResult.getMinOffset(), channel.remoteAddress());
+                        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
+                        getMessageResult.getMinOffset(), channel.remoteAddress());
                     break;
                 default:
                     assert false;
@@ -308,7 +300,6 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                     case ResponseCode.PULL_NOT_FOUND:
                         if (!brokerAllowSuspend) {
 
-
                             context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
                             context.setCommercialRcvTimes(1);
                             context.setCommercialOwner(owner);
@@ -333,23 +324,23 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                 case ResponseCode.SUCCESS:
 
                     this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                            getMessageResult.getMessageCount());
+                        getMessageResult.getMessageCount());
 
                     this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                            getMessageResult.getBufferTotalSize());
+                        getMessageResult.getBufferTotalSize());
 
                     this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
                     if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
                         final long beginTimeMills = this.brokerController.getMessageStore().now();
                         final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
                         this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
-                                requestHeader.getTopic(), requestHeader.getQueueId(),
-                                (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
+                            requestHeader.getTopic(), requestHeader.getQueueId(),
+                            (int)(this.brokerController.getMessageStore().now() - beginTimeMills));
                         response.setBody(r);
                     } else {
                         try {
                             FileRegion fileRegion =
-                                    new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
+                                new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
                             channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                                 @Override
                                 public void operationComplete(ChannelFuture future) throws Exception {
@@ -379,18 +370,17 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                         long offset = requestHeader.getQueueOffset();
                         int queueId = requestHeader.getQueueId();
                         PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
-                                this.brokerController.getMessageStore().now(), offset, subscriptionData);
+                            this.brokerController.getMessageStore().now(), offset, subscriptionData);
                         this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
                         response = null;
                         break;
                     }
 
-
                 case ResponseCode.PULL_RETRY_IMMEDIATELY:
                     break;
                 case ResponseCode.PULL_OFFSET_MOVED:
                     if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
-                            || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
+                        || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
                         MessageQueue mq = new MessageQueue();
                         mq.setTopic(requestHeader.getTopic());
                         mq.setQueueId(requestHeader.getQueueId());
@@ -403,15 +393,15 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                         event.setOffsetNew(getMessageResult.getNextBeginOffset());
                         this.generateOffsetMovedEvent(event);
                         LOG.warn(
-                                "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
-                                requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
-                                responseHeader.getSuggestWhichBrokerId());
+                            "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
+                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
+                            responseHeader.getSuggestWhichBrokerId());
                     } else {
                         responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                         response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                         LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
-                                requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
-                                responseHeader.getSuggestWhichBrokerId());
+                            requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
+                            responseHeader.getSuggestWhichBrokerId());
                     }
 
                     break;
@@ -423,19 +413,17 @@ public class PullMessageProcessor implements NettyRequestProcessor {
             response.setRemark("store getMessage return null");
         }
 
-
         boolean storeOffsetEnable = brokerAllowSuspend;
         storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
         storeOffsetEnable = storeOffsetEnable
-                && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
+            && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
         if (storeOffsetEnable) {
             this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
-                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
+                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
         }
         return response;
     }
 
-
     public boolean hasConsumeMessageHook() {
         return consumeMessageHookList != null && !this.consumeMessageHookList.isEmpty();
     }
@@ -512,7 +500,7 @@ public class PullMessageProcessor implements NettyRequestProcessor {
                                 public void operationComplete(ChannelFuture future) throws Exception {
                                     if (!future.isSuccess()) {
                                         LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed",
-                                                future.cause());
+                                            future.cause());
                                         LOG.error(request.toString());
                                         LOG.error(response.toString());
                                     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
index 0b6b775..04f206f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java
@@ -6,16 +6,20 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.processor;
 
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.FileRegion;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.pagecache.OneMessageTransfer;
 import org.apache.rocketmq.broker.pagecache.QueryMessageTransfer;
@@ -31,28 +35,21 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.FileRegion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class QueryMessageProcessor implements NettyRequestProcessor {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
     private final BrokerController brokerController;
 
-
     public QueryMessageProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
-
     @Override
     public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         switch (request.getCode()) {
             case RequestCode.QUERY_MESSAGE:
                 return this.queryMessage(ctx, request);
@@ -70,44 +67,40 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
         return false;
     }
 
-
     public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response =
-                RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
+            RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
         final QueryMessageResponseHeader responseHeader =
-                (QueryMessageResponseHeader) response.readCustomHeader();
+            (QueryMessageResponseHeader)response.readCustomHeader();
         final QueryMessageRequestHeader requestHeader =
-                (QueryMessageRequestHeader) request
-                        .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
-
+            (QueryMessageRequestHeader)request
+                .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
 
         response.setOpaque(request.getOpaque());
 
-
         String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
         if (isUniqueKey != null && isUniqueKey.equals("true")) {
             requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
         }
 
         final QueryMessageResult queryMessageResult =
-                this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
-                        requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
-                        requestHeader.getEndTimestamp());
+            this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
+                requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
+                requestHeader.getEndTimestamp());
         assert queryMessageResult != null;
 
         responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
         responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
 
-
         if (queryMessageResult.getBufferTotalSize() > 0) {
             response.setCode(ResponseCode.SUCCESS);
             response.setRemark(null);
 
             try {
                 FileRegion fileRegion =
-                        new QueryMessageTransfer(response.encodeHeader(queryMessageResult
-                                .getBufferTotalSize()), queryMessageResult);
+                    new QueryMessageTransfer(response.encodeHeader(queryMessageResult
+                        .getBufferTotalSize()), queryMessageResult);
                 ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {
@@ -130,26 +123,24 @@ public class QueryMessageProcessor implements NettyRequestProcessor {
         return response;
     }
 
-
     public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final ViewMessageRequestHeader requestHeader =
-                (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
-
+            (ViewMessageRequestHeader)request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
 
         response.setOpaque(request.getOpaque());
 
         final SelectMappedBufferResult selectMappedBufferResult =
-                this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
+            this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
         if (selectMappedBufferResult != null) {
             response.setCode(ResponseCode.SUCCESS);
             response.setRemark(null);
 
             try {
                 FileRegion fileRegion =
-                        new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
-                                selectMappedBufferResult);
+                    new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
+                        selectMappedBufferResult);
                 ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
                     @Override
                     public void operationComplete(ChannelFuture future) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 6002df2..1b95205 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -17,11 +17,17 @@
 package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
+import java.net.SocketAddress;
+import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
 import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
 import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
-import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.help.FAQUrl;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -44,10 +50,6 @@ import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.apache.rocketmq.store.stats.BrokerStatsManager;
 
-import java.net.SocketAddress;
-import java.util.List;
-
-
 public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
 
     private List<ConsumeMessageHook> consumeMessageHookList;
@@ -80,14 +82,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     @Override
     public boolean rejectRequest() {
         return this.brokerController.getMessageStore().isOSPageCacheBusy() ||
-                this.brokerController.getMessageStore().isTransientStorePoolDeficient();
+            this.brokerController.getMessageStore().isTransientStorePoolDeficient();
     }
 
     private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
-            throws RemotingCommandException {
+        throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final ConsumerSendMsgBackRequestHeader requestHeader =
-                (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+            (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
 
         if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
 
@@ -101,24 +103,21 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             this.executeConsumeMessageHookAfter(context);
         }
 
-
         SubscriptionGroupConfig subscriptionGroupConfig =
-                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
+            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
         if (null == subscriptionGroupConfig) {
             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
             response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
-                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+                + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
             return response;
         }
 
-
         if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
             return response;
         }
 
-
         if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
             response.setCode(ResponseCode.SUCCESS);
             response.setRemark(null);
@@ -128,24 +127,21 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
         int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
 
-
         int topicSysFlag = 0;
         if (requestHeader.isUnitMode()) {
             topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
         }
 
-
         TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
-                newTopic, //
-                subscriptionGroupConfig.getRetryQueueNums(), //
-                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
+            newTopic, //
+            subscriptionGroupConfig.getRetryQueueNums(), //
+            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
         if (null == topicConfig) {
             response.setCode(ResponseCode.SYSTEM_ERROR);
             response.setRemark("topic[" + newTopic + "] not exist");
             return response;
         }
 
-
         if (!PermName.isWriteable(topicConfig.getPerm())) {
             response.setCode(ResponseCode.NO_PERMISSION);
             response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
@@ -159,31 +155,27 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             return response;
         }
 
-
         final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
         if (null == retryTopic) {
             MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
         }
         msgExt.setWaitStoreMsgOK(false);
 
-
         int delayLevel = requestHeader.getDelayLevel();
 
-
         int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
         if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
             maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
         }
 
-
         if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
-                || delayLevel < 0) {
+            || delayLevel < 0) {
             newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
 
             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
-                    DLQ_NUMS_PER_GROUP, //
-                    PermName.PERM_WRITE, 0
+                DLQ_NUMS_PER_GROUP, //
+                PermName.PERM_WRITE, 0
             );
             if (null == topicConfig) {
                 response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -247,13 +239,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
     }
 
     private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
-                                        final RemotingCommand request, //
-                                        final SendMessageContext sendMessageContext, //
-                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+        final RemotingCommand request, //
+        final SendMessageContext sendMessageContext, //
+        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
 
         final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
-
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
 
         response.setOpaque(request.getOpaque());
 
@@ -296,15 +287,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
             String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
             SubscriptionGroupConfig subscriptionGroupConfig =
-                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
+                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
             if (null == subscriptionGroupConfig) {
                 response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                 response.setRemark(
-                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                 return response;
             }
 
-
             int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
             if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                 maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
@@ -314,8 +304,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 newTopic = MixAll.getDLQTopic(groupName);
                 queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
-                        DLQ_NUMS_PER_GROUP, //
-                        PermName.PERM_WRITE, 0
+                    DLQ_NUMS_PER_GROUP, //
+                    PermName.PERM_WRITE, 0
                 );
                 if (null == topicConfig) {
                     response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -344,7 +334,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             if (traFlag != null) {
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark(
-                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
+                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                 return response;
             }
         }
@@ -381,12 +371,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 case PROPERTIES_SIZE_EXCEEDED:
                     response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                     response.setRemark(
-                            "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+                        "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                     break;
                 case SERVICE_NOT_AVAILABLE:
                     response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                     response.setRemark(
-                            "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
+                        "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                     break;
                 case OS_PAGECACHE_BUSY:
                     response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -407,7 +397,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
                 this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                 this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                        putMessageResult.getAppendMessageResult().getWroteBytes());
+                    putMessageResult.getAppendMessageResult().getWroteBytes());
                 this.brokerController.getBrokerStatsManager().incBrokerPutNums();
 
                 response.setRemark(null);
@@ -416,10 +406,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
                 responseHeader.setQueueId(queueIdInt);
                 responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
 
-
                 doResponse(ctx, request, response);
 
-
                 if (hasSendMessageHook()) {
                     sendMessageContext.setMsgId(responseHeader.getMsgId());
                     sendMessageContext.setQueueId(responseHeader.getQueueId());
@@ -427,7 +415,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
 
                     int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
                     int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+                    int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
 
                     sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                     sendMessageContext.setCommercialSendTimes(incValue);
@@ -438,7 +426,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             } else {
                 if (hasSendMessageHook()) {
                     int wroteSize = request.getBody().length;
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+                    int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
 
                     sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                     sendMessageContext.setCommercialSendTimes(incValue);
@@ -479,11 +467,11 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
 
         String storePathLogis =
-                StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+            StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
         double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
 
         String storePathIndex =
-                StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+            StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
         double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
 
         return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
index 2db2317..2545f1f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -6,16 +6,17 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.slave;
 
+import java.io.IOException;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.common.MixAll;
@@ -27,30 +28,23 @@ import org.apache.rocketmq.store.config.StorePathConfigHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
-
 public class SlaveSynchronize {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
     private volatile String masterAddr = null;
 
-
     public SlaveSynchronize(BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
-
     public String getMasterAddr() {
         return masterAddr;
     }
 
-
     public void setMasterAddr(String masterAddr) {
         this.masterAddr = masterAddr;
     }
 
-
     public void syncAll() {
         this.syncTopicConfig();
         this.syncConsumerOffset();
@@ -58,21 +52,20 @@ public class SlaveSynchronize {
         this.syncSubscriptionGroupConfig();
     }
 
-
     private void syncTopicConfig() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null) {
             try {
                 TopicConfigSerializeWrapper topicWrapper =
-                        this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
+                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                 if (!this.brokerController.getTopicConfigManager().getDataVersion()
-                        .equals(topicWrapper.getDataVersion())) {
+                    .equals(topicWrapper.getDataVersion())) {
 
                     this.brokerController.getTopicConfigManager().getDataVersion()
-                            .assignNewOne(topicWrapper.getDataVersion());
+                        .assignNewOne(topicWrapper.getDataVersion());
                     this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                     this.brokerController.getTopicConfigManager().getTopicConfigTable()
-                            .putAll(topicWrapper.getTopicConfigTable());
+                        .putAll(topicWrapper.getTopicConfigTable());
                     this.brokerController.getTopicConfigManager().persist();
 
                     log.info("update slave topic config from master, {}", masterAddrBak);
@@ -83,15 +76,14 @@ public class SlaveSynchronize {
         }
     }
 
-
     private void syncConsumerOffset() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null) {
             try {
                 ConsumerOffsetSerializeWrapper offsetWrapper =
-                        this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
+                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                 this.brokerController.getConsumerOffsetManager().getOffsetTable()
-                        .putAll(offsetWrapper.getOffsetTable());
+                    .putAll(offsetWrapper.getOffsetTable());
                 this.brokerController.getConsumerOffsetManager().persist();
                 log.info("update slave consumer offset from master, {}", masterAddrBak);
             } catch (Exception e) {
@@ -100,18 +92,17 @@ public class SlaveSynchronize {
         }
     }
 
-
     private void syncDelayOffset() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null) {
             try {
                 String delayOffset =
-                        this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
+                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
                 if (delayOffset != null) {
 
                     String fileName =
-                            StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
-                                    .getMessageStoreConfig().getStorePathRootDir());
+                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+                            .getMessageStoreConfig().getStorePathRootDir());
                     try {
                         MixAll.string2File(delayOffset, fileName);
                     } catch (IOException e) {
@@ -125,24 +116,23 @@ public class SlaveSynchronize {
         }
     }
 
-
     private void syncSubscriptionGroupConfig() {
         String masterAddrBak = this.masterAddr;
         if (masterAddrBak != null) {
             try {
                 SubscriptionGroupWrapper subscriptionWrapper =
-                        this.brokerController.getBrokerOuterAPI()
-                                .getAllSubscriptionGroupConfig(masterAddrBak);
+                    this.brokerController.getBrokerOuterAPI()
+                        .getAllSubscriptionGroupConfig(masterAddrBak);
 
                 if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
-                        .equals(subscriptionWrapper.getDataVersion())) {
+                    .equals(subscriptionWrapper.getDataVersion())) {
                     SubscriptionGroupManager subscriptionGroupManager =
-                            this.brokerController.getSubscriptionGroupManager();
+                        this.brokerController.getSubscriptionGroupManager();
                     subscriptionGroupManager.getDataVersion().assignNewOne(
-                            subscriptionWrapper.getDataVersion());
+                        subscriptionWrapper.getDataVersion());
                     subscriptionGroupManager.getSubscriptionGroupTable().clear();
                     subscriptionGroupManager.getSubscriptionGroupTable().putAll(
-                            subscriptionWrapper.getSubscriptionGroupTable());
+                        subscriptionWrapper.getSubscriptionGroupTable());
                     subscriptionGroupManager.persist();
                     log.info("update slave Subscription Group from master, {}", masterAddrBak);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index 7865bc7..f77249a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -16,6 +16,10 @@
  */
 package org.apache.rocketmq.broker.subscription;
 
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -27,25 +31,23 @@ import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class SubscriptionGroupManager extends ConfigManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
     private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
-            new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+        new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
     private final DataVersion dataVersion = new DataVersion();
     private transient BrokerController brokerController;
 
-
     public SubscriptionGroupManager() {
         this.init();
     }
 
+    public SubscriptionGroupManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.init();
+    }
+
     private void init() {
         {
             SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
@@ -94,13 +96,6 @@ public class SubscriptionGroupManager extends ConfigManager {
         }
     }
 
-
-    public SubscriptionGroupManager(BrokerController brokerController) {
-        this.brokerController = brokerController;
-        this.init();
-    }
-
-
     public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
         SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
         if (old != null) {
@@ -122,7 +117,6 @@ public class SubscriptionGroupManager extends ConfigManager {
         }
     }
 
-
     public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
         SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
         if (null == subscriptionGroupConfig) {
@@ -141,7 +135,6 @@ public class SubscriptionGroupManager extends ConfigManager {
         return subscriptionGroupConfig;
     }
 
-
     @Override
     public String encode() {
         return this.encode(false);
@@ -181,12 +174,10 @@ public class SubscriptionGroupManager extends ConfigManager {
         return subscriptionGroupTable;
     }
 
-
     public DataVersion getDataVersion() {
         return dataVersion;
     }
 
-
     public void deleteSubscriptionGroupConfig(final String groupName) {
         SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
         if (old != null) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 9e14332..e826d24 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -16,6 +16,16 @@
  */
 package org.apache.rocketmq.broker.topic;
 
+import java.io.File;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.ConfigManager;
@@ -30,34 +40,20 @@ import org.apache.rocketmq.common.sysflag.TopicSysFlag;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
 public class TopicConfigManager extends ConfigManager {
     private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private transient final Lock lockTopicConfigTable = new ReentrantLock();
 
     private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
-            new ConcurrentHashMap<String, TopicConfig>(1024);
+        new ConcurrentHashMap<String, TopicConfig>(1024);
     private final DataVersion dataVersion = new DataVersion();
     private final Set<String> systemTopicList = new HashSet<String>();
     private transient BrokerController brokerController;
 
-
     public TopicConfigManager() {
     }
 
-
     public TopicConfigManager(BrokerController brokerController) {
         this.brokerController = brokerController;
         {
@@ -76,9 +72,9 @@ public class TopicConfigManager extends ConfigManager {
                 TopicConfig topicConfig = new TopicConfig(topic);
                 this.systemTopicList.add(topic);
                 topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
-                        .getDefaultTopicQueueNums());
+                    .getDefaultTopicQueueNums());
                 topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
-                        .getDefaultTopicQueueNums());
+                    .getDefaultTopicQueueNums());
                 int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                 topicConfig.setPerm(perm);
                 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
@@ -130,29 +126,24 @@ public class TopicConfigManager extends ConfigManager {
         }
     }
 
-
     public boolean isSystemTopic(final String topic) {
         return this.systemTopicList.contains(topic);
     }
 
-
     public Set<String> getSystemTopic() {
         return this.systemTopicList;
     }
 
-
     public boolean isTopicCanSendMessage(final String topic) {
         return !topic.equals(MixAll.DEFAULT_TOPIC);
     }
 
-
     public TopicConfig selectTopicConfig(final String topic) {
         return this.topicConfigTable.get(topic);
     }
 
-
     public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
-                                                      final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
+        final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
         TopicConfig topicConfig = null;
         boolean createNew = false;
 
@@ -175,8 +166,8 @@ public class TopicConfigManager extends ConfigManager {
                             topicConfig = new TopicConfig(topic);
 
                             int queueNums =
-                                    clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
-                                            .getWriteQueueNums() : clientDefaultTopicQueueNums;
+                                clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
+                                    .getWriteQueueNums() : clientDefaultTopicQueueNums;
 
                             if (queueNums < 0) {
                                 queueNums = 0;
@@ -191,17 +182,17 @@ public class TopicConfigManager extends ConfigManager {
                             topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
                         } else {
                             LOG.warn("create new topic failed, because the default topic[" + defaultTopic
-                                    + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
-                                    + remoteAddress);
+                                + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+                                + remoteAddress);
                         }
                     } else {
                         LOG.warn("create new topic failed, because the default topic[" + defaultTopic
-                                + "] not exist." + " producer: " + remoteAddress);
+                            + "] not exist." + " producer: " + remoteAddress);
                     }
 
                     if (topicConfig != null) {
                         LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
-                                + " producer: " + remoteAddress);
+                            + " producer: " + remoteAddress);
 
                         this.topicConfigTable.put(topic, topicConfig);
 
@@ -227,10 +218,10 @@ public class TopicConfigManager extends ConfigManager {
     }
 
     public TopicConfig createTopicInSendMessageBackMethod(
-            final String topic,
-            final int clientDefaultTopicQueueNums,
-            final int perm,
-            final int topicSysFlag) {
+        final String topic,
+        final int clientDefaultTopicQueueNums,
+        final int perm,
+        final int topicSysFlag) {
         TopicConfig topicConfig = this.topicConfigTable.get(topic);
         if (topicConfig != null)
             return topicConfig;
@@ -282,7 +273,7 @@ public class TopicConfigManager extends ConfigManager {
             }
 
             LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
-                    topicConfig.getTopicSysFlag());
+                topicConfig.getTopicSysFlag());
 
             this.topicConfigTable.put(topic, topicConfig);
 
@@ -302,7 +293,7 @@ public class TopicConfigManager extends ConfigManager {
             }
 
             LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag", oldTopicSysFlag,
-                    topicConfig.getTopicSysFlag());
+                topicConfig.getTopicSysFlag());
 
             this.topicConfigTable.put(topic, topicConfig);
 
@@ -326,7 +317,6 @@ public class TopicConfigManager extends ConfigManager {
         this.persist();
     }
 
-
     public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
 
         if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
@@ -403,7 +393,7 @@ public class TopicConfigManager extends ConfigManager {
     public void decode(String jsonString) {
         if (jsonString != null) {
             TopicConfigSerializeWrapper topicConfigSerializeWrapper =
-                    TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
+                TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
             if (topicConfigSerializeWrapper != null) {
                 this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
                 this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
index 68256d9..830a0c5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker.transaction;
@@ -22,22 +22,18 @@ public class TransactionRecord {
     private long offset;
     private String producerGroup;
 
-
     public long getOffset() {
         return offset;
     }
 
-
     public void setOffset(long offset) {
         this.offset = offset;
     }
 
-
     public String getProducerGroup() {
         return producerGroup;
     }
 
-
     public void setProducerGroup(String producerGroup) {
         this.producerGroup = producerGroup;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
index d6e897a..f9b56d5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
@@ -6,41 +6,33 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker.transaction;
 
 import java.util.List;
 
-
 public interface TransactionStore {
     boolean open();
 
-
     void close();
 
-
     boolean put(final List<TransactionRecord> trs);
 
-
     void remove(final List<Long> pks);
 
-
     List<TransactionRecord> traverse(final long pk, final int nums);
 
-
     long totalRecords();
 
-
     long minPK();
 
-
     long maxPK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
index 4bf73d2..240e141 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
@@ -6,17 +6,27 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker.transaction.jdbc;
 
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.broker.transaction.TransactionRecord;
 import org.apache.rocketmq.broker.transaction.TransactionStore;
 import org.apache.rocketmq.common.MixAll;
@@ -24,13 +34,6 @@ import org.apache.rocketmq.common.constant.LoggerName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URL;
-import java.sql.*;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-
 public class JDBCTransactionStore implements TransactionStore {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
     private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
@@ -50,11 +53,10 @@ public class JDBCTransactionStore implements TransactionStore {
 
             try {
                 this.connection =
-                        DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
+                    DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
 
                 this.connection.setAutoCommit(false);
 
-
                 if (!this.computeTotalRecords()) {
                     return this.createDB();
                 }
@@ -72,7 +74,7 @@ public class JDBCTransactionStore implements TransactionStore {
         try {
             Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance();
             log.info("Loaded the appropriate driver, {}",
-                    this.jdbcTransactionStoreConfig.getJdbcDriverClass());
+                this.jdbcTransactionStoreConfig.getJdbcDriverClass());
             return true;
         } catch (Exception e) {
             log.info("Loaded the appropriate driver Exception", e);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
index 5789329..86c1ec8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker.transaction.jdbc;
@@ -23,42 +23,34 @@ public class JDBCTransactionStoreConfig {
     private String jdbcUser = "xxx";
     private String jdbcPassword = "xxx";
 
-
     public String getJdbcDriverClass() {
         return jdbcDriverClass;
     }
 
-
     public void setJdbcDriverClass(String jdbcDriverClass) {
         this.jdbcDriverClass = jdbcDriverClass;
     }
 
-
     public String getJdbcURL() {
         return jdbcURL;
     }
 
-
     public void setJdbcURL(String jdbcURL) {
         this.jdbcURL = jdbcURL;
     }
 
-
     public String getJdbcUser() {
         return jdbcUser;
     }
 
-
     public void setJdbcUser(String jdbcUser) {
         this.jdbcUser = jdbcUser;
     }
 
-
     public String getJdbcPassword() {
         return jdbcPassword;
     }
 
-
     public void setJdbcPassword(String jdbcPassword) {
         this.jdbcPassword = jdbcPassword;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
index 6e7b9b0..f7675c2 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerControllerTest.java
@@ -6,13 +6,13 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker;
@@ -27,9 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class BrokerControllerTest {
-    protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
-
     private static final int RESTART_NUM = 3;
+    protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
 
     /**
      * Tests if the controller can be properly stopped and started.

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
index 4fd7a5b..5e944d8 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerTestHarness.java
@@ -6,13 +6,15 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
  */
 
 /**
@@ -20,6 +22,8 @@
  */
 package org.apache.rocketmq.broker;
 
+import java.io.File;
+import java.util.Random;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyServerConfig;
@@ -30,15 +34,11 @@ import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.Random;
-
 public class BrokerTestHarness {
 
+    public final String BROKER_NAME = "TestBrokerName";
     protected BrokerController brokerController = null;
-
     protected Random random = new Random();
-    public final String BROKER_NAME = "TestBrokerName";
     protected String brokerAddr = "";
     protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
     protected BrokerConfig brokerConfig = new BrokerConfig();