You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 08:13:55 UTC

[30/34] incubator-rocketmq git commit: ROCKETMQ-18 Reformat all codes.

ROCKETMQ-18 Reformat all codes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/388ba7a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/388ba7a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/388ba7a5

Branch: refs/heads/ROCKETMQ-18
Commit: 388ba7a58465245389a3592904b6fc7ef777dc7a
Parents: 95cfb8d
Author: yukon <yu...@apache.org>
Authored: Wed Dec 28 15:42:48 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed Dec 28 15:42:48 2016 +0800

----------------------------------------------------------------------
 broker/pom.xml                                  |   2 +-
 .../rocketmq/broker/BrokerController.java       | 155 +++--
 .../rocketmq/broker/BrokerPathConfigHelper.java |  20 +-
 .../apache/rocketmq/broker/BrokerStartup.java   |  41 +-
 .../broker/client/ClientChannelInfo.java        |  32 +-
 .../client/ClientHousekeepingService.java       |  29 +-
 .../broker/client/ConsumerGroupInfo.java        |  73 +-
 .../client/ConsumerIdsChangeListener.java       |  14 +-
 .../rocketmq/broker/client/ConsumerManager.java |  38 +-
 .../DefaultConsumerIdsChangeListener.java       |  18 +-
 .../rocketmq/broker/client/ProducerManager.java |  54 +-
 .../broker/client/net/Broker2Client.java        |  96 ++-
 .../client/rebalance/RebalanceLockManager.java  | 107 ++-
 .../broker/filtersrv/FilterServerManager.java   |  36 +-
 .../broker/filtersrv/FilterServerUtil.java      |  13 +-
 .../broker/latency/BrokerFastFailure.java       |  44 +-
 .../latency/BrokerFixedThreadPoolExecutor.java  |  19 +-
 .../broker/longpolling/ManyPullRequest.java     |  18 +-
 .../NotifyMessageArrivingListener.java          |  15 +-
 .../broker/longpolling/PullRequest.java         |  23 +-
 .../longpolling/PullRequestHoldService.java     |  16 +-
 .../broker/mqtrace/ConsumeMessageContext.java   |  36 +-
 .../broker/mqtrace/ConsumeMessageHook.java      |  14 +-
 .../broker/mqtrace/SendMessageContext.java      |  43 +-
 .../broker/mqtrace/SendMessageHook.java         |  14 +-
 .../broker/offset/ConsumerOffsetManager.java    |  39 +-
 .../rocketmq/broker/out/BrokerOuterAPI.java     |  90 +--
 .../broker/pagecache/ManyMessageTransfer.java   |  18 +-
 .../broker/pagecache/OneMessageTransfer.java    |  18 +-
 .../broker/pagecache/QueryMessageTransfer.java  |  18 +-
 .../plugin/AbstractPluginMessageStore.java      |  28 +-
 .../broker/plugin/MessageStoreFactory.java      |  21 +-
 .../plugin/MessageStorePluginContext.java       |  17 +-
 .../processor/AbstractSendMessageProcessor.java |  98 ++-
 .../broker/processor/AdminBrokerProcessor.java  | 237 ++++---
 .../broker/processor/ClientManageProcessor.java |  61 +-
 .../processor/ConsumerManageProcessor.java      |  68 +-
 .../processor/EndTransactionProcessor.java      |  50 +-
 .../processor/ForwardRequestProcessor.java      |  17 +-
 .../broker/processor/PullMessageProcessor.java  | 100 ++-
 .../broker/processor/QueryMessageProcessor.java |  61 +-
 .../broker/processor/SendMessageProcessor.java  |  80 +--
 .../rocketmq/broker/slave/SlaveSynchronize.java |  54 +-
 .../subscription/SubscriptionGroupManager.java  |  29 +-
 .../broker/topic/TopicConfigManager.java        |  64 +-
 .../broker/transaction/TransactionRecord.java   |  16 +-
 .../broker/transaction/TransactionStore.java    |  20 +-
 .../transaction/jdbc/JDBCTransactionStore.java  |  34 +-
 .../jdbc/JDBCTransactionStoreConfig.java        |  20 +-
 .../rocketmq/broker/BrokerControllerTest.java   |  15 +-
 .../rocketmq/broker/BrokerTestHarness.java      |  22 +-
 .../rocketmq/broker/api/SendMessageTest.java    |  23 +-
 .../offset/ConsumerOffsetManagerTest.java       |  15 +-
 .../broker/topic/TopicConfigManagerTest.java    |  19 +-
 checkstyle/checkstyle.xml                       |   8 +-
 client/pom.xml                                  |   2 +-
 .../apache/rocketmq/client/ClientConfig.java    |  35 +-
 .../org/apache/rocketmq/client/MQAdmin.java     |  33 +-
 .../org/apache/rocketmq/client/MQHelper.java    |  42 +-
 .../org/apache/rocketmq/client/QueryResult.java |  22 +-
 .../org/apache/rocketmq/client/Validators.java  |  32 +-
 .../rocketmq/client/admin/MQAdminExtInner.java  |  12 +-
 .../client/common/ThreadLocalIndex.java         |  20 +-
 .../consumer/AllocateMessageQueueStrategy.java  |  27 +-
 .../client/consumer/DefaultMQPullConsumer.java  |  75 +--
 .../client/consumer/DefaultMQPushConsumer.java  |  81 +--
 .../rocketmq/client/consumer/MQConsumer.java    |  22 +-
 .../client/consumer/MQPullConsumer.java         |  52 +-
 .../consumer/MQPullConsumerScheduleService.java |  36 +-
 .../client/consumer/MQPushConsumer.java         |  23 +-
 .../client/consumer/MessageQueueListener.java   |  18 +-
 .../rocketmq/client/consumer/PullCallback.java  |  12 +-
 .../rocketmq/client/consumer/PullResult.java    |  30 +-
 .../rocketmq/client/consumer/PullStatus.java    |  12 +-
 .../client/consumer/PullTaskCallback.java       |  13 +-
 .../client/consumer/PullTaskContext.java        |  16 +-
 .../listener/ConsumeConcurrentlyContext.java    |   7 -
 .../listener/ConsumeConcurrentlyStatus.java     |  12 +-
 .../listener/ConsumeOrderlyContext.java         |  19 +-
 .../consumer/listener/ConsumeOrderlyStatus.java |  12 +-
 .../consumer/listener/ConsumeReturnType.java    |  12 +-
 .../consumer/listener/MessageListener.java      |  12 +-
 .../listener/MessageListenerConcurrently.java   |  18 +-
 .../listener/MessageListenerOrderly.java        |  18 +-
 .../AllocateMessageQueueAveragely.java          |  19 +-
 .../AllocateMessageQueueAveragelyByCircle.java  |  15 +-
 .../rebalance/AllocateMessageQueueByConfig.java |  19 +-
 .../AllocateMessageQueueByMachineRoom.java      |  21 +-
 .../consumer/store/LocalFileOffsetStore.java    |  51 +-
 .../consumer/store/OffsetSerializeWrapper.java  |  20 +-
 .../client/consumer/store/OffsetStore.java      |  23 +-
 .../client/consumer/store/ReadOffsetType.java   |  12 +-
 .../consumer/store/RemoteBrokerOffsetStore.java |  53 +-
 .../client/exception/MQBrokerException.java     |  18 +-
 .../client/exception/MQClientException.java     |  17 +-
 .../client/hook/CheckForbiddenContext.java      |  40 +-
 .../client/hook/CheckForbiddenHook.java         |  14 +-
 .../client/hook/ConsumeMessageContext.java      |  32 +-
 .../client/hook/ConsumeMessageHook.java         |  12 +-
 .../client/hook/FilterMessageContext.java       |  29 +-
 .../rocketmq/client/hook/FilterMessageHook.java |  13 +-
 .../client/hook/SendMessageContext.java         |  35 +-
 .../rocketmq/client/hook/SendMessageHook.java   |  12 +-
 .../client/impl/ClientRemotingProcessor.java    |  56 +-
 .../rocketmq/client/impl/CommunicationMode.java |  12 +-
 .../rocketmq/client/impl/FindBrokerResult.java  |  15 +-
 .../rocketmq/client/impl/MQAdminImpl.java       | 114 ++--
 .../rocketmq/client/impl/MQClientAPIImpl.java   | 667 ++++++++++---------
 .../rocketmq/client/impl/MQClientManager.java   |  26 +-
 .../ConsumeMessageConcurrentlyService.java      |  88 +--
 .../consumer/ConsumeMessageOrderlyService.java  | 135 ++--
 .../impl/consumer/ConsumeMessageService.java    |  31 +-
 .../consumer/DefaultMQPullConsumerImpl.java     | 193 +++---
 .../consumer/DefaultMQPushConsumerImpl.java     | 261 ++++----
 .../client/impl/consumer/MQConsumerInner.java   |  26 +-
 .../client/impl/consumer/MessageQueueLock.java  |  19 +-
 .../client/impl/consumer/ProcessQueue.java      |  57 +-
 .../client/impl/consumer/PullAPIWrapper.java    |  84 +--
 .../impl/consumer/PullMessageService.java       |  37 +-
 .../client/impl/consumer/PullRequest.java       |  23 +-
 .../client/impl/consumer/PullResultExt.java     |  22 +-
 .../client/impl/consumer/RebalanceImpl.java     |  75 +--
 .../client/impl/consumer/RebalancePullImpl.java |  22 +-
 .../client/impl/consumer/RebalancePushImpl.java |  32 +-
 .../client/impl/consumer/RebalanceService.java  |  18 +-
 .../client/impl/factory/MQClientInstance.java   | 234 ++++---
 .../impl/producer/DefaultMQProducerImpl.java    | 216 +++---
 .../client/impl/producer/MQProducerInner.java   |  27 +-
 .../client/impl/producer/TopicPublishInfo.java  |  17 +-
 .../latency/LatencyFaultToleranceImpl.java      |  52 +-
 .../client/latency/MQFaultStrategy.java         |   7 +-
 .../rocketmq/client/log/ClientLogger.java       |  25 +-
 .../client/producer/DefaultMQProducer.java      |  96 +--
 .../producer/LocalTransactionExecuter.java      |  13 +-
 .../client/producer/LocalTransactionState.java  |  12 +-
 .../rocketmq/client/producer/MQProducer.java    |  71 +-
 .../client/producer/MessageQueueSelector.java   |  16 +-
 .../rocketmq/client/producer/SendCallback.java  |  13 +-
 .../rocketmq/client/producer/SendResult.java    |  31 +-
 .../rocketmq/client/producer/SendStatus.java    |  12 +-
 .../producer/TransactionCheckListener.java      |  13 +-
 .../client/producer/TransactionMQProducer.java  |  27 +-
 .../client/producer/TransactionSendResult.java  |  15 +-
 .../selector/SelectMessageQueueByHash.java      |  16 +-
 .../SelectMessageQueueByMachineRoom.java        |  21 +-
 .../selector/SelectMessageQueueByRandoom.java   |  19 +-
 .../client/stat/ConsumerStatsManager.java       |  40 +-
 .../main/resources/logback_rocketmq_client.xml  |   2 +-
 .../apache/rocketmq/client/ValidatorsTest.java  |  13 +-
 common/pom.xml                                  |   2 +-
 .../apache/rocketmq/common/BrokerConfig.java    |  83 +--
 .../rocketmq/common/BrokerConfigSingleton.java  |  12 +-
 .../apache/rocketmq/common/ConfigManager.java   |  17 +-
 .../apache/rocketmq/common/Configuration.java   |  27 +-
 .../apache/rocketmq/common/CountDownLatch2.java | 111 ++-
 .../org/apache/rocketmq/common/DataVersion.java |  36 +-
 .../org/apache/rocketmq/common/MQVersion.java   |   2 -
 .../java/org/apache/rocketmq/common/MixAll.java |  54 +-
 .../java/org/apache/rocketmq/common/Pair.java   |  17 +-
 .../apache/rocketmq/common/ServiceState.java    |  12 +-
 .../apache/rocketmq/common/ServiceThread.java   |  28 +-
 .../org/apache/rocketmq/common/SystemClock.java |  12 +-
 .../rocketmq/common/ThreadFactoryImpl.java      |  15 +-
 .../org/apache/rocketmq/common/TopicConfig.java |  68 +-
 .../apache/rocketmq/common/TopicFilterType.java |  12 +-
 .../org/apache/rocketmq/common/UtilAll.java     | 114 ++--
 .../rocketmq/common/admin/ConsumeStats.java     |  21 +-
 .../rocketmq/common/admin/OffsetWrapper.java    |  18 +-
 .../rocketmq/common/admin/RollbackStats.java    |  24 +-
 .../rocketmq/common/admin/TopicOffset.java      |  18 +-
 .../rocketmq/common/admin/TopicStatsTable.java  |  18 +-
 .../common/constant/DBMsgConstants.java         |  12 +-
 .../rocketmq/common/constant/LoggerName.java    |  12 +-
 .../rocketmq/common/constant/PermName.java      |  12 +-
 .../common/consumer/ConsumeFromWhere.java       |  12 +-
 .../rocketmq/common/filter/FilterAPI.java       |  18 +-
 .../rocketmq/common/filter/FilterContext.java   |  14 +-
 .../rocketmq/common/filter/MessageFilter.java   |   1 -
 .../apache/rocketmq/common/filter/impl/Op.java  |  15 +-
 .../rocketmq/common/filter/impl/Operand.java    |  12 +-
 .../rocketmq/common/filter/impl/Operator.java   |  14 +-
 .../rocketmq/common/filter/impl/PolishExpr.java |  41 +-
 .../rocketmq/common/filter/impl/Type.java       |  12 +-
 .../org/apache/rocketmq/common/help/FAQUrl.java |  49 +-
 .../rocketmq/common/hook/FilterCheckHook.java   |  14 +-
 .../apache/rocketmq/common/message/Message.java |  48 +-
 .../common/message/MessageAccessor.java         |  24 +-
 .../common/message/MessageClientExt.java        |  34 +-
 .../common/message/MessageClientIDSetter.java   |  16 +-
 .../rocketmq/common/message/MessageConst.java   |   2 -
 .../rocketmq/common/message/MessageDecoder.java |  71 +-
 .../rocketmq/common/message/MessageExt.java     |  58 +-
 .../rocketmq/common/message/MessageId.java      |  18 +-
 .../rocketmq/common/message/MessageQueue.java   |  27 +-
 .../common/message/MessageQueueForC.java        |  30 +-
 .../rocketmq/common/message/MessageType.java    |  12 +-
 .../rocketmq/common/namesrv/NamesrvConfig.java  |  25 +-
 .../rocketmq/common/namesrv/NamesrvUtil.java    |  12 +-
 .../common/namesrv/RegisterBrokerResult.java    |  19 +-
 .../rocketmq/common/namesrv/TopAddressing.java  |  53 +-
 .../common/protocol/MQProtosHelper.java         |  17 +-
 .../rocketmq/common/protocol/RequestCode.java   |  16 +-
 .../rocketmq/common/protocol/ResponseCode.java  |  17 +-
 .../common/protocol/body/BrokerStatsData.java   |  19 +-
 .../common/protocol/body/BrokerStatsItem.java   |  18 +-
 .../rocketmq/common/protocol/body/CMResult.java |  12 +-
 .../common/protocol/body/ClusterInfo.java       |  28 +-
 .../common/protocol/body/Connection.java        |  21 +-
 .../common/protocol/body/ConsumeByWho.java      |  26 +-
 .../body/ConsumeMessageDirectlyResult.java      |  28 +-
 .../common/protocol/body/ConsumeStatsList.java  |  18 +-
 .../common/protocol/body/ConsumeStatus.java     |  24 +-
 .../protocol/body/ConsumerConnection.java       |  31 +-
 .../body/ConsumerOffsetSerializeWrapper.java    |  20 +-
 .../protocol/body/ConsumerRunningInfo.java      | 134 ++--
 .../protocol/body/GetConsumerStatusBody.java    |  24 +-
 .../common/protocol/body/GroupList.java         |  18 +-
 .../rocketmq/common/protocol/body/KVTable.java  |  18 +-
 .../protocol/body/LockBatchRequestBody.java     |  24 +-
 .../protocol/body/LockBatchResponseBody.java    |  20 +-
 .../common/protocol/body/ProcessQueueInfo.java  |  54 +-
 .../protocol/body/ProducerConnection.java       |  18 +-
 .../protocol/body/QueryConsumeTimeSpanBody.java |  18 +-
 .../body/QueryCorrectionOffsetBody.java         |  18 +-
 .../common/protocol/body/QueueTimeSpan.java     |  29 +-
 .../protocol/body/RegisterBrokerBody.java       |  20 +-
 .../common/protocol/body/ResetOffsetBody.java   |  18 +-
 .../protocol/body/ResetOffsetBodyForC.java      |  17 +-
 .../protocol/body/SubscriptionGroupWrapper.java |  24 +-
 .../body/TopicConfigSerializeWrapper.java       |  22 +-
 .../common/protocol/body/TopicList.java         |  20 +-
 .../protocol/body/UnlockBatchRequestBody.java   |  24 +-
 .../CheckTransactionStateRequestHeader.java     |  20 +-
 .../CheckTransactionStateResponseHeader.java    |  24 +-
 .../header/CloneGroupOffsetRequestHeader.java   |  24 +-
 ...nsumeMessageDirectlyResultRequestHeader.java |  22 +-
 .../ConsumerSendMsgBackRequestHeader.java       |  31 +-
 .../header/CreateTopicRequestHeader.java        |  33 +-
 .../DeleteSubscriptionGroupRequestHeader.java   |  16 +-
 .../header/DeleteTopicRequestHeader.java        |  18 +-
 .../header/EndTransactionRequestHeader.java     |  33 +-
 .../header/EndTransactionResponseHeader.java    |  15 +-
 .../header/GetAllTopicConfigResponseHeader.java |  15 +-
 .../header/GetBrokerConfigResponseHeader.java   |  18 +-
 .../header/GetConsumeStatsInBrokerHeader.java   |  13 +-
 .../header/GetConsumeStatsRequestHeader.java    |  18 +-
 .../GetConsumerConnectionListRequestHeader.java |  16 +-
 .../GetConsumerListByGroupRequestHeader.java    |  16 +-
 .../GetConsumerListByGroupResponseBody.java     |  18 +-
 .../GetConsumerListByGroupResponseHeader.java   |  13 +-
 .../GetConsumerRunningInfoRequestHeader.java    |  20 +-
 .../header/GetConsumerStatusRequestHeader.java  |  20 +-
 .../GetEarliestMsgStoretimeRequestHeader.java   |  20 +-
 .../GetEarliestMsgStoretimeResponseHeader.java  |  18 +-
 .../header/GetMaxOffsetRequestHeader.java       |  20 +-
 .../header/GetMaxOffsetResponseHeader.java      |  18 +-
 .../header/GetMinOffsetRequestHeader.java       |  20 +-
 .../header/GetMinOffsetResponseHeader.java      |  18 +-
 .../GetProducerConnectionListRequestHeader.java |  16 +-
 .../header/GetTopicStatsInfoRequestHeader.java  |  16 +-
 .../header/GetTopicsByClusterRequestHeader.java |  16 +-
 .../NotifyConsumerIdsChangedRequestHeader.java  |  16 +-
 .../header/PullMessageRequestHeader.java        |  36 +-
 .../header/PullMessageResponseHeader.java       |  24 +-
 .../QueryConsumeTimeSpanRequestHeader.java      |  18 +-
 .../QueryConsumerOffsetRequestHeader.java       |  22 +-
 .../QueryConsumerOffsetResponseHeader.java      |  18 +-
 .../header/QueryCorrectionOffsetHeader.java     |  22 +-
 .../header/QueryMessageRequestHeader.java       |  26 +-
 .../header/QueryMessageResponseHeader.java      |  20 +-
 .../QueryTopicConsumeByWhoRequestHeader.java    |  18 +-
 .../header/ResetOffsetRequestHeader.java        |  22 +-
 .../header/SearchOffsetRequestHeader.java       |  22 +-
 .../header/SearchOffsetResponseHeader.java      |  18 +-
 .../header/SendMessageRequestHeader.java        |  40 +-
 .../header/SendMessageRequestHeaderV2.java      |  24 -
 .../header/SendMessageResponseHeader.java       |  22 +-
 .../header/UnregisterClientRequestHeader.java   |  20 +-
 .../header/UnregisterClientResponseHeader.java  |  13 +-
 .../UpdateConsumerOffsetRequestHeader.java      |  24 +-
 .../UpdateConsumerOffsetResponseHeader.java     |  15 +-
 .../ViewBrokerStatsDataRequestHeader.java       |  18 +-
 .../header/ViewMessageRequestHeader.java        |  18 +-
 .../header/ViewMessageResponseHeader.java       |  15 +-
 .../RegisterFilterServerRequestHeader.java      |  16 +-
 .../RegisterFilterServerResponseHeader.java     |  18 +-
 ...RegisterMessageFilterClassRequestHeader.java |  22 +-
 .../namesrv/DeleteKVConfigRequestHeader.java    |  18 +-
 .../DeleteTopicInNamesrvRequestHeader.java      |  16 +-
 .../namesrv/GetKVConfigRequestHeader.java       |  18 +-
 .../namesrv/GetKVConfigResponseHeader.java      |  16 +-
 .../GetKVListByNamespaceRequestHeader.java      |  16 +-
 .../namesrv/GetRouteInfoRequestHeader.java      |  18 +-
 .../namesrv/GetRouteInfoResponseHeader.java     |  15 +-
 .../namesrv/PutKVConfigRequestHeader.java       |  20 +-
 .../namesrv/RegisterBrokerRequestHeader.java    |  26 +-
 .../namesrv/RegisterBrokerResponseHeader.java   |  18 +-
 .../RegisterOrderTopicRequestHeader.java        |  20 +-
 .../namesrv/UnRegisterBrokerRequestHeader.java  |  24 +-
 .../WipeWritePermOfBrokerRequestHeader.java     |  16 +-
 .../WipeWritePermOfBrokerResponseHeader.java    |  16 +-
 .../common/protocol/heartbeat/ConsumeType.java  |  15 +-
 .../common/protocol/heartbeat/ConsumerData.java |  35 +-
 .../protocol/heartbeat/HeartbeatData.java       |  27 +-
 .../common/protocol/heartbeat/MessageModel.java |  15 +-
 .../common/protocol/heartbeat/ProducerData.java |  17 +-
 .../protocol/heartbeat/SubscriptionData.java    |  39 +-
 .../common/protocol/route/BrokerData.java       |  20 +-
 .../common/protocol/route/QueueData.java        |  20 +-
 .../common/protocol/route/TopicRouteData.java   |  27 +-
 .../common/protocol/topic/OffsetMovedEvent.java |  24 +-
 .../common/queue/ConcurrentTreeMap.java         |  27 +-
 .../rocketmq/common/queue/RoundQueue.java       |  15 +-
 .../rocketmq/common/running/RunningStats.java   |  12 +-
 .../rocketmq/common/stats/MomentStatsItem.java  |  19 +-
 .../common/stats/MomentStatsItemSet.java        |  11 +-
 .../apache/rocketmq/common/stats/StatsItem.java |  68 +-
 .../rocketmq/common/stats/StatsItemSet.java     |  14 +-
 .../rocketmq/common/stats/StatsSnapshot.java    |  18 +-
 .../subscription/SubscriptionGroupConfig.java   |  35 +-
 .../rocketmq/common/sysflag/MessageSysFlag.java |   3 -
 .../rocketmq/common/sysflag/PullSysFlag.java    |  20 +-
 .../common/sysflag/SubscriptionSysFlag.java     |   5 -
 .../rocketmq/common/sysflag/TopicSysFlag.java   |   8 -
 .../rocketmq/common/utils/ChannelUtil.java      |  15 +-
 .../rocketmq/common/utils/HttpTinyClient.java   |  36 +-
 .../rocketmq/common/utils/IOTinyUtils.java      |  42 +-
 .../org/apache/rocketmq/common/MixAllTest.java  |  18 +-
 .../rocketmq/common/RemotingUtilTest.java       |  13 +-
 .../org/apache/rocketmq/common/UtilAllTest.java |  31 +-
 .../rocketmq/common/filter/FilterAPITest.java   |  17 +-
 .../common/protocol/ConsumeStatusTest.java      |  13 +-
 conf/2m-2s-async/broker-a-s.properties          |   1 -
 conf/2m-2s-async/broker-a.properties            |   1 -
 conf/2m-2s-async/broker-b-s.properties          |   1 -
 conf/2m-2s-async/broker-b.properties            |   1 -
 conf/2m-2s-sync/broker-a-s.properties           |   1 -
 conf/2m-2s-sync/broker-a.properties             |   1 -
 conf/2m-2s-sync/broker-b-s.properties           |   1 -
 conf/2m-2s-sync/broker-b.properties             |   1 -
 conf/2m-noslave/broker-a.properties             |   1 -
 conf/2m-noslave/broker-b.properties             |   1 -
 conf/broker.conf                                |  14 +-
 conf/logback_broker.xml                         |  22 +-
 conf/logback_filtersrv.xml                      |   4 +-
 conf/logback_namesrv.xml                        |   4 +-
 conf/logback_tools.xml                          |   4 +-
 example/pom.xml                                 |   2 +-
 .../rocketmq/example/benchmark/Consumer.java    |  53 +-
 .../rocketmq/example/benchmark/Producer.java    |  52 +-
 .../example/benchmark/TransactionProducer.java  |  67 +-
 .../example/broadcast/PushConsumer.java         |   5 +-
 .../rocketmq/example/filter/Consumer.java       |   8 +-
 .../rocketmq/example/filter/Producer.java       |   6 +-
 .../rocketmq/example/operation/Consumer.java    |  21 +-
 .../rocketmq/example/operation/Producer.java    |  27 +-
 .../rocketmq/example/ordermessage/Consumer.java |   6 +-
 .../rocketmq/example/ordermessage/Producer.java |  13 +-
 .../rocketmq/example/quickstart/Consumer.java   |   5 +-
 .../rocketmq/example/quickstart/Producer.java   |   4 +-
 .../rocketmq/example/simple/AsyncProducer.java  |  10 +-
 .../rocketmq/example/simple/CachedQueue.java    |  17 +-
 .../rocketmq/example/simple/Producer.java       |   7 +-
 .../rocketmq/example/simple/PullConsumer.java   |  10 +-
 .../example/simple/PullScheduleService.java     |   2 -
 .../rocketmq/example/simple/PushConsumer.java   |   4 +-
 .../example/simple/RandomAsyncCommit.java       |  23 +-
 .../rocketmq/example/simple/TestProducer.java   |   8 +-
 .../TransactionCheckListenerImpl.java           |   5 +-
 .../transaction/TransactionExecuterImpl.java    |  16 +-
 .../transaction/TransactionProducer.java        |   9 +-
 .../src/main/resources/MessageFilterImpl.java   |  15 +-
 filtersrv/pom.xml                               |   2 +-
 .../filtersrv/FilterServerOuterAPI.java         |  17 +-
 .../rocketmq/filtersrv/FiltersrvConfig.java     |  41 +-
 .../rocketmq/filtersrv/FiltersrvController.java |  61 +-
 .../rocketmq/filtersrv/FiltersrvStartup.java    |  34 +-
 .../rocketmq/filtersrv/filter/DynaCode.java     | 177 +++--
 .../filter/FilterClassFetchMethod.java          |  12 +-
 .../filtersrv/filter/FilterClassInfo.java       |  19 +-
 .../filtersrv/filter/FilterClassLoader.java     |  12 +-
 .../filtersrv/filter/FilterClassManager.java    |  67 +-
 .../filter/HttpFilterClassFetchMethod.java      |  17 +-
 .../processor/DefaultRequestProcessor.java      |  91 ++-
 .../stats/FilterServerStatsManager.java         |  29 +-
 namesrv/pom.xml                                 |   2 +-
 .../rocketmq/namesrv/NamesrvController.java     |  45 +-
 .../apache/rocketmq/namesrv/NamesrvStartup.java |  36 +-
 .../namesrv/kvconfig/KVConfigManager.java       |  45 +-
 .../kvconfig/KVConfigSerializeWrapper.java      |  18 +-
 .../processor/ClusterTestRequestProcessor.java  |  25 +-
 .../processor/DefaultRequestProcessor.java      | 154 ++---
 .../routeinfo/BrokerHousekeepingService.java    |  20 +-
 .../namesrv/routeinfo/RouteInfoManager.java     | 142 ++--
 remoting/pom.xml                                |   2 +-
 .../rocketmq/remoting/ChannelEventListener.java |  16 +-
 .../rocketmq/remoting/CommandCustomHeader.java  |  13 +-
 .../rocketmq/remoting/InvokeCallback.java       |  13 +-
 .../org/apache/rocketmq/remoting/RPCHook.java   |   4 +-
 .../rocketmq/remoting/RemotingClient.java       |  38 +-
 .../rocketmq/remoting/RemotingServer.java       |  26 +-
 .../rocketmq/remoting/RemotingService.java      |   2 -
 .../remoting/annotation/CFNullable.java         |  12 +-
 .../apache/rocketmq/remoting/common/Pair.java   |  17 +-
 .../remoting/common/RemotingHelper.java         |  41 +-
 .../rocketmq/remoting/common/RemotingUtil.java  |  29 +-
 .../common/SemaphoreReleaseOnlyOnce.java        |  16 +-
 .../rocketmq/remoting/common/ServiceThread.java |  19 +-
 .../exception/RemotingCommandException.java     |  14 +-
 .../exception/RemotingConnectException.java     |  14 +-
 .../remoting/exception/RemotingException.java   |  14 +-
 .../exception/RemotingSendRequestException.java |  14 +-
 .../exception/RemotingTimeoutException.java     |  15 +-
 .../RemotingTooMuchRequestException.java        |  13 +-
 .../remoting/netty/NettyClientConfig.java       |  31 +-
 .../rocketmq/remoting/netty/NettyDecoder.java   |  28 +-
 .../rocketmq/remoting/netty/NettyEncoder.java   |  24 +-
 .../rocketmq/remoting/netty/NettyEvent.java     |  18 +-
 .../rocketmq/remoting/netty/NettyEventType.java |  12 +-
 .../remoting/netty/NettyRemotingAbstract.java   |  85 ++-
 .../remoting/netty/NettyRemotingClient.java     | 111 ++-
 .../remoting/netty/NettyRemotingServer.java     | 120 ++--
 .../remoting/netty/NettyRequestProcessor.java   |  18 +-
 .../remoting/netty/NettyServerConfig.java       |  36 +-
 .../remoting/netty/NettySystemConfig.java       |  26 +-
 .../rocketmq/remoting/netty/RequestTask.java    |  25 +-
 .../rocketmq/remoting/netty/ResponseFuture.java |  45 +-
 .../remoting/protocol/LanguageCode.java         |  30 +-
 .../remoting/protocol/RemotingCommand.java      |  73 +-
 .../remoting/protocol/RemotingCommandType.java  |  12 +-
 .../remoting/protocol/RemotingSerializable.java |  14 +-
 .../protocol/RemotingSysResponseCode.java       |  12 +-
 .../remoting/protocol/RocketMQSerializable.java |  54 +-
 .../remoting/protocol/SerializeType.java        |  16 +-
 .../org/apache/rocketmq/remoting/MixTest.java   |  15 +-
 .../apache/rocketmq/remoting/NettyRPCTest.java  |  92 ++-
 .../rocketmq/subclass/TestSubClassAuto.java     |  13 +-
 srvutil/pom.xml                                 |   2 +-
 .../org/apache/rocketmq/srvutil/ServerUtil.java |  29 +-
 store/pom.xml                                   |   2 +-
 .../store/AllocateMappedFileService.java        |  59 +-
 .../rocketmq/store/AppendMessageCallback.java   |  15 +-
 .../rocketmq/store/AppendMessageResult.java     |  42 +-
 .../rocketmq/store/AppendMessageStatus.java     |  12 +-
 .../org/apache/rocketmq/store/CommitLog.java    | 300 ++++-----
 .../org/apache/rocketmq/store/ConsumeQueue.java |  78 +--
 .../rocketmq/store/DefaultMessageFilter.java    |  13 +-
 .../rocketmq/store/DefaultMessageStore.java     | 195 +++---
 .../apache/rocketmq/store/DispatchRequest.java  |  35 +-
 .../apache/rocketmq/store/GetMessageResult.java |  45 +-
 .../apache/rocketmq/store/GetMessageStatus.java |  12 +-
 .../org/apache/rocketmq/store/MappedFile.java   | 176 ++---
 .../apache/rocketmq/store/MappedFileQueue.java  |  82 +--
 .../rocketmq/store/MessageArrivingListener.java |  12 +-
 .../rocketmq/store/MessageExtBrokerInner.java   |  17 +-
 .../apache/rocketmq/store/MessageFilter.java    |  13 +-
 .../org/apache/rocketmq/store/MessageStore.java |  49 +-
 .../apache/rocketmq/store/PutMessageResult.java |  21 +-
 .../apache/rocketmq/store/PutMessageStatus.java |  12 +-
 .../rocketmq/store/QueryMessageResult.java      |  23 +-
 .../rocketmq/store/ReferenceResource.java       |  22 +-
 .../org/apache/rocketmq/store/RunningFlags.java |  26 +-
 .../store/SelectMappedBufferResult.java         |  21 +-
 .../apache/rocketmq/store/StoreCheckpoint.java  |  40 +-
 .../rocketmq/store/StoreStatsService.java       |  88 +--
 .../org/apache/rocketmq/store/StoreUtil.java    |  16 +-
 .../rocketmq/store/TransientStorePool.java      |  15 +-
 .../rocketmq/store/config/BrokerRole.java       |  12 +-
 .../rocketmq/store/config/FlushDiskType.java    |  12 +-
 .../store/config/MessageStoreConfig.java        | 132 +---
 .../store/config/StorePathConfigHelper.java     |  19 +-
 .../apache/rocketmq/store/ha/HAConnection.java  |  63 +-
 .../org/apache/rocketmq/store/ha/HAService.java |  95 +--
 .../rocketmq/store/ha/WaitNotifyObject.java     |  18 +-
 .../apache/rocketmq/store/index/IndexFile.java  |  72 +-
 .../rocketmq/store/index/IndexHeader.java       |  15 -
 .../rocketmq/store/index/IndexService.java      |  51 +-
 .../rocketmq/store/index/QueryOffsetResult.java |  19 +-
 .../schedule/DelayOffsetSerializeWrapper.java   |  20 +-
 .../store/schedule/ScheduleMessageService.java  | 112 ++--
 .../rocketmq/store/stats/BrokerStats.java       |  29 +-
 .../store/stats/BrokerStatsManager.java         |  34 +-
 .../org/apache/rocketmq/store/util/LibC.java    |  15 +-
 .../rocketmq/store/DefaultMessageStoreTest.java |  34 +-
 .../rocketmq/store/MappedFileQueueTest.java     |  56 +-
 .../apache/rocketmq/store/MappedFileTest.java   |  33 +-
 .../rocketmq/store/StoreCheckpointTest.java     |  20 +-
 .../rocketmq/store/index/IndexFileTest.java     |  25 +-
 .../store/schedule/ScheduleMessageTest.java     |  35 +-
 store/src/test/resources/logback-test.xml       |  24 +-
 style/copyright/Apache.xml                      |   9 +-
 style/copyright/profiles_settings.xml           |  84 +--
 style/rmq_codeStyle.xml                         | 204 +++---
 tools/pom.xml                                   |   2 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java | 141 ++--
 .../tools/admin/DefaultMQAdminExtImpl.java      | 212 +++---
 .../apache/rocketmq/tools/admin/MQAdminExt.java | 141 ++--
 .../rocketmq/tools/admin/api/MessageTrack.java  |  21 +-
 .../rocketmq/tools/admin/api/TrackType.java     |  12 +-
 .../rocketmq/tools/command/CommandUtil.java     |  46 +-
 .../rocketmq/tools/command/MQAdminStartup.java  |  72 +-
 .../rocketmq/tools/command/SubCommand.java      |  18 +-
 .../broker/BrokerConsumeStatsSubCommad.java     |  46 +-
 .../command/broker/BrokerStatusSubCommand.java  |  24 +-
 .../broker/CleanExpiredCQSubCommand.java        |  22 +-
 .../command/broker/CleanUnusedTopicCommand.java |  22 +-
 .../command/broker/GetBrokerConfigCommand.java  |  39 +-
 .../command/broker/SendMsgStatusCommand.java    |  36 +-
 .../broker/UpdateBrokerConfigSubCommand.java    |  29 +-
 .../cluster/CLusterSendMsgRTCommand.java        |  55 +-
 .../command/cluster/ClusterListSubCommand.java  |  89 ++-
 .../ConsumerConnectionSubCommand.java           |  29 +-
 .../ProducerConnectionSubCommand.java           |  17 +-
 .../consumer/ConsumerProgressSubCommand.java    |  84 +--
 .../consumer/ConsumerStatusSubCommand.java      |  30 +-
 .../command/consumer/ConsumerSubCommand.java    |  32 +-
 .../DeleteSubscriptionGroupCommand.java         |  35 +-
 .../consumer/StartMonitoringSubCommand.java     |  23 +-
 .../consumer/UpdateSubGroupSubCommand.java      |  29 +-
 .../command/message/CheckMsgSendRTCommand.java  |  34 +-
 .../command/message/DecodeMessageIdCommond.java |  18 +-
 .../message/PrintMessageByQueueCommand.java     | 157 +++--
 .../command/message/PrintMessageSubCommand.java |  76 +--
 .../command/message/QueryMsgByIdSubCommand.java | 287 ++++----
 .../message/QueryMsgByKeySubCommand.java        |  27 +-
 .../message/QueryMsgByOffsetSubCommand.java     |   7 +-
 .../message/QueryMsgByUniqueKeySubCommand.java  | 177 +++--
 .../rocketmq/tools/command/message/Store.java   |  56 +-
 .../command/namesrv/DeleteKvConfigCommand.java  |  22 +-
 .../namesrv/GetNamesrvConfigCommand.java        |  25 +-
 .../command/namesrv/UpdateKvConfigCommand.java  |  22 +-
 .../namesrv/UpdateNamesrvConfigCommand.java     |  27 +-
 .../namesrv/WipeWritePermSubCommand.java        |  34 +-
 .../command/offset/CloneGroupOffsetCommand.java |  23 +-
 .../offset/GetConsumerStatusCommand.java        |  29 +-
 .../offset/ResetOffsetByTimeCommand.java        |  32 +-
 .../offset/ResetOffsetByTimeOldCommand.java     |  80 ++-
 .../tools/command/stats/StatsAllSubCommand.java | 176 +++--
 .../command/topic/AllocateMQSubCommand.java     |  20 +-
 .../command/topic/DeleteTopicSubCommand.java    |  70 +-
 .../tools/command/topic/RebalanceResult.java    |  15 +-
 .../command/topic/TopicClusterSubCommand.java   |  12 +-
 .../command/topic/TopicListSubCommand.java      |  34 +-
 .../command/topic/TopicRouteSubCommand.java     |  10 +-
 .../command/topic/TopicStatusSubCommand.java    |  49 +-
 .../command/topic/UpdateOrderConfCommand.java   |  26 +-
 .../topic/UpdateTopicPermSubCommand.java        |  17 +-
 .../command/topic/UpdateTopicSubCommand.java    |  25 +-
 .../tools/monitor/DefaultMonitorListener.java   |  40 +-
 .../rocketmq/tools/monitor/DeleteMsgsEvent.java |  20 +-
 .../rocketmq/tools/monitor/FailedMsgs.java      |  21 +-
 .../rocketmq/tools/monitor/MonitorConfig.java   |  19 +-
 .../rocketmq/tools/monitor/MonitorListener.java |   3 +-
 .../rocketmq/tools/monitor/MonitorService.java  |  65 +-
 .../rocketmq/tools/monitor/UndoneMsgs.java      |  27 +-
 555 files changed, 8226 insertions(+), 11139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/pom.xml
