You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vi...@apache.org on 2016/12/28 10:02:50 UTC
[43/50] [abbrv] incubator-rocketmq git commit: ROCKETMQ-18 Reformat
all codes.
ROCKETMQ-18 Reformat all codes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/388ba7a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/388ba7a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/388ba7a5
Branch: refs/heads/spec
Commit: 388ba7a58465245389a3592904b6fc7ef777dc7a
Parents: 95cfb8d
Author: yukon <yu...@apache.org>
Authored: Wed Dec 28 15:42:48 2016 +0800
Committer: yukon <yu...@apache.org>
Committed: Wed Dec 28 15:42:48 2016 +0800
----------------------------------------------------------------------
broker/pom.xml | 2 +-
.../rocketmq/broker/BrokerController.java | 155 +++--
.../rocketmq/broker/BrokerPathConfigHelper.java | 20 +-
.../apache/rocketmq/broker/BrokerStartup.java | 41 +-
.../broker/client/ClientChannelInfo.java | 32 +-
.../client/ClientHousekeepingService.java | 29 +-
.../broker/client/ConsumerGroupInfo.java | 73 +-
.../client/ConsumerIdsChangeListener.java | 14 +-
.../rocketmq/broker/client/ConsumerManager.java | 38 +-
.../DefaultConsumerIdsChangeListener.java | 18 +-
.../rocketmq/broker/client/ProducerManager.java | 54 +-
.../broker/client/net/Broker2Client.java | 96 ++-
.../client/rebalance/RebalanceLockManager.java | 107 ++-
.../broker/filtersrv/FilterServerManager.java | 36 +-
.../broker/filtersrv/FilterServerUtil.java | 13 +-
.../broker/latency/BrokerFastFailure.java | 44 +-
.../latency/BrokerFixedThreadPoolExecutor.java | 19 +-
.../broker/longpolling/ManyPullRequest.java | 18 +-
.../NotifyMessageArrivingListener.java | 15 +-
.../broker/longpolling/PullRequest.java | 23 +-
.../longpolling/PullRequestHoldService.java | 16 +-
.../broker/mqtrace/ConsumeMessageContext.java | 36 +-
.../broker/mqtrace/ConsumeMessageHook.java | 14 +-
.../broker/mqtrace/SendMessageContext.java | 43 +-
.../broker/mqtrace/SendMessageHook.java | 14 +-
.../broker/offset/ConsumerOffsetManager.java | 39 +-
.../rocketmq/broker/out/BrokerOuterAPI.java | 90 +--
.../broker/pagecache/ManyMessageTransfer.java | 18 +-
.../broker/pagecache/OneMessageTransfer.java | 18 +-
.../broker/pagecache/QueryMessageTransfer.java | 18 +-
.../plugin/AbstractPluginMessageStore.java | 28 +-
.../broker/plugin/MessageStoreFactory.java | 21 +-
.../plugin/MessageStorePluginContext.java | 17 +-
.../processor/AbstractSendMessageProcessor.java | 98 ++-
.../broker/processor/AdminBrokerProcessor.java | 237 ++++---
.../broker/processor/ClientManageProcessor.java | 61 +-
.../processor/ConsumerManageProcessor.java | 68 +-
.../processor/EndTransactionProcessor.java | 50 +-
.../processor/ForwardRequestProcessor.java | 17 +-
.../broker/processor/PullMessageProcessor.java | 100 ++-
.../broker/processor/QueryMessageProcessor.java | 61 +-
.../broker/processor/SendMessageProcessor.java | 80 +--
.../rocketmq/broker/slave/SlaveSynchronize.java | 54 +-
.../subscription/SubscriptionGroupManager.java | 29 +-
.../broker/topic/TopicConfigManager.java | 64 +-
.../broker/transaction/TransactionRecord.java | 16 +-
.../broker/transaction/TransactionStore.java | 20 +-
.../transaction/jdbc/JDBCTransactionStore.java | 34 +-
.../jdbc/JDBCTransactionStoreConfig.java | 20 +-
.../rocketmq/broker/BrokerControllerTest.java | 15 +-
.../rocketmq/broker/BrokerTestHarness.java | 22 +-
.../rocketmq/broker/api/SendMessageTest.java | 23 +-
.../offset/ConsumerOffsetManagerTest.java | 15 +-
.../broker/topic/TopicConfigManagerTest.java | 19 +-
checkstyle/checkstyle.xml | 8 +-
client/pom.xml | 2 +-
.../apache/rocketmq/client/ClientConfig.java | 35 +-
.../org/apache/rocketmq/client/MQAdmin.java | 33 +-
.../org/apache/rocketmq/client/MQHelper.java | 42 +-
.../org/apache/rocketmq/client/QueryResult.java | 22 +-
.../org/apache/rocketmq/client/Validators.java | 32 +-
.../rocketmq/client/admin/MQAdminExtInner.java | 12 +-
.../client/common/ThreadLocalIndex.java | 20 +-
.../consumer/AllocateMessageQueueStrategy.java | 27 +-
.../client/consumer/DefaultMQPullConsumer.java | 75 +--
.../client/consumer/DefaultMQPushConsumer.java | 81 +--
.../rocketmq/client/consumer/MQConsumer.java | 22 +-
.../client/consumer/MQPullConsumer.java | 52 +-
.../consumer/MQPullConsumerScheduleService.java | 36 +-
.../client/consumer/MQPushConsumer.java | 23 +-
.../client/consumer/MessageQueueListener.java | 18 +-
.../rocketmq/client/consumer/PullCallback.java | 12 +-
.../rocketmq/client/consumer/PullResult.java | 30 +-
.../rocketmq/client/consumer/PullStatus.java | 12 +-
.../client/consumer/PullTaskCallback.java | 13 +-
.../client/consumer/PullTaskContext.java | 16 +-
.../listener/ConsumeConcurrentlyContext.java | 7 -
.../listener/ConsumeConcurrentlyStatus.java | 12 +-
.../listener/ConsumeOrderlyContext.java | 19 +-
.../consumer/listener/ConsumeOrderlyStatus.java | 12 +-
.../consumer/listener/ConsumeReturnType.java | 12 +-
.../consumer/listener/MessageListener.java | 12 +-
.../listener/MessageListenerConcurrently.java | 18 +-
.../listener/MessageListenerOrderly.java | 18 +-
.../AllocateMessageQueueAveragely.java | 19 +-
.../AllocateMessageQueueAveragelyByCircle.java | 15 +-
.../rebalance/AllocateMessageQueueByConfig.java | 19 +-
.../AllocateMessageQueueByMachineRoom.java | 21 +-
.../consumer/store/LocalFileOffsetStore.java | 51 +-
.../consumer/store/OffsetSerializeWrapper.java | 20 +-
.../client/consumer/store/OffsetStore.java | 23 +-
.../client/consumer/store/ReadOffsetType.java | 12 +-
.../consumer/store/RemoteBrokerOffsetStore.java | 53 +-
.../client/exception/MQBrokerException.java | 18 +-
.../client/exception/MQClientException.java | 17 +-
.../client/hook/CheckForbiddenContext.java | 40 +-
.../client/hook/CheckForbiddenHook.java | 14 +-
.../client/hook/ConsumeMessageContext.java | 32 +-
.../client/hook/ConsumeMessageHook.java | 12 +-
.../client/hook/FilterMessageContext.java | 29 +-
.../rocketmq/client/hook/FilterMessageHook.java | 13 +-
.../client/hook/SendMessageContext.java | 35 +-
.../rocketmq/client/hook/SendMessageHook.java | 12 +-
.../client/impl/ClientRemotingProcessor.java | 56 +-
.../rocketmq/client/impl/CommunicationMode.java | 12 +-
.../rocketmq/client/impl/FindBrokerResult.java | 15 +-
.../rocketmq/client/impl/MQAdminImpl.java | 114 ++--
.../rocketmq/client/impl/MQClientAPIImpl.java | 667 ++++++++++---------
.../rocketmq/client/impl/MQClientManager.java | 26 +-
.../ConsumeMessageConcurrentlyService.java | 88 +--
.../consumer/ConsumeMessageOrderlyService.java | 135 ++--
.../impl/consumer/ConsumeMessageService.java | 31 +-
.../consumer/DefaultMQPullConsumerImpl.java | 193 +++---
.../consumer/DefaultMQPushConsumerImpl.java | 261 ++++----
.../client/impl/consumer/MQConsumerInner.java | 26 +-
.../client/impl/consumer/MessageQueueLock.java | 19 +-
.../client/impl/consumer/ProcessQueue.java | 57 +-
.../client/impl/consumer/PullAPIWrapper.java | 84 +--
.../impl/consumer/PullMessageService.java | 37 +-
.../client/impl/consumer/PullRequest.java | 23 +-
.../client/impl/consumer/PullResultExt.java | 22 +-
.../client/impl/consumer/RebalanceImpl.java | 75 +--
.../client/impl/consumer/RebalancePullImpl.java | 22 +-
.../client/impl/consumer/RebalancePushImpl.java | 32 +-
.../client/impl/consumer/RebalanceService.java | 18 +-
.../client/impl/factory/MQClientInstance.java | 234 ++++---
.../impl/producer/DefaultMQProducerImpl.java | 216 +++---
.../client/impl/producer/MQProducerInner.java | 27 +-
.../client/impl/producer/TopicPublishInfo.java | 17 +-
.../latency/LatencyFaultToleranceImpl.java | 52 +-
.../client/latency/MQFaultStrategy.java | 7 +-
.../rocketmq/client/log/ClientLogger.java | 25 +-
.../client/producer/DefaultMQProducer.java | 96 +--
.../producer/LocalTransactionExecuter.java | 13 +-
.../client/producer/LocalTransactionState.java | 12 +-
.../rocketmq/client/producer/MQProducer.java | 71 +-
.../client/producer/MessageQueueSelector.java | 16 +-
.../rocketmq/client/producer/SendCallback.java | 13 +-
.../rocketmq/client/producer/SendResult.java | 31 +-
.../rocketmq/client/producer/SendStatus.java | 12 +-
.../producer/TransactionCheckListener.java | 13 +-
.../client/producer/TransactionMQProducer.java | 27 +-
.../client/producer/TransactionSendResult.java | 15 +-
.../selector/SelectMessageQueueByHash.java | 16 +-
.../SelectMessageQueueByMachineRoom.java | 21 +-
.../selector/SelectMessageQueueByRandoom.java | 19 +-
.../client/stat/ConsumerStatsManager.java | 40 +-
.../main/resources/logback_rocketmq_client.xml | 2 +-
.../apache/rocketmq/client/ValidatorsTest.java | 13 +-
common/pom.xml | 2 +-
.../apache/rocketmq/common/BrokerConfig.java | 83 +--
.../rocketmq/common/BrokerConfigSingleton.java | 12 +-
.../apache/rocketmq/common/ConfigManager.java | 17 +-
.../apache/rocketmq/common/Configuration.java | 27 +-
.../apache/rocketmq/common/CountDownLatch2.java | 111 ++-
.../org/apache/rocketmq/common/DataVersion.java | 36 +-
.../org/apache/rocketmq/common/MQVersion.java | 2 -
.../java/org/apache/rocketmq/common/MixAll.java | 54 +-
.../java/org/apache/rocketmq/common/Pair.java | 17 +-
.../apache/rocketmq/common/ServiceState.java | 12 +-
.../apache/rocketmq/common/ServiceThread.java | 28 +-
.../org/apache/rocketmq/common/SystemClock.java | 12 +-
.../rocketmq/common/ThreadFactoryImpl.java | 15 +-
.../org/apache/rocketmq/common/TopicConfig.java | 68 +-
.../apache/rocketmq/common/TopicFilterType.java | 12 +-
.../org/apache/rocketmq/common/UtilAll.java | 114 ++--
.../rocketmq/common/admin/ConsumeStats.java | 21 +-
.../rocketmq/common/admin/OffsetWrapper.java | 18 +-
.../rocketmq/common/admin/RollbackStats.java | 24 +-
.../rocketmq/common/admin/TopicOffset.java | 18 +-
.../rocketmq/common/admin/TopicStatsTable.java | 18 +-
.../common/constant/DBMsgConstants.java | 12 +-
.../rocketmq/common/constant/LoggerName.java | 12 +-
.../rocketmq/common/constant/PermName.java | 12 +-
.../common/consumer/ConsumeFromWhere.java | 12 +-
.../rocketmq/common/filter/FilterAPI.java | 18 +-
.../rocketmq/common/filter/FilterContext.java | 14 +-
.../rocketmq/common/filter/MessageFilter.java | 1 -
.../apache/rocketmq/common/filter/impl/Op.java | 15 +-
.../rocketmq/common/filter/impl/Operand.java | 12 +-
.../rocketmq/common/filter/impl/Operator.java | 14 +-
.../rocketmq/common/filter/impl/PolishExpr.java | 41 +-
.../rocketmq/common/filter/impl/Type.java | 12 +-
.../org/apache/rocketmq/common/help/FAQUrl.java | 49 +-
.../rocketmq/common/hook/FilterCheckHook.java | 14 +-
.../apache/rocketmq/common/message/Message.java | 48 +-
.../common/message/MessageAccessor.java | 24 +-
.../common/message/MessageClientExt.java | 34 +-
.../common/message/MessageClientIDSetter.java | 16 +-
.../rocketmq/common/message/MessageConst.java | 2 -
.../rocketmq/common/message/MessageDecoder.java | 71 +-
.../rocketmq/common/message/MessageExt.java | 58 +-
.../rocketmq/common/message/MessageId.java | 18 +-
.../rocketmq/common/message/MessageQueue.java | 27 +-
.../common/message/MessageQueueForC.java | 30 +-
.../rocketmq/common/message/MessageType.java | 12 +-
.../rocketmq/common/namesrv/NamesrvConfig.java | 25 +-
.../rocketmq/common/namesrv/NamesrvUtil.java | 12 +-
.../common/namesrv/RegisterBrokerResult.java | 19 +-
.../rocketmq/common/namesrv/TopAddressing.java | 53 +-
.../common/protocol/MQProtosHelper.java | 17 +-
.../rocketmq/common/protocol/RequestCode.java | 16 +-
.../rocketmq/common/protocol/ResponseCode.java | 17 +-
.../common/protocol/body/BrokerStatsData.java | 19 +-
.../common/protocol/body/BrokerStatsItem.java | 18 +-
.../rocketmq/common/protocol/body/CMResult.java | 12 +-
.../common/protocol/body/ClusterInfo.java | 28 +-
.../common/protocol/body/Connection.java | 21 +-
.../common/protocol/body/ConsumeByWho.java | 26 +-
.../body/ConsumeMessageDirectlyResult.java | 28 +-
.../common/protocol/body/ConsumeStatsList.java | 18 +-
.../common/protocol/body/ConsumeStatus.java | 24 +-
.../protocol/body/ConsumerConnection.java | 31 +-
.../body/ConsumerOffsetSerializeWrapper.java | 20 +-
.../protocol/body/ConsumerRunningInfo.java | 134 ++--
.../protocol/body/GetConsumerStatusBody.java | 24 +-
.../common/protocol/body/GroupList.java | 18 +-
.../rocketmq/common/protocol/body/KVTable.java | 18 +-
.../protocol/body/LockBatchRequestBody.java | 24 +-
.../protocol/body/LockBatchResponseBody.java | 20 +-
.../common/protocol/body/ProcessQueueInfo.java | 54 +-
.../protocol/body/ProducerConnection.java | 18 +-
.../protocol/body/QueryConsumeTimeSpanBody.java | 18 +-
.../body/QueryCorrectionOffsetBody.java | 18 +-
.../common/protocol/body/QueueTimeSpan.java | 29 +-
.../protocol/body/RegisterBrokerBody.java | 20 +-
.../common/protocol/body/ResetOffsetBody.java | 18 +-
.../protocol/body/ResetOffsetBodyForC.java | 17 +-
.../protocol/body/SubscriptionGroupWrapper.java | 24 +-
.../body/TopicConfigSerializeWrapper.java | 22 +-
.../common/protocol/body/TopicList.java | 20 +-
.../protocol/body/UnlockBatchRequestBody.java | 24 +-
.../CheckTransactionStateRequestHeader.java | 20 +-
.../CheckTransactionStateResponseHeader.java | 24 +-
.../header/CloneGroupOffsetRequestHeader.java | 24 +-
...nsumeMessageDirectlyResultRequestHeader.java | 22 +-
.../ConsumerSendMsgBackRequestHeader.java | 31 +-
.../header/CreateTopicRequestHeader.java | 33 +-
.../DeleteSubscriptionGroupRequestHeader.java | 16 +-
.../header/DeleteTopicRequestHeader.java | 18 +-
.../header/EndTransactionRequestHeader.java | 33 +-
.../header/EndTransactionResponseHeader.java | 15 +-
.../header/GetAllTopicConfigResponseHeader.java | 15 +-
.../header/GetBrokerConfigResponseHeader.java | 18 +-
.../header/GetConsumeStatsInBrokerHeader.java | 13 +-
.../header/GetConsumeStatsRequestHeader.java | 18 +-
.../GetConsumerConnectionListRequestHeader.java | 16 +-
.../GetConsumerListByGroupRequestHeader.java | 16 +-
.../GetConsumerListByGroupResponseBody.java | 18 +-
.../GetConsumerListByGroupResponseHeader.java | 13 +-
.../GetConsumerRunningInfoRequestHeader.java | 20 +-
.../header/GetConsumerStatusRequestHeader.java | 20 +-
.../GetEarliestMsgStoretimeRequestHeader.java | 20 +-
.../GetEarliestMsgStoretimeResponseHeader.java | 18 +-
.../header/GetMaxOffsetRequestHeader.java | 20 +-
.../header/GetMaxOffsetResponseHeader.java | 18 +-
.../header/GetMinOffsetRequestHeader.java | 20 +-
.../header/GetMinOffsetResponseHeader.java | 18 +-
.../GetProducerConnectionListRequestHeader.java | 16 +-
.../header/GetTopicStatsInfoRequestHeader.java | 16 +-
.../header/GetTopicsByClusterRequestHeader.java | 16 +-
.../NotifyConsumerIdsChangedRequestHeader.java | 16 +-
.../header/PullMessageRequestHeader.java | 36 +-
.../header/PullMessageResponseHeader.java | 24 +-
.../QueryConsumeTimeSpanRequestHeader.java | 18 +-
.../QueryConsumerOffsetRequestHeader.java | 22 +-
.../QueryConsumerOffsetResponseHeader.java | 18 +-
.../header/QueryCorrectionOffsetHeader.java | 22 +-
.../header/QueryMessageRequestHeader.java | 26 +-
.../header/QueryMessageResponseHeader.java | 20 +-
.../QueryTopicConsumeByWhoRequestHeader.java | 18 +-
.../header/ResetOffsetRequestHeader.java | 22 +-
.../header/SearchOffsetRequestHeader.java | 22 +-
.../header/SearchOffsetResponseHeader.java | 18 +-
.../header/SendMessageRequestHeader.java | 40 +-
.../header/SendMessageRequestHeaderV2.java | 24 -
.../header/SendMessageResponseHeader.java | 22 +-
.../header/UnregisterClientRequestHeader.java | 20 +-
.../header/UnregisterClientResponseHeader.java | 13 +-
.../UpdateConsumerOffsetRequestHeader.java | 24 +-
.../UpdateConsumerOffsetResponseHeader.java | 15 +-
.../ViewBrokerStatsDataRequestHeader.java | 18 +-
.../header/ViewMessageRequestHeader.java | 18 +-
.../header/ViewMessageResponseHeader.java | 15 +-
.../RegisterFilterServerRequestHeader.java | 16 +-
.../RegisterFilterServerResponseHeader.java | 18 +-
...RegisterMessageFilterClassRequestHeader.java | 22 +-
.../namesrv/DeleteKVConfigRequestHeader.java | 18 +-
.../DeleteTopicInNamesrvRequestHeader.java | 16 +-
.../namesrv/GetKVConfigRequestHeader.java | 18 +-
.../namesrv/GetKVConfigResponseHeader.java | 16 +-
.../GetKVListByNamespaceRequestHeader.java | 16 +-
.../namesrv/GetRouteInfoRequestHeader.java | 18 +-
.../namesrv/GetRouteInfoResponseHeader.java | 15 +-
.../namesrv/PutKVConfigRequestHeader.java | 20 +-
.../namesrv/RegisterBrokerRequestHeader.java | 26 +-
.../namesrv/RegisterBrokerResponseHeader.java | 18 +-
.../RegisterOrderTopicRequestHeader.java | 20 +-
.../namesrv/UnRegisterBrokerRequestHeader.java | 24 +-
.../WipeWritePermOfBrokerRequestHeader.java | 16 +-
.../WipeWritePermOfBrokerResponseHeader.java | 16 +-
.../common/protocol/heartbeat/ConsumeType.java | 15 +-
.../common/protocol/heartbeat/ConsumerData.java | 35 +-
.../protocol/heartbeat/HeartbeatData.java | 27 +-
.../common/protocol/heartbeat/MessageModel.java | 15 +-
.../common/protocol/heartbeat/ProducerData.java | 17 +-
.../protocol/heartbeat/SubscriptionData.java | 39 +-
.../common/protocol/route/BrokerData.java | 20 +-
.../common/protocol/route/QueueData.java | 20 +-
.../common/protocol/route/TopicRouteData.java | 27 +-
.../common/protocol/topic/OffsetMovedEvent.java | 24 +-
.../common/queue/ConcurrentTreeMap.java | 27 +-
.../rocketmq/common/queue/RoundQueue.java | 15 +-
.../rocketmq/common/running/RunningStats.java | 12 +-
.../rocketmq/common/stats/MomentStatsItem.java | 19 +-
.../common/stats/MomentStatsItemSet.java | 11 +-
.../apache/rocketmq/common/stats/StatsItem.java | 68 +-
.../rocketmq/common/stats/StatsItemSet.java | 14 +-
.../rocketmq/common/stats/StatsSnapshot.java | 18 +-
.../subscription/SubscriptionGroupConfig.java | 35 +-
.../rocketmq/common/sysflag/MessageSysFlag.java | 3 -
.../rocketmq/common/sysflag/PullSysFlag.java | 20 +-
.../common/sysflag/SubscriptionSysFlag.java | 5 -
.../rocketmq/common/sysflag/TopicSysFlag.java | 8 -
.../rocketmq/common/utils/ChannelUtil.java | 15 +-
.../rocketmq/common/utils/HttpTinyClient.java | 36 +-
.../rocketmq/common/utils/IOTinyUtils.java | 42 +-
.../org/apache/rocketmq/common/MixAllTest.java | 18 +-
.../rocketmq/common/RemotingUtilTest.java | 13 +-
.../org/apache/rocketmq/common/UtilAllTest.java | 31 +-
.../rocketmq/common/filter/FilterAPITest.java | 17 +-
.../common/protocol/ConsumeStatusTest.java | 13 +-
conf/2m-2s-async/broker-a-s.properties | 1 -
conf/2m-2s-async/broker-a.properties | 1 -
conf/2m-2s-async/broker-b-s.properties | 1 -
conf/2m-2s-async/broker-b.properties | 1 -
conf/2m-2s-sync/broker-a-s.properties | 1 -
conf/2m-2s-sync/broker-a.properties | 1 -
conf/2m-2s-sync/broker-b-s.properties | 1 -
conf/2m-2s-sync/broker-b.properties | 1 -
conf/2m-noslave/broker-a.properties | 1 -
conf/2m-noslave/broker-b.properties | 1 -
conf/broker.conf | 14 +-
conf/logback_broker.xml | 22 +-
conf/logback_filtersrv.xml | 4 +-
conf/logback_namesrv.xml | 4 +-
conf/logback_tools.xml | 4 +-
example/pom.xml | 2 +-
.../rocketmq/example/benchmark/Consumer.java | 53 +-
.../rocketmq/example/benchmark/Producer.java | 52 +-
.../example/benchmark/TransactionProducer.java | 67 +-
.../example/broadcast/PushConsumer.java | 5 +-
.../rocketmq/example/filter/Consumer.java | 8 +-
.../rocketmq/example/filter/Producer.java | 6 +-
.../rocketmq/example/operation/Consumer.java | 21 +-
.../rocketmq/example/operation/Producer.java | 27 +-
.../rocketmq/example/ordermessage/Consumer.java | 6 +-
.../rocketmq/example/ordermessage/Producer.java | 13 +-
.../rocketmq/example/quickstart/Consumer.java | 5 +-
.../rocketmq/example/quickstart/Producer.java | 4 +-
.../rocketmq/example/simple/AsyncProducer.java | 10 +-
.../rocketmq/example/simple/CachedQueue.java | 17 +-
.../rocketmq/example/simple/Producer.java | 7 +-
.../rocketmq/example/simple/PullConsumer.java | 10 +-
.../example/simple/PullScheduleService.java | 2 -
.../rocketmq/example/simple/PushConsumer.java | 4 +-
.../example/simple/RandomAsyncCommit.java | 23 +-
.../rocketmq/example/simple/TestProducer.java | 8 +-
.../TransactionCheckListenerImpl.java | 5 +-
.../transaction/TransactionExecuterImpl.java | 16 +-
.../transaction/TransactionProducer.java | 9 +-
.../src/main/resources/MessageFilterImpl.java | 15 +-
filtersrv/pom.xml | 2 +-
.../filtersrv/FilterServerOuterAPI.java | 17 +-
.../rocketmq/filtersrv/FiltersrvConfig.java | 41 +-
.../rocketmq/filtersrv/FiltersrvController.java | 61 +-
.../rocketmq/filtersrv/FiltersrvStartup.java | 34 +-
.../rocketmq/filtersrv/filter/DynaCode.java | 177 +++--
.../filter/FilterClassFetchMethod.java | 12 +-
.../filtersrv/filter/FilterClassInfo.java | 19 +-
.../filtersrv/filter/FilterClassLoader.java | 12 +-
.../filtersrv/filter/FilterClassManager.java | 67 +-
.../filter/HttpFilterClassFetchMethod.java | 17 +-
.../processor/DefaultRequestProcessor.java | 91 ++-
.../stats/FilterServerStatsManager.java | 29 +-
namesrv/pom.xml | 2 +-
.../rocketmq/namesrv/NamesrvController.java | 45 +-
.../apache/rocketmq/namesrv/NamesrvStartup.java | 36 +-
.../namesrv/kvconfig/KVConfigManager.java | 45 +-
.../kvconfig/KVConfigSerializeWrapper.java | 18 +-
.../processor/ClusterTestRequestProcessor.java | 25 +-
.../processor/DefaultRequestProcessor.java | 154 ++---
.../routeinfo/BrokerHousekeepingService.java | 20 +-
.../namesrv/routeinfo/RouteInfoManager.java | 142 ++--
remoting/pom.xml | 2 +-
.../rocketmq/remoting/ChannelEventListener.java | 16 +-
.../rocketmq/remoting/CommandCustomHeader.java | 13 +-
.../rocketmq/remoting/InvokeCallback.java | 13 +-
.../org/apache/rocketmq/remoting/RPCHook.java | 4 +-
.../rocketmq/remoting/RemotingClient.java | 38 +-
.../rocketmq/remoting/RemotingServer.java | 26 +-
.../rocketmq/remoting/RemotingService.java | 2 -
.../remoting/annotation/CFNullable.java | 12 +-
.../apache/rocketmq/remoting/common/Pair.java | 17 +-
.../remoting/common/RemotingHelper.java | 41 +-
.../rocketmq/remoting/common/RemotingUtil.java | 29 +-
.../common/SemaphoreReleaseOnlyOnce.java | 16 +-
.../rocketmq/remoting/common/ServiceThread.java | 19 +-
.../exception/RemotingCommandException.java | 14 +-
.../exception/RemotingConnectException.java | 14 +-
.../remoting/exception/RemotingException.java | 14 +-
.../exception/RemotingSendRequestException.java | 14 +-
.../exception/RemotingTimeoutException.java | 15 +-
.../RemotingTooMuchRequestException.java | 13 +-
.../remoting/netty/NettyClientConfig.java | 31 +-
.../rocketmq/remoting/netty/NettyDecoder.java | 28 +-
.../rocketmq/remoting/netty/NettyEncoder.java | 24 +-
.../rocketmq/remoting/netty/NettyEvent.java | 18 +-
.../rocketmq/remoting/netty/NettyEventType.java | 12 +-
.../remoting/netty/NettyRemotingAbstract.java | 85 ++-
.../remoting/netty/NettyRemotingClient.java | 111 ++-
.../remoting/netty/NettyRemotingServer.java | 120 ++--
.../remoting/netty/NettyRequestProcessor.java | 18 +-
.../remoting/netty/NettyServerConfig.java | 36 +-
.../remoting/netty/NettySystemConfig.java | 26 +-
.../rocketmq/remoting/netty/RequestTask.java | 25 +-
.../rocketmq/remoting/netty/ResponseFuture.java | 45 +-
.../remoting/protocol/LanguageCode.java | 30 +-
.../remoting/protocol/RemotingCommand.java | 73 +-
.../remoting/protocol/RemotingCommandType.java | 12 +-
.../remoting/protocol/RemotingSerializable.java | 14 +-
.../protocol/RemotingSysResponseCode.java | 12 +-
.../remoting/protocol/RocketMQSerializable.java | 54 +-
.../remoting/protocol/SerializeType.java | 16 +-
.../org/apache/rocketmq/remoting/MixTest.java | 15 +-
.../apache/rocketmq/remoting/NettyRPCTest.java | 92 ++-
.../rocketmq/subclass/TestSubClassAuto.java | 13 +-
srvutil/pom.xml | 2 +-
.../org/apache/rocketmq/srvutil/ServerUtil.java | 29 +-
store/pom.xml | 2 +-
.../store/AllocateMappedFileService.java | 59 +-
.../rocketmq/store/AppendMessageCallback.java | 15 +-
.../rocketmq/store/AppendMessageResult.java | 42 +-
.../rocketmq/store/AppendMessageStatus.java | 12 +-
.../org/apache/rocketmq/store/CommitLog.java | 300 ++++-----
.../org/apache/rocketmq/store/ConsumeQueue.java | 78 +--
.../rocketmq/store/DefaultMessageFilter.java | 13 +-
.../rocketmq/store/DefaultMessageStore.java | 195 +++---
.../apache/rocketmq/store/DispatchRequest.java | 35 +-
.../apache/rocketmq/store/GetMessageResult.java | 45 +-
.../apache/rocketmq/store/GetMessageStatus.java | 12 +-
.../org/apache/rocketmq/store/MappedFile.java | 176 ++---
.../apache/rocketmq/store/MappedFileQueue.java | 82 +--
.../rocketmq/store/MessageArrivingListener.java | 12 +-
.../rocketmq/store/MessageExtBrokerInner.java | 17 +-
.../apache/rocketmq/store/MessageFilter.java | 13 +-
.../org/apache/rocketmq/store/MessageStore.java | 49 +-
.../apache/rocketmq/store/PutMessageResult.java | 21 +-
.../apache/rocketmq/store/PutMessageStatus.java | 12 +-
.../rocketmq/store/QueryMessageResult.java | 23 +-
.../rocketmq/store/ReferenceResource.java | 22 +-
.../org/apache/rocketmq/store/RunningFlags.java | 26 +-
.../store/SelectMappedBufferResult.java | 21 +-
.../apache/rocketmq/store/StoreCheckpoint.java | 40 +-
.../rocketmq/store/StoreStatsService.java | 88 +--
.../org/apache/rocketmq/store/StoreUtil.java | 16 +-
.../rocketmq/store/TransientStorePool.java | 15 +-
.../rocketmq/store/config/BrokerRole.java | 12 +-
.../rocketmq/store/config/FlushDiskType.java | 12 +-
.../store/config/MessageStoreConfig.java | 132 +---
.../store/config/StorePathConfigHelper.java | 19 +-
.../apache/rocketmq/store/ha/HAConnection.java | 63 +-
.../org/apache/rocketmq/store/ha/HAService.java | 95 +--
.../rocketmq/store/ha/WaitNotifyObject.java | 18 +-
.../apache/rocketmq/store/index/IndexFile.java | 72 +-
.../rocketmq/store/index/IndexHeader.java | 15 -
.../rocketmq/store/index/IndexService.java | 51 +-
.../rocketmq/store/index/QueryOffsetResult.java | 19 +-
.../schedule/DelayOffsetSerializeWrapper.java | 20 +-
.../store/schedule/ScheduleMessageService.java | 112 ++--
.../rocketmq/store/stats/BrokerStats.java | 29 +-
.../store/stats/BrokerStatsManager.java | 34 +-
.../org/apache/rocketmq/store/util/LibC.java | 15 +-
.../rocketmq/store/DefaultMessageStoreTest.java | 34 +-
.../rocketmq/store/MappedFileQueueTest.java | 56 +-
.../apache/rocketmq/store/MappedFileTest.java | 33 +-
.../rocketmq/store/StoreCheckpointTest.java | 20 +-
.../rocketmq/store/index/IndexFileTest.java | 25 +-
.../store/schedule/ScheduleMessageTest.java | 35 +-
store/src/test/resources/logback-test.xml | 24 +-
style/copyright/Apache.xml | 9 +-
style/copyright/profiles_settings.xml | 84 +--
style/rmq_codeStyle.xml | 204 +++---
tools/pom.xml | 2 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 141 ++--
.../tools/admin/DefaultMQAdminExtImpl.java | 212 +++---
.../apache/rocketmq/tools/admin/MQAdminExt.java | 141 ++--
.../rocketmq/tools/admin/api/MessageTrack.java | 21 +-
.../rocketmq/tools/admin/api/TrackType.java | 12 +-
.../rocketmq/tools/command/CommandUtil.java | 46 +-
.../rocketmq/tools/command/MQAdminStartup.java | 72 +-
.../rocketmq/tools/command/SubCommand.java | 18 +-
.../broker/BrokerConsumeStatsSubCommad.java | 46 +-
.../command/broker/BrokerStatusSubCommand.java | 24 +-
.../broker/CleanExpiredCQSubCommand.java | 22 +-
.../command/broker/CleanUnusedTopicCommand.java | 22 +-
.../command/broker/GetBrokerConfigCommand.java | 39 +-
.../command/broker/SendMsgStatusCommand.java | 36 +-
.../broker/UpdateBrokerConfigSubCommand.java | 29 +-
.../cluster/CLusterSendMsgRTCommand.java | 55 +-
.../command/cluster/ClusterListSubCommand.java | 89 ++-
.../ConsumerConnectionSubCommand.java | 29 +-
.../ProducerConnectionSubCommand.java | 17 +-
.../consumer/ConsumerProgressSubCommand.java | 84 +--
.../consumer/ConsumerStatusSubCommand.java | 30 +-
.../command/consumer/ConsumerSubCommand.java | 32 +-
.../DeleteSubscriptionGroupCommand.java | 35 +-
.../consumer/StartMonitoringSubCommand.java | 23 +-
.../consumer/UpdateSubGroupSubCommand.java | 29 +-
.../command/message/CheckMsgSendRTCommand.java | 34 +-
.../command/message/DecodeMessageIdCommond.java | 18 +-
.../message/PrintMessageByQueueCommand.java | 157 +++--
.../command/message/PrintMessageSubCommand.java | 76 +--
.../command/message/QueryMsgByIdSubCommand.java | 287 ++++----
.../message/QueryMsgByKeySubCommand.java | 27 +-
.../message/QueryMsgByOffsetSubCommand.java | 7 +-
.../message/QueryMsgByUniqueKeySubCommand.java | 177 +++--
.../rocketmq/tools/command/message/Store.java | 56 +-
.../command/namesrv/DeleteKvConfigCommand.java | 22 +-
.../namesrv/GetNamesrvConfigCommand.java | 25 +-
.../command/namesrv/UpdateKvConfigCommand.java | 22 +-
.../namesrv/UpdateNamesrvConfigCommand.java | 27 +-
.../namesrv/WipeWritePermSubCommand.java | 34 +-
.../command/offset/CloneGroupOffsetCommand.java | 23 +-
.../offset/GetConsumerStatusCommand.java | 29 +-
.../offset/ResetOffsetByTimeCommand.java | 32 +-
.../offset/ResetOffsetByTimeOldCommand.java | 80 ++-
.../tools/command/stats/StatsAllSubCommand.java | 176 +++--
.../command/topic/AllocateMQSubCommand.java | 20 +-
.../command/topic/DeleteTopicSubCommand.java | 70 +-
.../tools/command/topic/RebalanceResult.java | 15 +-
.../command/topic/TopicClusterSubCommand.java | 12 +-
.../command/topic/TopicListSubCommand.java | 34 +-
.../command/topic/TopicRouteSubCommand.java | 10 +-
.../command/topic/TopicStatusSubCommand.java | 49 +-
.../command/topic/UpdateOrderConfCommand.java | 26 +-
.../topic/UpdateTopicPermSubCommand.java | 17 +-
.../command/topic/UpdateTopicSubCommand.java | 25 +-
.../tools/monitor/DefaultMonitorListener.java | 40 +-
.../rocketmq/tools/monitor/DeleteMsgsEvent.java | 20 +-
.../rocketmq/tools/monitor/FailedMsgs.java | 21 +-
.../rocketmq/tools/monitor/MonitorConfig.java | 19 +-
.../rocketmq/tools/monitor/MonitorListener.java | 3 +-
.../rocketmq/tools/monitor/MonitorService.java | 65 +-
.../rocketmq/tools/monitor/UndoneMsgs.java | 27 +-
555 files changed, 8226 insertions(+), 11139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/pom.xml
----------------------------------------------------------------------
diff --git a/broker/pom.xml b/broker/pom.xml
index 0917503..30525e4 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -15,7 +15,7 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.rocketmq</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 8e973ac..501c1c5 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -16,7 +16,25 @@
*/
package org.apache.rocketmq.broker;
-import org.apache.rocketmq.broker.client.*;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.client.ClientHousekeepingService;
+import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ConsumerManager;
+import org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener;
+import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
@@ -30,11 +48,21 @@ import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.broker.processor.*;
+import org.apache.rocketmq.broker.processor.AdminBrokerProcessor;
+import org.apache.rocketmq.broker.processor.ClientManageProcessor;
+import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
+import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
+import org.apache.rocketmq.broker.processor.PullMessageProcessor;
+import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
+import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
-import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.Configuration;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -43,7 +71,11 @@ import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.stats.MomentStatsItem;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingServer;
-import org.apache.rocketmq.remoting.netty.*;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageStore;
@@ -54,15 +86,6 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-
public class BrokerController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
@@ -84,7 +107,7 @@ public class BrokerController {
private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
private final BrokerOuterAPI brokerOuterAPI;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerControllerScheduledThread"));
+ "BrokerControllerScheduledThread"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
@@ -110,10 +133,10 @@ public class BrokerController {
private Configuration configuration;
public BrokerController(//
- final BrokerConfig brokerConfig, //
- final NettyServerConfig nettyServerConfig, //
- final NettyClientConfig nettyClientConfig, //
- final MessageStoreConfig messageStoreConfig //
+ final BrokerConfig brokerConfig, //
+ final NettyServerConfig nettyServerConfig, //
+ final NettyClientConfig nettyClientConfig, //
+ final MessageStoreConfig messageStoreConfig //
) {
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
@@ -151,9 +174,9 @@ public class BrokerController {
this.brokerFastFailure = new BrokerFastFailure(this);
this.configuration = new Configuration(
- log,
- BrokerPathConfigHelper.getBrokerConfigPath(),
- this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+ log,
+ BrokerPathConfigHelper.getBrokerConfigPath(),
+ this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
}
@@ -180,9 +203,9 @@ public class BrokerController {
if (result) {
try {
this.messageStore =
- new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
- this.brokerConfig);
- this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
+ new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+ this.brokerConfig);
+ this.brokerStats = new BrokerStats((DefaultMessageStore)this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
@@ -196,44 +219,43 @@ public class BrokerController {
if (result) {
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
- NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
+ NettyServerConfig fastConfig = (NettyServerConfig)this.nettyServerConfig.clone();
fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getSendMessageThreadPoolNums(),
- this.brokerConfig.getSendMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.sendThreadPoolQueue,
- new ThreadFactoryImpl("SendMessageThread_"));
+ this.brokerConfig.getSendMessageThreadPoolNums(),
+ this.brokerConfig.getSendMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.sendThreadPoolQueue,
+ new ThreadFactoryImpl("SendMessageThread_"));
this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getPullMessageThreadPoolNums(),
- this.brokerConfig.getPullMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.pullThreadPoolQueue,
- new ThreadFactoryImpl("PullMessageThread_"));
+ this.brokerConfig.getPullMessageThreadPoolNums(),
+ this.brokerConfig.getPullMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.pullThreadPoolQueue,
+ new ThreadFactoryImpl("PullMessageThread_"));
this.adminBrokerExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
- "AdminBrokerThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+ "AdminBrokerThread_"));
this.clientManageExecutor = new ThreadPoolExecutor(
- this.brokerConfig.getClientManageThreadPoolNums(),
- this.brokerConfig.getClientManageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.clientManagerThreadPoolQueue,
- new ThreadFactoryImpl("ClientManageThread_"));
+ this.brokerConfig.getClientManageThreadPoolNums(),
+ this.brokerConfig.getClientManageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.clientManagerThreadPoolQueue,
+ new ThreadFactoryImpl("ClientManageThread_"));
this.consumerManageExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
- "ConsumerManageThread_"));
+ Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+ "ConsumerManageThread_"));
this.registerProcessor();
-
// TODO remove in future
final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
@@ -259,7 +281,6 @@ public class BrokerController {
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
-
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
@@ -399,7 +420,6 @@ public class BrokerController {
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
-
/**
* EndTransactionProcessor
*/
@@ -446,7 +466,8 @@ public class BrokerController {
slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp();
}
- if (slowTimeMills < 0) slowTimeMills = 0;
+ if (slowTimeMills < 0)
+ slowTimeMills = 0;
return slowTimeMills;
}
@@ -577,10 +598,10 @@ public class BrokerController {
private void unregisterBrokerAll() {
this.brokerOuterAPI.unregisterBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId());
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId());
}
public String getBrokerAddr() {
@@ -643,27 +664,27 @@ public class BrokerController {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
+ || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
TopicConfig tmp =
- new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
- this.brokerConfig.getBrokerPermission());
+ new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
+ this.brokerConfig.getBrokerPermission());
topicConfigTable.put(topicConfig.getTopicName(), tmp);
}
topicConfigWrapper.setTopicConfigTable(topicConfigTable);
}
RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills());
+ this.brokerConfig.getBrokerClusterName(),
+ this.getBrokerAddr(),
+ this.brokerConfig.getBrokerName(),
+ this.brokerConfig.getBrokerId(),
+ this.getHAServerAddr(),
+ topicConfigWrapper,
+ this.filterServerManager.buildNewFilterServerList(),
+ oneway,
+ this.brokerConfig.getRegisterBrokerTimeoutMills());
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index dbcd304..7a46df3 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -6,45 +6,39 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker;
import java.io.File;
-
public class BrokerPathConfigHelper {
private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store"
- + File.separator + "config" + File.separator + "broker.properties";
-
+ + File.separator + "config" + File.separator + "broker.properties";
public static String getBrokerConfigPath() {
return brokerConfigPath;
}
-
public static void setBrokerConfigPath(String path) {
brokerConfigPath = path;
}
-
public static String getTopicConfigPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "topics.json";
}
-
public static String getConsumerOffsetPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}
-
public static String getSubscriptionGroupPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 86091c4..dfa97c1 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -18,6 +18,15 @@ package org.apache.rocketmq.broker;
import ch.qos.logback.classic.LoggerContext;
import ch.qos.logback.classic.joran.JoranConfigurator;
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
@@ -30,20 +39,9 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
public class BrokerStartup {
public static Properties properties = null;
public static CommandLine commandLine = null;
@@ -58,7 +56,7 @@ public class BrokerStartup {
try {
controller.start();
String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
- + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+ + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
if (null != controller.getBrokerConfig().getNamesrvAddr()) {
tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
@@ -89,7 +87,7 @@ public class BrokerStartup {
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
- new PosixParser());
+ new PosixParser());
if (null == commandLine) {
System.exit(-1);
}
@@ -142,7 +140,7 @@ public class BrokerStartup {
if (null == brokerConfig.getRocketmqHome()) {
System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
- + " variable in your environment to match the location of the RocketMQ installation");
+ + " variable in your environment to match the location of the RocketMQ installation");
System.exit(-2);
}
@@ -157,13 +155,12 @@ public class BrokerStartup {
}
} catch (Exception e) {
System.out.printf(
- "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
- namesrvAddr);
+ "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+ namesrvAddr);
System.exit(-3);
}
}
-
switch (messageStoreConfig.getBrokerRole()) {
case ASYNC_MASTER:
case SYNC_MASTER:
@@ -181,7 +178,7 @@ public class BrokerStartup {
}
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
- LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
@@ -194,10 +191,10 @@ public class BrokerStartup {
MixAll.printObjectProperties(log, messageStoreConfig);
final BrokerController controller = new BrokerController(//
- brokerConfig, //
- nettyServerConfig, //
- nettyClientConfig, //
- messageStoreConfig);
+ brokerConfig, //
+ nettyServerConfig, //
+ nettyClientConfig, //
+ messageStoreConfig);
// remember all configs to prevent discard
controller.getConfiguration().registerConfig(properties);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
index a994503..24cddb9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientChannelInfo.java
@@ -6,19 +6,18 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.client;
-import org.apache.rocketmq.remoting.protocol.LanguageCode;
import io.netty.channel.Channel;
-
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class ClientChannelInfo {
private final Channel channel;
@@ -27,12 +26,10 @@ public class ClientChannelInfo {
private final int version;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
-
public ClientChannelInfo(Channel channel) {
this(channel, null, null, 0);
}
-
public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
this.channel = channel;
this.clientId = clientId;
@@ -40,37 +37,30 @@ public class ClientChannelInfo {
this.version = version;
}
-
public Channel getChannel() {
return channel;
}
-
public String getClientId() {
return clientId;
}
-
public LanguageCode getLanguage() {
return language;
}
-
public int getVersion() {
return version;
}
-
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
-
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
-
@Override
public int hashCode() {
final int prime = 31;
@@ -78,12 +68,11 @@ public class ClientChannelInfo {
result = prime * result + ((channel == null) ? 0 : channel.hashCode());
result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
result = prime * result + ((language == null) ? 0 : language.hashCode());
- result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
+ result = prime * result + (int)(lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
result = prime * result + version;
return result;
}
-
@Override
public boolean equals(Object obj) {
if (this == obj)
@@ -92,7 +81,7 @@ public class ClientChannelInfo {
return false;
if (getClass() != obj.getClass())
return false;
- ClientChannelInfo other = (ClientChannelInfo) obj;
+ ClientChannelInfo other = (ClientChannelInfo)obj;
if (channel == null) {
if (other.channel != null)
return false;
@@ -103,10 +92,9 @@ public class ClientChannelInfo {
return true;
}
-
@Override
public String toString() {
return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
- + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
+ + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
index 856ce72..5d7c0ea 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ClientHousekeepingService.java
@@ -6,42 +6,38 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.client;
+import io.netty.channel.Channel;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.ChannelEventListener;
-import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
public class ClientHousekeepingService implements ChannelEventListener {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
private ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
-
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
public ClientHousekeepingService(final BrokerController brokerController) {
this.brokerController = brokerController;
}
-
public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@@ -71,7 +67,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
}
-
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
@@ -79,7 +74,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
-
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
@@ -87,7 +81,6 @@ public class ClientHousekeepingService implements ChannelEventListener {
this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
}
-
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index d5b056e..2656467 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -16,45 +16,41 @@
*/
package org.apache.rocketmq.broker.client;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
-import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ConsumerGroupInfo {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final String groupName;
private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
- new ConcurrentHashMap<String, SubscriptionData>();
+ new ConcurrentHashMap<String, SubscriptionData>();
private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+ new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
private volatile ConsumeType consumeType;
private volatile MessageModel messageModel;
private volatile ConsumeFromWhere consumeFromWhere;
private volatile long lastUpdateTimestamp = System.currentTimeMillis();
-
public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
- ConsumeFromWhere consumeFromWhere) {
+ ConsumeFromWhere consumeFromWhere) {
this.groupName = groupName;
this.consumeType = consumeType;
this.messageModel = messageModel;
this.consumeFromWhere = consumeFromWhere;
}
-
public ClientChannelInfo findChannel(final String clientId) {
Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
@@ -67,17 +63,14 @@ public class ConsumerGroupInfo {
return null;
}
-
public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
return subscriptionTable;
}
-
public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() {
return channelInfoTable;
}
-
public List<Channel> getAllChannel() {
List<Channel> result = new ArrayList<Channel>();
@@ -86,7 +79,6 @@ public class ConsumerGroupInfo {
return result;
}
-
public List<String> getAllClientId() {
List<String> result = new ArrayList<String>();
@@ -101,7 +93,6 @@ public class ConsumerGroupInfo {
return result;
}
-
public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
if (old != null) {
@@ -109,13 +100,12 @@ public class ConsumerGroupInfo {
}
}
-
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
final ClientChannelInfo info = this.channelInfoTable.remove(channel);
if (info != null) {
log.warn(
- "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
- info.toString(), groupName);
+ "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
+ info.toString(), groupName);
return true;
}
@@ -123,7 +113,7 @@ public class ConsumerGroupInfo {
}
public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
- MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
+ MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
boolean updated = false;
this.consumeType = consumeType;
this.messageModel = messageModel;
@@ -134,7 +124,7 @@ public class ConsumerGroupInfo {
ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
if (null == prev) {
log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
- messageModel, infoNew.toString());
+ messageModel, infoNew.toString());
updated = true;
}
@@ -142,9 +132,9 @@ public class ConsumerGroupInfo {
} else {
if (!infoOld.getClientId().equals(infoNew.getClientId())) {
log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
- this.groupName,
- infoOld.toString(),
- infoNew.toString());
+ this.groupName,
+ infoOld.toString(),
+ infoNew.toString());
this.channelInfoTable.put(infoNew.getChannel(), infoNew);
}
}
@@ -155,7 +145,6 @@ public class ConsumerGroupInfo {
return updated;
}
-
public boolean updateSubscription(final Set<SubscriptionData> subList) {
boolean updated = false;
@@ -166,15 +155,15 @@ public class ConsumerGroupInfo {
if (null == prev) {
updated = true;
log.info("subscription changed, add new topic, group: {} {}",
- this.groupName,
- sub.toString());
+ this.groupName,
+ sub.toString());
}
} else if (sub.getSubVersion() > old.getSubVersion()) {
if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
log.info("subscription changed, group: {} OLD: {} NEW: {}",
- this.groupName,
- old.toString(),
- sub.toString()
+ this.groupName,
+ old.toString(),
+ sub.toString()
);
}
@@ -182,7 +171,6 @@ public class ConsumerGroupInfo {
}
}
-
Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, SubscriptionData> next = it.next();
@@ -198,9 +186,9 @@ public class ConsumerGroupInfo {
if (!exist) {
log.warn("subscription changed, group: {} remove topic {} {}",
- this.groupName,
- oldTopic,
- next.getValue().toString()
+ this.groupName,
+ oldTopic,
+ next.getValue().toString()
);
it.remove();
@@ -213,57 +201,46 @@ public class ConsumerGroupInfo {
return updated;
}
-
public Set<String> getSubscribeTopics() {
return subscriptionTable.keySet();
}
-
public SubscriptionData findSubscriptionData(final String topic) {
return this.subscriptionTable.get(topic);
}
-
public ConsumeType getConsumeType() {
return consumeType;
}
-
public void setConsumeType(ConsumeType consumeType) {
this.consumeType = consumeType;
}
-
public MessageModel getMessageModel() {
return messageModel;
}
-
public void setMessageModel(MessageModel messageModel) {
this.messageModel = messageModel;
}
-
public String getGroupName() {
return groupName;
}
-
public long getLastUpdateTimestamp() {
return lastUpdateTimestamp;
}
-
public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
this.lastUpdateTimestamp = lastUpdateTimestamp;
}
-
public ConsumeFromWhere getConsumeFromWhere() {
return consumeFromWhere;
}
-
public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
this.consumeFromWhere = consumeFromWhere;
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
index 368582a..fbec010 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerIdsChangeListener.java
@@ -6,21 +6,19 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
-
import java.util.List;
-
public interface ConsumerIdsChangeListener {
void consumerIdsChanged(final String group, final List<Channel> channels);
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 95ed478..fd4fb88 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -16,6 +16,12 @@
*/
package org.apache.rocketmq.broker.client;
+import io.netty.channel.Channel;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@ -23,22 +29,14 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-
public class ConsumerManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable =
- new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+ new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
private final ConsumerIdsChangeListener consumerIdsChangeListener;
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
@@ -86,7 +84,7 @@ public class ConsumerManager {
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
- next.getKey());
+ next.getKey());
}
}
@@ -96,8 +94,8 @@ public class ConsumerManager {
}
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
- ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
- final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+ ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+ final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
@@ -107,8 +105,8 @@ public class ConsumerManager {
}
boolean r1 =
- consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
- consumeFromWhere);
+ consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
+ consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
@@ -143,7 +141,7 @@ public class ConsumerManager {
String group = next.getKey();
ConsumerGroupInfo consumerGroupInfo = next.getValue();
ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- consumerGroupInfo.getChannelInfoTable();
+ consumerGroupInfo.getChannelInfoTable();
Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
while (itChannel.hasNext()) {
@@ -152,8 +150,8 @@ public class ConsumerManager {
long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
log.warn(
- "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
- RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
+ "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+ RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
RemotingUtil.closeChannel(clientChannelInfo.getChannel());
itChannel.remove();
}
@@ -161,8 +159,8 @@ public class ConsumerManager {
if (channelInfoTable.isEmpty()) {
log.warn(
- "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
- group);
+ "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+ group);
it.remove();
}
}
@@ -174,7 +172,7 @@ public class ConsumerManager {
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> entry = it.next();
ConcurrentHashMap<String, SubscriptionData> subscriptionTable =
- entry.getValue().getSubscriptionTable();
+ entry.getValue().getSubscriptionTable();
if (subscriptionTable.containsKey(topic)) {
groups.add(entry.getKey());
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index b60fcb3..93f73b8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -6,31 +6,27 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.rocketmq.broker.client;
-import org.apache.rocketmq.broker.BrokerController;
import io.netty.channel.Channel;
-
import java.util.List;
-
+import org.apache.rocketmq.broker.BrokerController;
public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
private final BrokerController brokerController;
-
public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
this.brokerController = brokerController;
}
-
@Override
public void consumerIdsChanged(String group, List<Channel> channels) {
if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {