You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:41 UTC
[50/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java b/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java
deleted file mode 100644
index b2b6aed..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java
+++ /dev/null
@@ -1,773 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker;
-
-import com.alibaba.rocketmq.broker.client.*;
-import com.alibaba.rocketmq.broker.client.net.Broker2Client;
-import com.alibaba.rocketmq.broker.client.rebalance.RebalanceLockManager;
-import com.alibaba.rocketmq.broker.filtersrv.FilterServerManager;
-import com.alibaba.rocketmq.broker.latency.BrokerFastFailure;
-import com.alibaba.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
-import com.alibaba.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
-import com.alibaba.rocketmq.broker.longpolling.PullRequestHoldService;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
-import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
-import com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager;
-import com.alibaba.rocketmq.broker.out.BrokerOuterAPI;
-import com.alibaba.rocketmq.broker.plugin.MessageStoreFactory;
-import com.alibaba.rocketmq.broker.plugin.MessageStorePluginContext;
-import com.alibaba.rocketmq.broker.processor.*;
-import com.alibaba.rocketmq.broker.slave.SlaveSynchronize;
-import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
-import com.alibaba.rocketmq.broker.topic.TopicConfigManager;
-import com.alibaba.rocketmq.common.*;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import com.alibaba.rocketmq.common.stats.MomentStatsItem;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.RemotingServer;
-import com.alibaba.rocketmq.remoting.netty.*;
-import com.alibaba.rocketmq.store.DefaultMessageStore;
-import com.alibaba.rocketmq.store.MessageArrivingListener;
-import com.alibaba.rocketmq.store.MessageStore;
-import com.alibaba.rocketmq.store.config.BrokerRole;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import com.alibaba.rocketmq.store.stats.BrokerStats;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.*;
-
-
-/**
- * @author shijia.wxr
- */
-public class BrokerController {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
- private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
- private final BrokerConfig brokerConfig;
- private final NettyServerConfig nettyServerConfig;
- private final NettyClientConfig nettyClientConfig;
- private final MessageStoreConfig messageStoreConfig;
- private final ConsumerOffsetManager consumerOffsetManager;
- private final ConsumerManager consumerManager;
- private final ProducerManager producerManager;
- private final ClientHousekeepingService clientHousekeepingService;
- private final PullMessageProcessor pullMessageProcessor;
- private final PullRequestHoldService pullRequestHoldService;
- private final MessageArrivingListener messageArrivingListener;
- private final Broker2Client broker2Client;
- private final SubscriptionGroupManager subscriptionGroupManager;
- private final ConsumerIdsChangeListener consumerIdsChangeListener;
- private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
- private final BrokerOuterAPI brokerOuterAPI;
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerControllerScheduledThread"));
- private final SlaveSynchronize slaveSynchronize;
- private final BlockingQueue<Runnable> sendThreadPoolQueue;
- private final BlockingQueue<Runnable> pullThreadPoolQueue;
- private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
- private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
- private final FilterServerManager filterServerManager;
- private final BrokerStatsManager brokerStatsManager;
- private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
- private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
- private MessageStore messageStore;
- private RemotingServer remotingServer;
- private RemotingServer fastRemotingServer;
- private TopicConfigManager topicConfigManager;
- private ExecutorService sendMessageExecutor;
- private ExecutorService pullMessageExecutor;
- private ExecutorService adminBrokerExecutor;
- private ExecutorService clientManageExecutor;
- private ExecutorService consumerManageExecutor;
- private boolean updateMasterHAServerAddrPeriodically = false;
- private BrokerStats brokerStats;
- private InetSocketAddress storeHost;
- private BrokerFastFailure brokerFastFailure;
- private Configuration configuration;
-
- public BrokerController(//
- final BrokerConfig brokerConfig, //
- final NettyServerConfig nettyServerConfig, //
- final NettyClientConfig nettyClientConfig, //
- final MessageStoreConfig messageStoreConfig //
- ) {
- this.brokerConfig = brokerConfig;
- this.nettyServerConfig = nettyServerConfig;
- this.nettyClientConfig = nettyClientConfig;
- this.messageStoreConfig = messageStoreConfig;
- this.consumerOffsetManager = new ConsumerOffsetManager(this);
- this.topicConfigManager = new TopicConfigManager(this);
- this.pullMessageProcessor = new PullMessageProcessor(this);
- this.pullRequestHoldService = new PullRequestHoldService(this);
- this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
- this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
- this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
- this.producerManager = new ProducerManager();
- this.clientHousekeepingService = new ClientHousekeepingService(this);
- this.broker2Client = new Broker2Client(this);
- this.subscriptionGroupManager = new SubscriptionGroupManager(this);
- this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
- this.filterServerManager = new FilterServerManager(this);
-
- if (this.brokerConfig.getNamesrvAddr() != null) {
- this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
- log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
- }
-
- this.slaveSynchronize = new SlaveSynchronize(this);
-
- this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
-
- this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
- this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
- this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
-
- this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
- this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
-
- this.brokerFastFailure = new BrokerFastFailure(this);
- this.configuration = new Configuration(
- log,
- BrokerPathConfigHelper.getBrokerConfigPath(),
- this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
- );
- }
-
- public BrokerConfig getBrokerConfig() {
- return brokerConfig;
- }
-
- public NettyServerConfig getNettyServerConfig() {
- return nettyServerConfig;
- }
-
- public BlockingQueue<Runnable> getPullThreadPoolQueue() {
- return pullThreadPoolQueue;
- }
-
- public boolean initialize() throws CloneNotSupportedException {
- boolean result = true;
-
- result = result && this.topicConfigManager.load();
-
- result = result && this.consumerOffsetManager.load();
- result = result && this.subscriptionGroupManager.load();
-
- if (result) {
- try {
- this.messageStore =
- new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
- this.brokerConfig);
- this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
- //load plugin
- MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
- this.messageStore = MessageStoreFactory.build(context, this.messageStore);
- } catch (IOException e) {
- result = false;
- e.printStackTrace();
- }
- }
-
- result = result && this.messageStore.load();
-
- if (result) {
- this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
- NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
- fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
- this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
- this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getSendMessageThreadPoolNums(),
- this.brokerConfig.getSendMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.sendThreadPoolQueue,
- new ThreadFactoryImpl("SendMessageThread_"));
-
- this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
- this.brokerConfig.getPullMessageThreadPoolNums(),
- this.brokerConfig.getPullMessageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.pullThreadPoolQueue,
- new ThreadFactoryImpl("PullMessageThread_"));
-
- this.adminBrokerExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
- "AdminBrokerThread_"));
-
- this.clientManageExecutor = new ThreadPoolExecutor(
- this.brokerConfig.getClientManageThreadPoolNums(),
- this.brokerConfig.getClientManageThreadPoolNums(),
- 1000 * 60,
- TimeUnit.MILLISECONDS,
- this.clientManagerThreadPoolQueue,
- new ThreadFactoryImpl("ClientManageThread_"));
-
- this.consumerManageExecutor =
- Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
- "ConsumerManageThread_"));
-
- this.registerProcessor();
-
-
- // TODO remove in future
- final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
- final long period = 1000 * 60 * 60 * 24;
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.getBrokerStats().record();
- } catch (Throwable e) {
- log.error("schedule record error.", e);
- }
- }
- }, initialDelay, period, TimeUnit.MILLISECONDS);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.consumerOffsetManager.persist();
- } catch (Throwable e) {
- log.error("schedule persist consumerOffset error.", e);
- }
- }
- }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
-
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.protectBroker();
- } catch (Exception e) {
- log.error("protectBroker error.", e);
- }
- }
- }, 3, 3, TimeUnit.MINUTES);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- BrokerController.this.printWaterMark();
- } catch (Exception e) {
- log.error("printWaterMark error.", e);
- }
- }
- }, 10, 1, TimeUnit.SECONDS);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
- } catch (Throwable e) {
- log.error("schedule dispatchBehindBytes error.", e);
- }
- }
- }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
-
- if (this.brokerConfig.getNamesrvAddr() != null) {
- this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
- } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
- } catch (Throwable e) {
- log.error("ScheduledTask fetchNameServerAddr exception", e);
- }
- }
- }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
- }
-
- if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
- if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
- this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
- this.updateMasterHAServerAddrPeriodically = false;
- } else {
- this.updateMasterHAServerAddrPeriodically = true;
- }
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- BrokerController.this.slaveSynchronize.syncAll();
- } catch (Throwable e) {
- log.error("ScheduledTask syncAll slave exception", e);
- }
- }
- }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
- } else {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- BrokerController.this.printMasterAndSlaveDiff();
- } catch (Throwable e) {
- log.error("schedule printMasterAndSlaveDiff error.", e);
- }
- }
- }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
- }
- }
-
- return result;
- }
-
- public void registerProcessor() {
- /**
- * SendMessageProcessor
- */
- SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
- sendProcessor.registerSendMessageHook(sendMessageHookList);
- sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
-
- this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
- /**
- * PullMessageProcessor
- */
- this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
- this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
-
- /**
- * QueryMessageProcessor
- */
- NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
- this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
-
- this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
-
- /**
- * ClientManageProcessor
- */
- ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
- this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
-
- this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
-
- /**
- * ConsumerManageProcessor
- */
- ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
- this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
- this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
- this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
-
- this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
-
-
- /**
- * EndTransactionProcessor
- */
- this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
- this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
-
- /**
- * Default
- */
- AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
- this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
- this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
- }
-
- public BrokerStats getBrokerStats() {
- return brokerStats;
- }
-
- public void setBrokerStats(BrokerStats brokerStats) {
- this.brokerStats = brokerStats;
- }
-
- public void protectBroker() {
- if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
- final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
- while (it.hasNext()) {
- final Map.Entry<String, MomentStatsItem> next = it.next();
- final long fallBehindBytes = next.getValue().getValue().get();
- if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
- final String[] split = next.getValue().getStatsKey().split("@");
- final String group = split[2];
- LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);
- this.subscriptionGroupManager.disableConsume(group);
- }
- }
- }
- }
-
- public long headSlowTimeMills(BlockingQueue<Runnable> q) {
- long slowTimeMills = 0;
- final Runnable peek = q.peek();
- if (peek != null) {
- RequestTask rt = BrokerFastFailure.castRunnable(peek);
- slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp();
- }
-
- if (slowTimeMills < 0) slowTimeMills = 0;
-
- return slowTimeMills;
- }
-
- public long headSlowTimeMills4SendThreadPoolQueue() {
- return this.headSlowTimeMills(this.sendThreadPoolQueue);
- }
-
- public long headSlowTimeMills4PullThreadPoolQueue() {
- return this.headSlowTimeMills(this.pullThreadPoolQueue);
- }
-
- public void printWaterMark() {
- LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
- LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
- }
-
- public MessageStore getMessageStore() {
- return messageStore;
- }
-
- public void setMessageStore(MessageStore messageStore) {
- this.messageStore = messageStore;
- }
-
- private void printMasterAndSlaveDiff() {
- long diff = this.messageStore.slaveFallBehindMuch();
-
- // XXX: warn and notify me
- log.info("slave fall behind master, how much, {} bytes", diff);
- }
-
- public Broker2Client getBroker2Client() {
- return broker2Client;
- }
-
- public ConsumerManager getConsumerManager() {
- return consumerManager;
- }
-
- public ConsumerOffsetManager getConsumerOffsetManager() {
- return consumerOffsetManager;
- }
-
- public MessageStoreConfig getMessageStoreConfig() {
- return messageStoreConfig;
- }
-
- public ProducerManager getProducerManager() {
- return producerManager;
- }
-
- public void setFastRemotingServer(RemotingServer fastRemotingServer) {
- this.fastRemotingServer = fastRemotingServer;
- }
-
- public PullMessageProcessor getPullMessageProcessor() {
- return pullMessageProcessor;
- }
-
- public PullRequestHoldService getPullRequestHoldService() {
- return pullRequestHoldService;
- }
-
- public SubscriptionGroupManager getSubscriptionGroupManager() {
- return subscriptionGroupManager;
- }
-
- public void shutdown() {
- if (this.brokerStatsManager != null) {
- this.brokerStatsManager.shutdown();
- }
-
- if (this.clientHousekeepingService != null) {
- this.clientHousekeepingService.shutdown();
- }
-
- if (this.pullRequestHoldService != null) {
- this.pullRequestHoldService.shutdown();
- }
-
- if (this.remotingServer != null) {
- this.remotingServer.shutdown();
- }
-
- if (this.fastRemotingServer != null) {
- this.fastRemotingServer.shutdown();
- }
-
- if (this.messageStore != null) {
- this.messageStore.shutdown();
- }
-
- this.scheduledExecutorService.shutdown();
- try {
- this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- }
-
- this.unregisterBrokerAll();
-
- if (this.sendMessageExecutor != null) {
- this.sendMessageExecutor.shutdown();
- }
-
- if (this.pullMessageExecutor != null) {
- this.pullMessageExecutor.shutdown();
- }
-
- if (this.adminBrokerExecutor != null) {
- this.adminBrokerExecutor.shutdown();
- }
-
- if (this.brokerOuterAPI != null) {
- this.brokerOuterAPI.shutdown();
- }
-
- this.consumerOffsetManager.persist();
-
- if (this.filterServerManager != null) {
- this.filterServerManager.shutdown();
- }
-
- if (this.brokerFastFailure != null) {
- this.brokerFastFailure.shutdown();
- }
- }
-
- private void unregisterBrokerAll() {
- this.brokerOuterAPI.unregisterBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId());
- }
-
- public String getBrokerAddr() {
- return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
- }
-
- public void start() throws Exception {
- if (this.messageStore != null) {
- this.messageStore.start();
- }
-
- if (this.remotingServer != null) {
- this.remotingServer.start();
- }
-
- if (this.fastRemotingServer != null) {
- this.fastRemotingServer.start();
- }
-
- if (this.brokerOuterAPI != null) {
- this.brokerOuterAPI.start();
- }
-
- if (this.pullRequestHoldService != null) {
- this.pullRequestHoldService.start();
- }
-
- if (this.clientHousekeepingService != null) {
- this.clientHousekeepingService.start();
- }
-
- if (this.filterServerManager != null) {
- this.filterServerManager.start();
- }
-
- this.registerBrokerAll(true, false);
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- BrokerController.this.registerBrokerAll(true, false);
- } catch (Throwable e) {
- log.error("registerBrokerAll Exception", e);
- }
- }
- }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
-
- if (this.brokerStatsManager != null) {
- this.brokerStatsManager.start();
- }
-
- if (this.brokerFastFailure != null) {
- this.brokerFastFailure.start();
- }
- }
-
- public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
- TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
-
- if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
- || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
- ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
- for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
- TopicConfig tmp =
- new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
- this.brokerConfig.getBrokerPermission());
- topicConfigTable.put(topicConfig.getTopicName(), tmp);
- }
- topicConfigWrapper.setTopicConfigTable(topicConfigTable);
- }
-
- RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
- this.brokerConfig.getBrokerClusterName(),
- this.getBrokerAddr(),
- this.brokerConfig.getBrokerName(),
- this.brokerConfig.getBrokerId(),
- this.getHAServerAddr(),
- topicConfigWrapper,
- this.filterServerManager.buildNewFilterServerList(),
- oneway,
- this.brokerConfig.getRegisterBrokerTimeoutMills());
-
- if (registerBrokerResult != null) {
- if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
- this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
- }
-
- this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
-
- if (checkOrderConfig) {
- this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
- }
- }
- }
-
- public TopicConfigManager getTopicConfigManager() {
- return topicConfigManager;
- }
-
- public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
- this.topicConfigManager = topicConfigManager;
- }
-
- public String getHAServerAddr() {
- return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
- }
-
- public RebalanceLockManager getRebalanceLockManager() {
- return rebalanceLockManager;
- }
-
- public SlaveSynchronize getSlaveSynchronize() {
- return slaveSynchronize;
- }
-
- public ExecutorService getPullMessageExecutor() {
- return pullMessageExecutor;
- }
-
- public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
- this.pullMessageExecutor = pullMessageExecutor;
- }
-
- public BlockingQueue<Runnable> getSendThreadPoolQueue() {
- return sendThreadPoolQueue;
- }
-
- public FilterServerManager getFilterServerManager() {
- return filterServerManager;
- }
-
- public BrokerStatsManager getBrokerStatsManager() {
- return brokerStatsManager;
- }
-
- public List<SendMessageHook> getSendMessageHookList() {
- return sendMessageHookList;
- }
-
- public void registerSendMessageHook(final SendMessageHook hook) {
- this.sendMessageHookList.add(hook);
- log.info("register SendMessageHook Hook, {}", hook.hookName());
- }
-
- public List<ConsumeMessageHook> getConsumeMessageHookList() {
- return consumeMessageHookList;
- }
-
- public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
- this.consumeMessageHookList.add(hook);
- log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
- }
-
- public void registerServerRPCHook(RPCHook rpcHook) {
- getRemotingServer().registerRPCHook(rpcHook);
- }
-
- public RemotingServer getRemotingServer() {
- return remotingServer;
- }
-
- public void setRemotingServer(RemotingServer remotingServer) {
- this.remotingServer = remotingServer;
- }
-
- public void registerClientRPCHook(RPCHook rpcHook) {
- this.getBrokerOuterAPI().registerRPCHook(rpcHook);
- }
-
- public BrokerOuterAPI getBrokerOuterAPI() {
- return brokerOuterAPI;
- }
-
- public InetSocketAddress getStoreHost() {
- return storeHost;
- }
-
- public void setStoreHost(InetSocketAddress storeHost) {
- this.storeHost = storeHost;
- }
-
- public Configuration getConfiguration() {
- return this.configuration;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java
deleted file mode 100644
index 055e8dc..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker;
-
-import java.io.File;
-
-
-public class BrokerPathConfigHelper {
- private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store"
- + File.separator + "config" + File.separator + "broker.properties";
-
-
- public static String getBrokerConfigPath() {
- return brokerConfigPath;
- }
-
-
- public static void setBrokerConfigPath(String path) {
- brokerConfigPath = path;
- }
-
-
- public static String getTopicConfigPath(final String rootDir) {
- return rootDir + File.separator + "config" + File.separator + "topics.json";
- }
-
-
- public static String getConsumerOffsetPath(final String rootDir) {
- return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
- }
-
-
- public static String getSubscriptionGroupPath(final String rootDir) {
- return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java
deleted file mode 100644
index 7e81117..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker;
-
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.joran.JoranConfigurator;
-import com.alibaba.rocketmq.common.BrokerConfig;
-import com.alibaba.rocketmq.common.MQVersion;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
-import com.alibaba.rocketmq.remoting.netty.NettySystemConfig;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.srvutil.ServerUtil;
-import com.alibaba.rocketmq.store.config.BrokerRole;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.PosixParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * @author shijia.wxr
- */
-public class BrokerStartup {
- public static Properties properties = null;
- public static CommandLine commandLine = null;
- public static String configFile = null;
- public static Logger log;
-
- public static void main(String[] args) {
- start(createBrokerController(args));
- }
-
- public static BrokerController start(BrokerController controller) {
- try {
- controller.start();
- String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
- + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
-
- if (null != controller.getBrokerConfig().getNamesrvAddr()) {
- tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
- }
-
- log.info(tip);
- return controller;
- } catch (Throwable e) {
- e.printStackTrace();
- System.exit(-1);
- }
-
- return null;
- }
-
- public static BrokerController createBrokerController(String[] args) {
- System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
-
- if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
- NettySystemConfig.socketSndbufSize = 131072;
- }
-
- if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
- NettySystemConfig.socketRcvbufSize = 131072;
- }
-
- try {
- //PackageConflictDetect.detectFastjson();
- Options options = ServerUtil.buildCommandlineOptions(new Options());
- commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
- new PosixParser());
- if (null == commandLine) {
- System.exit(-1);
- }
-
- final BrokerConfig brokerConfig = new BrokerConfig();
- final NettyServerConfig nettyServerConfig = new NettyServerConfig();
- final NettyClientConfig nettyClientConfig = new NettyClientConfig();
- nettyServerConfig.setListenPort(10911);
- final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
-
- if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
- int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
- messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
- }
-
- if (commandLine.hasOption('p')) {
- MixAll.printObjectProperties(null, brokerConfig);
- MixAll.printObjectProperties(null, nettyServerConfig);
- MixAll.printObjectProperties(null, nettyClientConfig);
- MixAll.printObjectProperties(null, messageStoreConfig);
- System.exit(0);
- } else if (commandLine.hasOption('m')) {
- MixAll.printObjectProperties(null, brokerConfig, true);
- MixAll.printObjectProperties(null, nettyServerConfig, true);
- MixAll.printObjectProperties(null, nettyClientConfig, true);
- MixAll.printObjectProperties(null, messageStoreConfig, true);
- System.exit(0);
- }
-
- if (commandLine.hasOption('c')) {
- String file = commandLine.getOptionValue('c');
- if (file != null) {
- configFile = file;
- InputStream in = new BufferedInputStream(new FileInputStream(file));
- properties = new Properties();
- properties.load(in);
-
- parsePropertie2SystemEnv(properties);
- MixAll.properties2Object(properties, brokerConfig);
- MixAll.properties2Object(properties, nettyServerConfig);
- MixAll.properties2Object(properties, nettyClientConfig);
- MixAll.properties2Object(properties, messageStoreConfig);
-
- BrokerPathConfigHelper.setBrokerConfigPath(file);
- in.close();
- }
- }
-
- MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
-
- if (null == brokerConfig.getRocketmqHome()) {
- System.out.printf("Please set the " + MixAll.ROCKETMQ_HOME_ENV
- + " variable in your environment to match the location of the RocketMQ installation");
- System.exit(-2);
- }
-
- String namesrvAddr = brokerConfig.getNamesrvAddr();
- if (null != namesrvAddr) {
- try {
- String[] addrArray = namesrvAddr.split(";");
- if (addrArray != null) {
- for (String addr : addrArray) {
- RemotingUtil.string2SocketAddress(addr);
- }
- }
- } catch (Exception e) {
- System.out.printf(
- "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
- namesrvAddr);
- System.exit(-3);
- }
- }
-
-
- switch (messageStoreConfig.getBrokerRole()) {
- case ASYNC_MASTER:
- case SYNC_MASTER:
- brokerConfig.setBrokerId(MixAll.MASTER_ID);
- break;
- case SLAVE:
- if (brokerConfig.getBrokerId() <= 0) {
- System.out.printf("Slave's brokerId must be > 0");
- System.exit(-3);
- }
-
- break;
- default:
- break;
- }
-
- messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
- LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
- JoranConfigurator configurator = new JoranConfigurator();
- configurator.setContext(lc);
- lc.reset();
- configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
- log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
- MixAll.printObjectProperties(log, brokerConfig);
- MixAll.printObjectProperties(log, nettyServerConfig);
- MixAll.printObjectProperties(log, nettyClientConfig);
- MixAll.printObjectProperties(log, messageStoreConfig);
-
- final BrokerController controller = new BrokerController(//
- brokerConfig, //
- nettyServerConfig, //
- nettyClientConfig, //
- messageStoreConfig);
- // remember all configs to prevent discard
- controller.getConfiguration().registerConfig(properties);
-
- boolean initResult = controller.initialize();
- if (!initResult) {
- controller.shutdown();
- System.exit(-3);
- }
-
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- private volatile boolean hasShutdown = false;
- private AtomicInteger shutdownTimes = new AtomicInteger(0);
-
- @Override
- public void run() {
- synchronized (this) {
- log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
- if (!this.hasShutdown) {
- this.hasShutdown = true;
- long begineTime = System.currentTimeMillis();
- controller.shutdown();
- long consumingTimeTotal = System.currentTimeMillis() - begineTime;
- log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
- }
- }
- }
- }, "ShutdownHook"));
-
- return controller;
- } catch (Throwable e) {
- e.printStackTrace();
- System.exit(-1);
- }
-
- return null;
- }
-
- private static void parsePropertie2SystemEnv(Properties properties) {
- if (properties == null) {
- return;
- }
- String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", "jmenv.tbsite.net");
- String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", "nsaddr");
- System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
- System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
- }
-
- public static Options buildCommandlineOptions(final Options options) {
- Option opt = new Option("c", "configFile", true, "Broker config properties file");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("p", "printConfigItem", false, "Print all config item");
- opt.setRequired(false);
- options.addOption(opt);
-
- opt = new Option("m", "printImportantConfig", false, "Print important config item");
- opt.setRequired(false);
- options.addOption(opt);
-
- return options;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java
deleted file mode 100644
index babf4b7..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import com.alibaba.rocketmq.remoting.protocol.LanguageCode;
-import io.netty.channel.Channel;
-
-
-/**
- * @author shijia.wxr
- */
-public class ClientChannelInfo {
- private final Channel channel;
- private final String clientId;
- private final LanguageCode language;
- private final int version;
- private volatile long lastUpdateTimestamp = System.currentTimeMillis();
-
-
- public ClientChannelInfo(Channel channel) {
- this(channel, null, null, 0);
- }
-
-
- public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
- this.channel = channel;
- this.clientId = clientId;
- this.language = language;
- this.version = version;
- }
-
-
- public Channel getChannel() {
- return channel;
- }
-
-
- public String getClientId() {
- return clientId;
- }
-
-
- public LanguageCode getLanguage() {
- return language;
- }
-
-
- public int getVersion() {
- return version;
- }
-
-
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
-
-
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
-
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((channel == null) ? 0 : channel.hashCode());
- result = prime * result + ((clientId == null) ? 0 : clientId.hashCode());
- result = prime * result + ((language == null) ? 0 : language.hashCode());
- result = prime * result + (int) (lastUpdateTimestamp ^ (lastUpdateTimestamp >>> 32));
- result = prime * result + version;
- return result;
- }
-
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ClientChannelInfo other = (ClientChannelInfo) obj;
- if (channel == null) {
- if (other.channel != null)
- return false;
- } else if (this.channel != other.channel) {
- return false;
- }
-
- return true;
- }
-
-
- @Override
- public String toString() {
- return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
- + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java
deleted file mode 100644
index 4ac7532..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.ChannelEventListener;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @author shijia.wxr
- */
-public class ClientHousekeepingService implements ChannelEventListener {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final BrokerController brokerController;
-
- private ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
-
-
- public ClientHousekeepingService(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
-
- public void start() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- ClientHousekeepingService.this.scanExceptionChannel();
- } catch (Exception e) {
- log.error("", e);
- }
- }
- }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
- }
-
- private void scanExceptionChannel() {
- this.brokerController.getProducerManager().scanNotActiveChannel();
- this.brokerController.getConsumerManager().scanNotActiveChannel();
- this.brokerController.getFilterServerManager().scanNotActiveChannel();
- }
-
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- }
-
- @Override
- public void onChannelConnect(String remoteAddr, Channel channel) {
-
- }
-
-
- @Override
- public void onChannelClose(String remoteAddr, Channel channel) {
- this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
- }
-
-
- @Override
- public void onChannelException(String remoteAddr, Channel channel) {
- this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
- }
-
-
- @Override
- public void onChannelIdle(String remoteAddr, Channel channel) {
- this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
- this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java
deleted file mode 100644
index 410b703..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumerGroupInfo {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final String groupName;
- private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
- new ConcurrentHashMap<String, SubscriptionData>();
- private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
- private volatile ConsumeType consumeType;
- private volatile MessageModel messageModel;
- private volatile ConsumeFromWhere consumeFromWhere;
- private volatile long lastUpdateTimestamp = System.currentTimeMillis();
-
-
- public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
- ConsumeFromWhere consumeFromWhere) {
- this.groupName = groupName;
- this.consumeType = consumeType;
- this.messageModel = messageModel;
- this.consumeFromWhere = consumeFromWhere;
- }
-
-
- public ClientChannelInfo findChannel(final String clientId) {
- Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, ClientChannelInfo> next = it.next();
- if (next.getValue().getClientId().equals(clientId)) {
- return next.getValue();
- }
- }
-
- return null;
- }
-
-
- public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
- return subscriptionTable;
- }
-
-
- public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() {
- return channelInfoTable;
- }
-
-
- public List<Channel> getAllChannel() {
- List<Channel> result = new ArrayList<Channel>();
-
- result.addAll(this.channelInfoTable.keySet());
-
- return result;
- }
-
-
- public List<String> getAllClientId() {
- List<String> result = new ArrayList<String>();
-
- Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
-
- while (it.hasNext()) {
- Entry<Channel, ClientChannelInfo> entry = it.next();
- ClientChannelInfo clientChannelInfo = entry.getValue();
- result.add(clientChannelInfo.getClientId());
- }
-
- return result;
- }
-
-
- public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
- ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
- if (old != null) {
- log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
- }
- }
-
-
- public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
- final ClientChannelInfo info = this.channelInfoTable.remove(channel);
- if (info != null) {
- log.warn(
- "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
- info.toString(), groupName);
- return true;
- }
-
- return false;
- }
-
- public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
- MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
- boolean updated = false;
- this.consumeType = consumeType;
- this.messageModel = messageModel;
- this.consumeFromWhere = consumeFromWhere;
-
- ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
- if (null == infoOld) {
- ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
- if (null == prev) {
- log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
- messageModel, infoNew.toString());
- updated = true;
- }
-
- infoOld = infoNew;
- } else {
- if (!infoOld.getClientId().equals(infoNew.getClientId())) {
- log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
- this.groupName,
- infoOld.toString(),
- infoNew.toString());
- this.channelInfoTable.put(infoNew.getChannel(), infoNew);
- }
- }
-
- this.lastUpdateTimestamp = System.currentTimeMillis();
- infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
-
- return updated;
- }
-
-
- public boolean updateSubscription(final Set<SubscriptionData> subList) {
- boolean updated = false;
-
- for (SubscriptionData sub : subList) {
- SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
- if (old == null) {
- SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
- if (null == prev) {
- updated = true;
- log.info("subscription changed, add new topic, group: {} {}",
- this.groupName,
- sub.toString());
- }
- } else if (sub.getSubVersion() > old.getSubVersion()) {
- if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
- log.info("subscription changed, group: {} OLD: {} NEW: {}",
- this.groupName,
- old.toString(),
- sub.toString()
- );
- }
-
- this.subscriptionTable.put(sub.getTopic(), sub);
- }
- }
-
-
- Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, SubscriptionData> next = it.next();
- String oldTopic = next.getKey();
-
- boolean exist = false;
- for (SubscriptionData sub : subList) {
- if (sub.getTopic().equals(oldTopic)) {
- exist = true;
- break;
- }
- }
-
- if (!exist) {
- log.warn("subscription changed, group: {} remove topic {} {}",
- this.groupName,
- oldTopic,
- next.getValue().toString()
- );
-
- it.remove();
- updated = true;
- }
- }
-
- this.lastUpdateTimestamp = System.currentTimeMillis();
-
- return updated;
- }
-
-
- public Set<String> getSubscribeTopics() {
- return subscriptionTable.keySet();
- }
-
-
- public SubscriptionData findSubscriptionData(final String topic) {
- return this.subscriptionTable.get(topic);
- }
-
-
- public ConsumeType getConsumeType() {
- return consumeType;
- }
-
-
- public void setConsumeType(ConsumeType consumeType) {
- this.consumeType = consumeType;
- }
-
-
- public MessageModel getMessageModel() {
- return messageModel;
- }
-
-
- public void setMessageModel(MessageModel messageModel) {
- this.messageModel = messageModel;
- }
-
-
- public String getGroupName() {
- return groupName;
- }
-
-
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
-
-
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
-
-
- public ConsumeFromWhere getConsumeFromWhere() {
- return consumeFromWhere;
- }
-
-
- public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
- this.consumeFromWhere = consumeFromWhere;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java
deleted file mode 100644
index 4da2eb3..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import io.netty.channel.Channel;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public interface ConsumerIdsChangeListener {
- public void consumerIdsChanged(final String group, final List<Channel> channels);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java
deleted file mode 100644
index 48e9673..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
-import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumerManager {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
- private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable =
- new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
- private final ConsumerIdsChangeListener consumerIdsChangeListener;
-
- public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
- this.consumerIdsChangeListener = consumerIdsChangeListener;
- }
-
- public ClientChannelInfo findChannel(final String group, final String clientId) {
- ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
- if (consumerGroupInfo != null) {
- return consumerGroupInfo.findChannel(clientId);
- }
- return null;
- }
-
- public SubscriptionData findSubscriptionData(final String group, final String topic) {
- ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
- if (consumerGroupInfo != null) {
- return consumerGroupInfo.findSubscriptionData(topic);
- }
-
- return null;
- }
-
- public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
- return this.consumerTable.get(group);
- }
-
- public int findSubscriptionDataCount(final String group) {
- ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
- if (consumerGroupInfo != null) {
- return consumerGroupInfo.getSubscriptionTable().size();
- }
-
- return 0;
- }
-
- public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
- Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConsumerGroupInfo> next = it.next();
- ConsumerGroupInfo info = next.getValue();
- boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
- if (removed) {
- if (info.getChannelInfoTable().isEmpty()) {
- ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
- if (remove != null) {
- log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
- next.getKey());
- }
- }
-
- this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel());
- }
- }
- }
-
- public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
- ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
- final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
-
- ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
- if (null == consumerGroupInfo) {
- ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
- ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
- consumerGroupInfo = prev != null ? prev : tmp;
- }
-
- boolean r1 =
- consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
- consumeFromWhere);
- boolean r2 = consumerGroupInfo.updateSubscription(subList);
-
- if (r1 || r2) {
- if (isNotifyConsumerIdsChangedEnable) {
- this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
- }
- }
-
- return r1 || r2;
- }
-
- public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) {
- ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
- if (null != consumerGroupInfo) {
- consumerGroupInfo.unregisterChannel(clientChannelInfo);
- if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
- ConsumerGroupInfo remove = this.consumerTable.remove(group);
- if (remove != null) {
- log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
- }
- }
- if (isNotifyConsumerIdsChangedEnable) {
- this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
- }
- }
- }
-
- public void scanNotActiveChannel() {
- Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConsumerGroupInfo> next = it.next();
- String group = next.getKey();
- ConsumerGroupInfo consumerGroupInfo = next.getValue();
- ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- consumerGroupInfo.getChannelInfoTable();
-
- Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
- while (itChannel.hasNext()) {
- Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
- ClientChannelInfo clientChannelInfo = nextChannel.getValue();
- long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
- if (diff > CHANNEL_EXPIRED_TIMEOUT) {
- log.warn(
- "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
- RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
- RemotingUtil.closeChannel(clientChannelInfo.getChannel());
- itChannel.remove();
- }
- }
-
- if (channelInfoTable.isEmpty()) {
- log.warn(
- "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
- group);
- it.remove();
- }
- }
- }
-
- public HashSet<String> queryTopicConsumeByWho(final String topic) {
- HashSet<String> groups = new HashSet<String>();
- Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, ConsumerGroupInfo> entry = it.next();
- ConcurrentHashMap<String, SubscriptionData> subscriptionTable =
- entry.getValue().getSubscriptionTable();
- if (subscriptionTable.containsKey(topic)) {
- groups.add(entry.getKey());
- }
- }
- return groups;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
deleted file mode 100644
index 0095913..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import io.netty.channel.Channel;
-
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
- private final BrokerController brokerController;
-
-
- public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
-
- @Override
- public void consumerIdsChanged(String group, List<Channel> channels) {
- if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
- for (Channel chl : channels) {
- this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
- }
- }
- }
-}