----------------------------------------------------------------------
diff --git a/broker/pom.xml b/broker/pom.xml
index 0917503..30525e4 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -15,7 +15,7 @@
    limitations under the License.
   -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.rocketmq</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8e973ac..501c1c5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,7 +16,25 @@
  */
 package org.apache.rocketmq.broker;
 
-import org.apache.rocketmq.broker.client.*;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientHousekeepingService;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ProducerManager;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
 import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
@@ -30,11 +48,21 @@ import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.out.BrokerOuterAPI;
 import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
 import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.broker.processor.*;
+import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
+import org.apache.rocketmq.broker.processor.ClientManageProcessor;
+import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
+import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
+import org.apache.rocketmq.broker.processor.PullMessageProcessor;
+import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
+import org.apache.rocketmq.broker.processor.SendMessageProcessor;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
-import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -43,7 +71,11 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.stats.MomentStatsItem;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingServer;
-import org.apache.rocketmq.remoting.netty.*;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.MessageArrivingListener;
 import org.apache.rocketmq.store.MessageStore;
@@ -54,15 +86,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-
 public class BrokerController {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -84,7 +107,7 @@ public class BrokerController {
     private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
     private final BrokerOuterAPI brokerOuterAPI;
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "BrokerControllerScheduledThread"));
+        "BrokerControllerScheduledThread"));
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> pullThreadPoolQueue;
@@ -110,10 +133,10 @@ public class BrokerController {
     private Configuration configuration;
 
     public BrokerController(//
-                            final BrokerConfig brokerConfig, //
-                            final NettyServerConfig nettyServerConfig, //
-                            final NettyClientConfig nettyClientConfig, //
-                            final MessageStoreConfig messageStoreConfig //
+        final BrokerConfig brokerConfig, //
+        final NettyServerConfig nettyServerConfig, //
+        final NettyClientConfig nettyClientConfig, //
+        final MessageStoreConfig messageStoreConfig //
     ) {
         this.brokerConfig = brokerConfig;
         this.nettyServerConfig = nettyServerConfig;
@@ -151,9 +174,9 @@ public class BrokerController {
 
         this.brokerFastFailure = new BrokerFastFailure(this);
         this.configuration = new Configuration(
-                log,
-                BrokerPathConfigHelper.getBrokerConfigPath(),
-                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+            log,
+            BrokerPathConfigHelper.getBrokerConfigPath(),
+            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
         );
     }
 
@@ -180,9 +203,9 @@ public class BrokerController {
         if (result) {
             try {
                 this.messageStore =
-                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
-                                this.brokerConfig);
-                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
+                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+                        this.brokerConfig);
+                this.brokerStats = new BrokerStats((DefaultMessageStore)this.messageStore);
                 //load plugin
                 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                 this.messageStore = MessageStoreFactory.build(context, this.messageStore);
@@ -196,44 +219,43 @@ public class BrokerController {
 
         if (result) {
             this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
-            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
+            NettyServerConfig fastConfig = (NettyServerConfig)this.nettyServerConfig.clone();
             fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
             this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
             this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getSendMessageThreadPoolNums(),
-                    this.brokerConfig.getSendMessageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.sendThreadPoolQueue,
-                    new ThreadFactoryImpl("SendMessageThread_"));
+                this.brokerConfig.getSendMessageThreadPoolNums(),
+                this.brokerConfig.getSendMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.sendThreadPoolQueue,
+                new ThreadFactoryImpl("SendMessageThread_"));
 
             this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
-                    this.brokerConfig.getPullMessageThreadPoolNums(),
-                    this.brokerConfig.getPullMessageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.pullThreadPoolQueue,
-                    new ThreadFactoryImpl("PullMessageThread_"));
+                this.brokerConfig.getPullMessageThreadPoolNums(),
+                this.brokerConfig.getPullMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.pullThreadPoolQueue,
+                new ThreadFactoryImpl("PullMessageThread_"));
 
             this.adminBrokerExecutor =
-                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
-                            "AdminBrokerThread_"));
+                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+                    "AdminBrokerThread_"));
 
             this.clientManageExecutor = new ThreadPoolExecutor(
-                    this.brokerConfig.getClientManageThreadPoolNums(),
-                    this.brokerConfig.getClientManageThreadPoolNums(),
-                    1000 * 60,
-                    TimeUnit.MILLISECONDS,
-                    this.clientManagerThreadPoolQueue,
-                    new ThreadFactoryImpl("ClientManageThread_"));
+                this.brokerConfig.getClientManageThreadPoolNums(),
+                this.brokerConfig.getClientManageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.clientManagerThreadPoolQueue,
+                new ThreadFactoryImpl("ClientManageThread_"));
 
             this.consumerManageExecutor =
-                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
-                            "ConsumerManageThread_"));
+                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+                    "ConsumerManageThread_"));
 
             this.registerProcessor();
 
-
             // TODO remove in future
             final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
             final long period = 1000 * 60 * 60 * 24;
@@ -259,7 +281,6 @@ public class BrokerController {
                 }
             }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
 
-
             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                 @Override
                 public void run() {
@@ -399,7 +420,6 @@ public class BrokerController {
         this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
 
-
         /**
          * EndTransactionProcessor
          */
@@ -446,7 +466,8 @@ public class BrokerController {
             slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp();
         }
 
-        if (slowTimeMills < 0) slowTimeMills = 0;
+        if (slowTimeMills < 0)
+            slowTimeMills = 0;
 
         return slowTimeMills;
     }
@@ -577,10 +598,10 @@ public class BrokerController {
 
     private void unregisterBrokerAll() {
         this.brokerOuterAPI.unregisterBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId());
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId());
     }
 
     public String getBrokerAddr() {
@@ -643,27 +664,27 @@ public class BrokerController {
         TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
 
         if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
-                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
             ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
             for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                 TopicConfig tmp =
-                        new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
-                                this.brokerConfig.getBrokerPermission());
+                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+                        this.brokerConfig.getBrokerPermission());
                 topicConfigTable.put(topicConfig.getTopicName(), tmp);
             }
             topicConfigWrapper.setTopicConfigTable(topicConfigTable);
         }
 
         RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
-                this.brokerConfig.getBrokerClusterName(),
-                this.getBrokerAddr(),
-                this.brokerConfig.getBrokerName(),
-                this.brokerConfig.getBrokerId(),
-                this.getHAServerAddr(),
-                topicConfigWrapper,
-                this.filterServerManager.buildNewFilterServerList(),
-                oneway,
-                this.brokerConfig.getRegisterBrokerTimeoutMills());
+            this.brokerConfig.getBrokerClusterName(),
+            this.getBrokerAddr(),
+            this.brokerConfig.getBrokerName(),
+            this.brokerConfig.getBrokerId(),
+            this.getHAServerAddr(),
+            topicConfigWrapper,
+            this.filterServerManager.buildNewFilterServerList(),
+            oneway,
+            this.brokerConfig.getRegisterBrokerTimeoutMills());
 
         if (registerBrokerResult != null) {
             if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index dbcd304..7a46df3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -6,45 +6,39 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.rocketmq.broker;
 
 import java.io.File;
 
-
 public class BrokerPathConfigHelper {
     private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store"
-            + File.separator + "config" + File.separator + "broker.properties";
-
+        + File.separator + "config" + File.separator + "broker.properties";
 
     public static String getBrokerConfigPath() {
         return brokerConfigPath;
     }
 
-
     public static void setBrokerConfigPath(String path) {
         brokerConfigPath = path;
     }
 
-
     public static String getTopicConfigPath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "topics.json";
     }
 
-
     public static String getConsumerOffsetPath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
     }
 
-
     public static String getSubscriptionGroupPath(final String rootDir) {
         return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 86091c4..dfa97c1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -18,6 +18,15 @@ package org.apache.rocketmq.broker;
 
 import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
@@ -30,20 +39,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.srvutil.ServerUtil;
 import org.apache.rocketmq.store.config.BrokerRole;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
 public class BrokerStartup {
     public static Properties properties = null;
     public static CommandLine commandLine = null;
@@ -58,7 +56,7 @@ public class BrokerStartup {
         try {
             controller.start();
             String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
-                    + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+                + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
 
             if (null != controller.getBrokerConfig().getNamesrvAddr()) {
                 tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
@@ -89,7 +87,7 @@ public class BrokerStartup {
             //PackageConflictDetect.detectFastjson();
             Options options = ServerUtil.buildCommandlineOptions(new Options());
             commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
-                    new PosixParser());
+                new PosixParser());
             if (null == commandLine) {
                 System.exit(-1);
             }
@@ -142,7 +140,7 @@ public class BrokerStartup {
 
             if (null == brokerConfig.getRocketmqHome()) {
                 System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
-                        + " variable in your environment to match the location of the RocketMQ installation");
+                    + " variable in your environment to match the location of the RocketMQ installation");
                 System.exit(-2);
             }
 
@@ -157,13 +155,12 @@ public class BrokerStartup {
                     }
                 } catch (Exception e) {
                     System.out.printf(
-                            "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
-                            namesrvAddr);
+                        "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+                        namesrvAddr);
                     System.exit(-3);
                 }
             }
 
-
             switch (messageStoreConfig.getBrokerRole()) {
                 case ASYNC_MASTER:
                 case SYNC_MASTER:
@@ -181,7 +178,7 @@ public class BrokerStartup {
             }
 
             messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
-            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
             JoranConfigurator configurator = new JoranConfigurator();
             configurator.setContext(lc);
             lc.reset();
@@ -194,10 +191,10 @@ public class BrokerStartup {
             MixAll.printObjectProperties(log, messageStoreConfig);
 
             final BrokerController controller = new BrokerController(//
-                    brokerConfig, //
-                    nettyServerConfig, //
-                    nettyClientConfig, //
-                    messageStoreConfig);
+                brokerConfig, //
+                nettyServerConfig, //
+                nettyClientConfig, //
+                messageStoreConfig);
             // remember all configs to prevent discard
             controller.getConfiguration().registerConfig(properties);
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
index a994503..24cddb9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
@@ -6,19 +6,18 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.client;
 
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import io.netty.channel.Channel;
-
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class ClientChannelInfo {
     private final Channel channel;
@@ -27,12 +26,10 @@ public class ClientChannelInfo {
     private final int version;
     private volatile long lastUpdateTimestamp = System.currentTimeMillis();
 
-
     public ClientChannelInfo(Channel channel) {
         this(channel, null, null, 0);
     }
 
-
     public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
         this.channel = channel;
         this.clientId = clientId;
@@ -40,37 +37,30 @@ public class ClientChannelInfo {
         this.version = version;
     }
 
-
     public Channel getChannel() {
         return channel;
     }
 
-
     public String getClientId() {
         return clientId;
     }
 
-
     public LanguageCode getLanguage() {
         return language;
     }
 
-
     public int getVersion() {
         return version;
     }
 
-
     public long getLastUpdateTimestamp() {
         return lastUpdateTimestamp;
     }
 
-
     public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
         this.lastUpdateTimestamp = lastUpdateTimestamp;
     }
 
-
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -78,12 +68,11 @@ public class ClientChannelInfo {
         result = prime * result + ((channel == null) ? 0 : channel.hashCode());
         result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
         result = prime * result + ((language == null) ? 0 : language.hashCode());
-        result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
+        result = prime * result + (int)(lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
         result = prime * result + version;
         return result;
     }
 
-
     @Override
     public boolean equals(Object obj) {
         if (this == obj)
@@ -92,7 +81,7 @@ public class ClientChannelInfo {
             return false;
         if (getClass() != obj.getClass())
             return false;
-        ClientChannelInfo other = (ClientChannelInfo) obj;
+        ClientChannelInfo other = (ClientChannelInfo)obj;
         if (channel == null) {
             if (other.channel != null)
                 return false;
@@ -103,10 +92,9 @@ public class ClientChannelInfo {
         return true;
     }
 
-
     @Override
     public String toString() {
         return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
-                + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
+            + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index 856ce72..5d7c0ea 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -6,42 +6,38 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.client;
 
+import io.netty.channel.Channel;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.remoting.ChannelEventListener;
-import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
 public class ClientHousekeepingService implements ChannelEventListener {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
 
     private ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
-
+        .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
 
     public ClientHousekeepingService(final BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
-
     public void start() {
 
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -71,7 +67,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
 
     }
 
-
     @Override
     public void onChannelClose(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
@@ -79,7 +74,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
         this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
     }
 
-
     @Override
     public void onChannelException(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
@@ -87,7 +81,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
         this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
     }
 
-
     @Override
     public void onChannelIdle(String remoteAddr, Channel channel) {
         this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index d5b056e..2656467 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -16,45 +16,41 @@
  */
 package org.apache.rocketmq.broker.client;
 
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ConsumerGroupInfo {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final String groupName;
     private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
-            new ConcurrentHashMap<String, SubscriptionData>();
+        new ConcurrentHashMap<String, SubscriptionData>();
     private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
-            new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+        new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
     private volatile ConsumeType consumeType;
     private volatile MessageModel messageModel;
     private volatile ConsumeFromWhere consumeFromWhere;
     private volatile long lastUpdateTimestamp = System.currentTimeMillis();
 
-
     public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
-                             ConsumeFromWhere consumeFromWhere) {
+        ConsumeFromWhere consumeFromWhere) {
         this.groupName = groupName;
         this.consumeType = consumeType;
         this.messageModel = messageModel;
         this.consumeFromWhere = consumeFromWhere;
     }
 
-
     public ClientChannelInfo findChannel(final String clientId) {
         Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
         while (it.hasNext()) {
@@ -67,17 +63,14 @@ public class ConsumerGroupInfo {
         return null;
     }
 
-
     public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
         return subscriptionTable;
     }
 
-
     public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() {
         return channelInfoTable;
     }
 
-
     public List<Channel> getAllChannel() {
         List<Channel> result = new ArrayList<Channel>();
 
@@ -86,7 +79,6 @@ public class ConsumerGroupInfo {
         return result;
     }
 
-
     public List<String> getAllClientId() {
         List<String> result = new ArrayList<String>();
 
@@ -101,7 +93,6 @@ public class ConsumerGroupInfo {
         return result;
     }
 
-
     public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
         ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
         if (old != null) {
@@ -109,13 +100,12 @@ public class ConsumerGroupInfo {
         }
     }
 
-
     public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
         final ClientChannelInfo info = this.channelInfoTable.remove(channel);
         if (info != null) {
             log.warn(
-                    "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
-                    info.toString(), groupName);
+                "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
+                info.toString(), groupName);
             return true;
         }
 
@@ -123,7 +113,7 @@ public class ConsumerGroupInfo {
     }
 
     public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
-                                 MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
+        MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
         boolean updated = false;
         this.consumeType = consumeType;
         this.messageModel = messageModel;
@@ -134,7 +124,7 @@ public class ConsumerGroupInfo {
             ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
             if (null == prev) {
                 log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
-                        messageModel, infoNew.toString());
+                    messageModel, infoNew.toString());
                 updated = true;
             }
 
@@ -142,9 +132,9 @@ public class ConsumerGroupInfo {
         } else {
             if (!infoOld.getClientId().equals(infoNew.getClientId())) {
                 log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
-                        this.groupName,
-                        infoOld.toString(),
-                        infoNew.toString());
+                    this.groupName,
+                    infoOld.toString(),
+                    infoNew.toString());
                 this.channelInfoTable.put(infoNew.getChannel(), infoNew);
             }
         }
@@ -155,7 +145,6 @@ public class ConsumerGroupInfo {
         return updated;
     }
 
-
     public boolean updateSubscription(final Set<SubscriptionData> subList) {
         boolean updated = false;
 
@@ -166,15 +155,15 @@ public class ConsumerGroupInfo {
                 if (null == prev) {
                     updated = true;
                     log.info("subscription changed, add new topic, group: {} {}",
-                            this.groupName,
-                            sub.toString());
+                        this.groupName,
+                        sub.toString());
                 }
             } else if (sub.getSubVersion() > old.getSubVersion()) {
                 if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
                     log.info("subscription changed, group: {} OLD: {} NEW: {}",
-                            this.groupName,
-                            old.toString(),
-                            sub.toString()
+                        this.groupName,
+                        old.toString(),
+                        sub.toString()
                     );
                 }
 
@@ -182,7 +171,6 @@ public class ConsumerGroupInfo {
             }
         }
 
-
         Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
         while (it.hasNext()) {
             Entry<String, SubscriptionData> next = it.next();
@@ -198,9 +186,9 @@ public class ConsumerGroupInfo {
 
             if (!exist) {
                 log.warn("subscription changed, group: {} remove topic {} {}",
-                        this.groupName,
-                        oldTopic,
-                        next.getValue().toString()
+                    this.groupName,
+                    oldTopic,
+                    next.getValue().toString()
                 );
 
                 it.remove();
@@ -213,57 +201,46 @@ public class ConsumerGroupInfo {
         return updated;
     }
 
-
     public Set<String> getSubscribeTopics() {
         return subscriptionTable.keySet();
     }
 
-
     public SubscriptionData findSubscriptionData(final String topic) {
         return this.subscriptionTable.get(topic);
     }
 
-
     public ConsumeType getConsumeType() {
         return consumeType;
     }
 
-
     public void setConsumeType(ConsumeType consumeType) {
         this.consumeType = consumeType;
     }
 
-
     public MessageModel getMessageModel() {
         return messageModel;
     }
 
-
     public void setMessageModel(MessageModel messageModel) {
         this.messageModel = messageModel;
     }
 
-
     public String getGroupName() {
         return groupName;
     }
 
-
     public long getLastUpdateTimestamp() {
         return lastUpdateTimestamp;
     }
 
-
     public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
         this.lastUpdateTimestamp = lastUpdateTimestamp;
     }
 
-
     public ConsumeFromWhere getConsumeFromWhere() {
         return consumeFromWhere;
     }
 
-
     public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
         this.consumeFromWhere = consumeFromWhere;
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
index 368582a..fbec010 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
@@ -6,21 +6,19 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
-
 import java.util.List;
 
-
 public interface ConsumerIdsChangeListener {
     void consumerIdsChanged(final String group, final List<Channel> channels);
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 95ed478..fd4fb88 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -16,6 +16,12 @@
  */
 package org.apache.rocketmq.broker.client;
 
+import io.netty.channel.Channel;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -23,22 +29,14 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-
 public class ConsumerManager {
     private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
     private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable =
-            new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+        new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
     private final ConsumerIdsChangeListener consumerIdsChangeListener;
 
     public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
@@ -86,7 +84,7 @@ public class ConsumerManager {
                     ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
                     if (remove != null) {
                         log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
-                                next.getKey());
+                            next.getKey());
                     }
                 }
 
@@ -96,8 +94,8 @@ public class ConsumerManager {
     }
 
     public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
-                                    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
-                                    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+        ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+        final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
 
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
         if (null == consumerGroupInfo) {
@@ -107,8 +105,8 @@ public class ConsumerManager {
         }
 
         boolean r1 =
-                consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
-                        consumeFromWhere);
+            consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
+                consumeFromWhere);
         boolean r2 = consumerGroupInfo.updateSubscription(subList);
 
         if (r1 || r2) {
@@ -143,7 +141,7 @@ public class ConsumerManager {
             String group = next.getKey();
             ConsumerGroupInfo consumerGroupInfo = next.getValue();
             ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
-                    consumerGroupInfo.getChannelInfoTable();
+                consumerGroupInfo.getChannelInfoTable();
 
             Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
             while (itChannel.hasNext()) {
@@ -152,8 +150,8 @@ public class ConsumerManager {
                 long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
                 if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                     log.warn(
-                            "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
-                            RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
+                        "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+                        RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
                     RemotingUtil.closeChannel(clientChannelInfo.getChannel());
                     itChannel.remove();
                 }
@@ -161,8 +159,8 @@ public class ConsumerManager {
 
             if (channelInfoTable.isEmpty()) {
                 log.warn(
-                        "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
-                        group);
+                    "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+                    group);
                 it.remove();
             }
         }
@@ -174,7 +172,7 @@ public class ConsumerManager {
         while (it.hasNext()) {
             Entry<String, ConsumerGroupInfo> entry = it.next();
             ConcurrentHashMap<String, SubscriptionData> subscriptionTable =
-                    entry.getValue().getSubscriptionTable();
+                entry.getValue().getSubscriptionTable();
             if (subscriptionTable.containsKey(topic)) {
                 groups.add(entry.getKey());
             }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index b60fcb3..93f73b8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -6,31 +6,27 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.rocketmq.broker.client;
 
-import org.apache.rocketmq.broker.BrokerController;
 import io.netty.channel.Channel;
-
 import java.util.List;
-
+import org.apache.rocketmq.broker.BrokerController;
 
 public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
     private final BrokerController brokerController;
 
-
     public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
         this.brokerController = brokerController;
     }
 
-
     @Override
     public void consumerIdsChanged(String group, List<Channel> channels) {
         if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {