You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:58 UTC

[41/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java
new file mode 100644
index 0000000..b2b6aed
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerController.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker;
+
+import com.alibaba.rocketmq.broker.client.*;
+import com.alibaba.rocketmq.broker.client.net.Broker2Client;
+import com.alibaba.rocketmq.broker.client.rebalance.RebalanceLockManager;
+import com.alibaba.rocketmq.broker.filtersrv.FilterServerManager;
+import com.alibaba.rocketmq.broker.latency.BrokerFastFailure;
+import com.alibaba.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
+import com.alibaba.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
+import com.alibaba.rocketmq.broker.longpolling.PullRequestHoldService;
+import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
+import com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager;
+import com.alibaba.rocketmq.broker.out.BrokerOuterAPI;
+import com.alibaba.rocketmq.broker.plugin.MessageStoreFactory;
+import com.alibaba.rocketmq.broker.plugin.MessageStorePluginContext;
+import com.alibaba.rocketmq.broker.processor.*;
+import com.alibaba.rocketmq.broker.slave.SlaveSynchronize;
+import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
+import com.alibaba.rocketmq.broker.topic.TopicConfigManager;
+import com.alibaba.rocketmq.common.*;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.constant.PermName;
+import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import com.alibaba.rocketmq.common.stats.MomentStatsItem;
+import com.alibaba.rocketmq.remoting.RPCHook;
+import com.alibaba.rocketmq.remoting.RemotingServer;
+import com.alibaba.rocketmq.remoting.netty.*;
+import com.alibaba.rocketmq.store.DefaultMessageStore;
+import com.alibaba.rocketmq.store.MessageArrivingListener;
+import com.alibaba.rocketmq.store.MessageStore;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import com.alibaba.rocketmq.store.stats.BrokerStats;
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerController {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
+    private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
+    private final BrokerConfig brokerConfig;
+    private final NettyServerConfig nettyServerConfig;
+    private final NettyClientConfig nettyClientConfig;
+    private final MessageStoreConfig messageStoreConfig;
+    private final ConsumerOffsetManager consumerOffsetManager;
+    private final ConsumerManager consumerManager;
+    private final ProducerManager producerManager;
+    private final ClientHousekeepingService clientHousekeepingService;
+    private final PullMessageProcessor pullMessageProcessor;
+    private final PullRequestHoldService pullRequestHoldService;
+    private final MessageArrivingListener messageArrivingListener;
+    private final Broker2Client broker2Client;
+    private final SubscriptionGroupManager subscriptionGroupManager;
+    private final ConsumerIdsChangeListener consumerIdsChangeListener;
+    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
+    private final BrokerOuterAPI brokerOuterAPI;
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "BrokerControllerScheduledThread"));
+    private final SlaveSynchronize slaveSynchronize;
+    private final BlockingQueue<Runnable> sendThreadPoolQueue;
+    private final BlockingQueue<Runnable> pullThreadPoolQueue;
+    private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
+    private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
+    private final FilterServerManager filterServerManager;
+    private final BrokerStatsManager brokerStatsManager;
+    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
+    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+    private MessageStore messageStore;
+    private RemotingServer remotingServer;
+    private RemotingServer fastRemotingServer;
+    private TopicConfigManager topicConfigManager;
+    private ExecutorService sendMessageExecutor;
+    private ExecutorService pullMessageExecutor;
+    private ExecutorService adminBrokerExecutor;
+    private ExecutorService clientManageExecutor;
+    private ExecutorService consumerManageExecutor;
+    private boolean updateMasterHAServerAddrPeriodically = false;
+    private BrokerStats brokerStats;
+    private InetSocketAddress storeHost;
+    private BrokerFastFailure brokerFastFailure;
+    private Configuration configuration;
+
+    public BrokerController(//
+                            final BrokerConfig brokerConfig, //
+                            final NettyServerConfig nettyServerConfig, //
+                            final NettyClientConfig nettyClientConfig, //
+                            final MessageStoreConfig messageStoreConfig //
+    ) {
+        this.brokerConfig = brokerConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.nettyClientConfig = nettyClientConfig;
+        this.messageStoreConfig = messageStoreConfig;
+        this.consumerOffsetManager = new ConsumerOffsetManager(this);
+        this.topicConfigManager = new TopicConfigManager(this);
+        this.pullMessageProcessor = new PullMessageProcessor(this);
+        this.pullRequestHoldService = new PullRequestHoldService(this);
+        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
+        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
+        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
+        this.producerManager = new ProducerManager();
+        this.clientHousekeepingService = new ClientHousekeepingService(this);
+        this.broker2Client = new Broker2Client(this);
+        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
+        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+        this.filterServerManager = new FilterServerManager(this);
+
+        if (this.brokerConfig.getNamesrvAddr() != null) {
+            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+            log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
+        }
+
+        this.slaveSynchronize = new SlaveSynchronize(this);
+
+        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+
+        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
+        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
+
+        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
+        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
+
+        this.brokerFastFailure = new BrokerFastFailure(this);
+        this.configuration = new Configuration(
+                log,
+                BrokerPathConfigHelper.getBrokerConfigPath(),
+                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+        );
+    }
+
+    public BrokerConfig getBrokerConfig() {
+        return brokerConfig;
+    }
+
+    public NettyServerConfig getNettyServerConfig() {
+        return nettyServerConfig;
+    }
+
+    public BlockingQueue<Runnable> getPullThreadPoolQueue() {
+        return pullThreadPoolQueue;
+    }
+
+    /**
+     * Loads persisted configs, builds the message store, servers and executors, and schedules periodic tasks.
+     * @return {@code true} if every load step succeeded; {@code false} otherwise (no servers or tasks are created)
+     * @throws CloneNotSupportedException declared for the clone of the Netty server config
+     */
+    public boolean initialize() throws CloneNotSupportedException {
+        boolean result = this.topicConfigManager.load()
+                && this.consumerOffsetManager.load()
+                && this.subscriptionGroupManager.load();
+
+        if (result) {
+            try {
+                this.messageStore =
+                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+                                this.brokerConfig);
+                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
+                //load plugin
+                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
+                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
+            } catch (IOException e) {
+                result = false;
+                log.error("Failed to create the message store", e);
+            }
+        }
+
+        result = result && this.messageStore.load();
+
+        if (result) {
+            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
+            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
+            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); // fast server listens two ports below the main one
+            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                    this.brokerConfig.getSendMessageThreadPoolNums(),
+                    this.brokerConfig.getSendMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.sendThreadPoolQueue,
+                    new ThreadFactoryImpl("SendMessageThread_"));
+
+            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                    this.brokerConfig.getPullMessageThreadPoolNums(),
+                    this.brokerConfig.getPullMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.pullThreadPoolQueue,
+                    new ThreadFactoryImpl("PullMessageThread_"));
+
+            this.adminBrokerExecutor =
+                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+                            "AdminBrokerThread_"));
+
+            this.clientManageExecutor = new ThreadPoolExecutor(
+                    this.brokerConfig.getClientManageThreadPoolNums(),
+                    this.brokerConfig.getClientManageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.clientManagerThreadPoolQueue,
+                    new ThreadFactoryImpl("ClientManageThread_"));
+
+            this.consumerManageExecutor =
+                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+                            "ConsumerManageThread_"));
+
+            this.registerProcessor();
+
+            // TODO remove in future
+            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
+            final long period = 1000 * 60 * 60 * 24;
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.getBrokerStats().record();
+                    } catch (Throwable e) {
+                        log.error("schedule record error.", e);
+                    }
+                }
+            }, initialDelay, period, TimeUnit.MILLISECONDS);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.consumerOffsetManager.persist();
+                    } catch (Throwable e) {
+                        log.error("schedule persist consumerOffset error.", e);
+                    }
+                }
+            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.protectBroker();
+                    } catch (Exception e) {
+                        log.error("protectBroker error.", e);
+                    }
+                }
+            }, 3, 3, TimeUnit.MINUTES);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.printWaterMark();
+                    } catch (Exception e) {
+                        log.error("printWaterMark error.", e);
+                    }
+                }
+            }, 10, 1, TimeUnit.SECONDS);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
+                    } catch (Throwable e) {
+                        log.error("schedule dispatchBehindBytes error.", e);
+                    }
+                }
+            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
+
+            if (this.brokerConfig.getNamesrvAddr() != null) {
+                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
+                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
+                        } catch (Throwable e) {
+                            log.error("ScheduledTask fetchNameServerAddr exception", e);
+                        }
+                    }
+                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+            }
+
+            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
+                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
+                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
+                    this.updateMasterHAServerAddrPeriodically = false;
+                } else {
+                    this.updateMasterHAServerAddrPeriodically = true;
+                }
+
+                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            BrokerController.this.slaveSynchronize.syncAll();
+                        } catch (Throwable e) {
+                            log.error("ScheduledTask syncAll slave exception", e);
+                        }
+                    }
+                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
+            } else {
+                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            BrokerController.this.printMasterAndSlaveDiff();
+                        } catch (Throwable e) {
+                            log.error("schedule printMasterAndSlaveDiff error.", e);
+                        }
+                    }
+                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        return result;
+    }
+
+    /** Binds each request code to its processor and executor on remotingServer and, except for PULL_MESSAGE, on fastRemotingServer. */
+    public void registerProcessor() {
+        /**
+         * SendMessageProcessor: one shared instance serves the three send-path codes on sendMessageExecutor.
+         */
+        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
+        sendProcessor.registerSendMessageHook(sendMessageHookList);
+        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
+
+        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
+        /**
+         * PullMessageProcessor: PULL_MESSAGE is registered on remotingServer only, not on fastRemotingServer.
+         */
+        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
+        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
+
+        /**
+         * QueryMessageProcessor: query and view-by-id requests share pullMessageExecutor with pulls.
+         */
+        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+
+        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+
+        /**
+         * ClientManageProcessor: heartbeat and client unregistration run on clientManageExecutor.
+         */
+        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+
+        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+
+        /**
+         * ConsumerManageProcessor: consumer-list and consumer-offset requests run on consumerManageExecutor.
+         */
+        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+
+        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+
+        /**
+         * EndTransactionProcessor: each server gets its own instance; both run on sendMessageExecutor.
+         */
+        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+
+        /**
+         * Default: AdminBrokerProcessor is the default processor (presumably for codes with no explicit registration).
+         */
+        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
+        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
+        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
+    }
+
+    public BrokerStats getBrokerStats() {
+        return brokerStats;
+    }
+
+    public void setBrokerStats(BrokerStats brokerStats) {
+        this.brokerStats = brokerStats;
+    }
+
+    /** When the protection switch is on, disables consumption for every group whose fall-behind size exceeds the threshold. */
+    public void protectBroker() {
+        if (!this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
+            return;
+        }
+        for (final Map.Entry<String, MomentStatsItem> entry : this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet()) {
+            final MomentStatsItem item = entry.getValue();
+            final long behindBytes = item.getValue().get();
+            if (behindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
+                final String group = item.getStatsKey().split("@")[2]; // group is the third '@'-separated field
+                LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, behindBytes);
+                this.subscriptionGroupManager.disableConsume(group);
+            }
+        }
+    }
+
+    /**
+     * Returns how long, in milliseconds, the task at the head of {@code q} has been waiting; 0 if there is none.
+     */
+    public long headSlowTimeMills(BlockingQueue<Runnable> q) {
+        final Runnable peek = q.peek();
+        // castRunnable is defined outside this file; a null result is treated as "no measurable head task".
+        final RequestTask rt = peek == null ? null : BrokerFastFailure.castRunnable(peek);
+        if (rt == null) {
+            return 0;
+        }
+        return Math.max(0L, this.messageStore.now() - rt.getCreateTimestamp()); // never report a negative wait
+    }
+
+    public long headSlowTimeMills4SendThreadPoolQueue() {
+        return this.headSlowTimeMills(this.sendThreadPoolQueue);
+    }
+
+    public long headSlowTimeMills4PullThreadPoolQueue() {
+        return this.headSlowTimeMills(this.pullThreadPoolQueue);
+    }
+
+    public void printWaterMark() {
+        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
+        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
+    }
+
+    public MessageStore getMessageStore() {
+        return messageStore;
+    }
+
+    public void setMessageStore(MessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    private void printMasterAndSlaveDiff() {
+        long diff = this.messageStore.slaveFallBehindMuch();
+
+        // XXX: warn and notify me
+        log.info("slave fall behind master, how much, {} bytes", diff);
+    }
+
+    public Broker2Client getBroker2Client() {
+        return broker2Client;
+    }
+
+    public ConsumerManager getConsumerManager() {
+        return consumerManager;
+    }
+
+    public ConsumerOffsetManager getConsumerOffsetManager() {
+        return consumerOffsetManager;
+    }
+
+    public MessageStoreConfig getMessageStoreConfig() {
+        return messageStoreConfig;
+    }
+
+    public ProducerManager getProducerManager() {
+        return producerManager;
+    }
+
+    public void setFastRemotingServer(RemotingServer fastRemotingServer) {
+        this.fastRemotingServer = fastRemotingServer;
+    }
+
+    public PullMessageProcessor getPullMessageProcessor() {
+        return pullMessageProcessor;
+    }
+
+    public PullRequestHoldService getPullRequestHoldService() {
+        return pullRequestHoldService;
+    }
+
+    public SubscriptionGroupManager getSubscriptionGroupManager() {
+        return subscriptionGroupManager;
+    }
+
+    /** Stops the broker's services and thread pools, unregisters the broker and persists consumer offsets. */
+    public void shutdown() {
+        if (this.brokerStatsManager != null) {
+            this.brokerStatsManager.shutdown();
+        }
+
+        if (this.clientHousekeepingService != null) {
+            this.clientHousekeepingService.shutdown();
+        }
+
+        if (this.pullRequestHoldService != null) {
+            this.pullRequestHoldService.shutdown();
+        }
+
+        if (this.remotingServer != null) {
+            this.remotingServer.shutdown();
+        }
+
+        if (this.fastRemotingServer != null) {
+            this.fastRemotingServer.shutdown();
+        }
+
+        if (this.messageStore != null) {
+            this.messageStore.shutdown();
+        }
+
+        boolean interrupted = false;
+        this.scheduledExecutorService.shutdown();
+        try {
+            this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            interrupted = true; // finish the remaining steps first; the flag is restored at the end
+        }
+
+        this.unregisterBrokerAll();
+
+        final ExecutorService[] executors = {this.sendMessageExecutor, this.pullMessageExecutor, this.adminBrokerExecutor,
+                this.clientManageExecutor, this.consumerManageExecutor};
+        for (ExecutorService executor : executors) {
+            if (executor != null) {
+                executor.shutdown();
+            }
+        }
+
+        if (this.brokerOuterAPI != null) {
+            this.brokerOuterAPI.shutdown();
+        }
+
+        this.consumerOffsetManager.persist();
+        if (this.filterServerManager != null) {
+            this.filterServerManager.shutdown();
+        }
+        if (this.brokerFastFailure != null) {
+            this.brokerFastFailure.shutdown();
+        }
+        if (interrupted) {
+            Thread.currentThread().interrupt(); // re-assert the interrupt caught while awaiting the scheduler
+        }
+    }
+
+    private void unregisterBrokerAll() {
+        this.brokerOuterAPI.unregisterBrokerAll(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId());
+    }
+
+    public String getBrokerAddr() {
+        return this.brokerConfig.getBrokerIP1() + ":" + this.nettyServerConfig.getListenPort();
+    }
+
+    /** Starts each non-null component, calls registerBrokerAll once, then schedules it to repeat. */
+    public void start() throws Exception {
+        if (this.messageStore != null) {
+            this.messageStore.start();
+        }
+
+        if (this.remotingServer != null) {
+            this.remotingServer.start();
+        }
+
+        if (this.fastRemotingServer != null) {
+            this.fastRemotingServer.start();
+        }
+
+        if (this.brokerOuterAPI != null) {
+            this.brokerOuterAPI.start();
+        }
+
+        if (this.pullRequestHoldService != null) {
+            this.pullRequestHoldService.start();
+        }
+
+        if (this.clientHousekeepingService != null) {
+            this.clientHousekeepingService.start();
+        }
+
+        if (this.filterServerManager != null) {
+            this.filterServerManager.start();
+        }
+
+        this.registerBrokerAll(true, false); // initial registration: checkOrderConfig=true, oneway=false
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    BrokerController.this.registerBrokerAll(true, false);
+                } catch (Throwable e) {
+                    log.error("registerBrokerAll Exception", e);
+                }
+            }
+        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS); // first run after 10 s, then every 30 s
+
+        if (this.brokerStatsManager != null) {
+            this.brokerStatsManager.start();
+        }
+
+        if (this.brokerFastFailure != null) {
+            this.brokerFastFailure.start();
+        }
+    }
+
+    /** Registers this broker (topic configs, filter servers, HA address) through the outer API and applies the reply. */
+    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
+        TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+
+        final boolean readWritable = PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+                && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
+        if (!readWritable) {
+            // Advertise every topic with the broker-level permission instead of its own.
+            ConcurrentHashMap<String, TopicConfig> restricted = new ConcurrentHashMap<String, TopicConfig>();
+            for (TopicConfig source : wrapper.getTopicConfigTable().values()) {
+                restricted.put(source.getTopicName(), new TopicConfig(source.getTopicName(), source.getReadQueueNums(),
+                        source.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()));
+            }
+            wrapper.setTopicConfigTable(restricted);
+        }
+
+        RegisterBrokerResult reply = this.brokerOuterAPI.registerBrokerAll(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.getHAServerAddr(),
+                wrapper,
+                this.filterServerManager.buildNewFilterServerList(),
+                oneway,
+                this.brokerConfig.getRegisterBrokerTimeoutMills());
+
+        if (reply == null) {
+            return;
+        }
+        if (this.updateMasterHAServerAddrPeriodically && reply.getHaServerAddr() != null) {
+            this.messageStore.updateHaMasterAddress(reply.getHaServerAddr());
+        }
+        this.slaveSynchronize.setMasterAddr(reply.getMasterAddr());
+        if (checkOrderConfig) {
+            this.getTopicConfigManager().updateOrderTopicConfig(reply.getKvTable());
+        }
+    }
+
+    public TopicConfigManager getTopicConfigManager() {
+        return topicConfigManager;
+    }
+
+    public void setTopicConfigManager(TopicConfigManager topicConfigManager) {
+        this.topicConfigManager = topicConfigManager;
+    }
+
+    public String getHAServerAddr() {
+        return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
+    }
+
+    public RebalanceLockManager getRebalanceLockManager() {
+        return rebalanceLockManager;
+    }
+
+    public SlaveSynchronize getSlaveSynchronize() {
+        return slaveSynchronize;
+    }
+
+    public ExecutorService getPullMessageExecutor() {
+        return pullMessageExecutor;
+    }
+
+    public void setPullMessageExecutor(ExecutorService pullMessageExecutor) {
+        this.pullMessageExecutor = pullMessageExecutor;
+    }
+
+    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
+        return sendThreadPoolQueue;
+    }
+
+    public FilterServerManager getFilterServerManager() {
+        return filterServerManager;
+    }
+
+    public BrokerStatsManager getBrokerStatsManager() {
+        return brokerStatsManager;
+    }
+
+    public List<SendMessageHook> getSendMessageHookList() {
+        return sendMessageHookList;
+    }
+
+    public void registerSendMessageHook(final SendMessageHook hook) {
+        this.sendMessageHookList.add(hook);
+        log.info("register SendMessageHook Hook, {}", hook.hookName());
+    }
+
+    public List<ConsumeMessageHook> getConsumeMessageHookList() {
+        return consumeMessageHookList;
+    }
+
+    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+        this.consumeMessageHookList.add(hook);
+        log.info("register ConsumeMessageHook Hook, {}", hook.hookName());
+    }
+
+    public void registerServerRPCHook(RPCHook rpcHook) {
+        getRemotingServer().registerRPCHook(rpcHook);
+    }
+
+    public RemotingServer getRemotingServer() {
+        return remotingServer;
+    }
+
+    public void setRemotingServer(RemotingServer remotingServer) {
+        this.remotingServer = remotingServer;
+    }
+
+    public void registerClientRPCHook(RPCHook rpcHook) {
+        this.getBrokerOuterAPI().registerRPCHook(rpcHook);
+    }
+
+    public BrokerOuterAPI getBrokerOuterAPI() {
+        return brokerOuterAPI;
+    }
+
+    public InetSocketAddress getStoreHost() {
+        return storeHost;
+    }
+
+    public void setStoreHost(InetSocketAddress storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    public Configuration getConfiguration() {
+        return this.configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java
new file mode 100644
index 0000000..055e8dc
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerPathConfigHelper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker;
+
+import java.io.File;
+
+
+/**
+ * Static helpers that build the file-system paths of the broker's configuration files.
+ * Every path has the shape {@code <root>/config/<file name>}, joined with the platform's
+ * {@link File#separator}.
+ */
+public class BrokerPathConfigHelper {
+    // Name of the sub-directory, below a root directory, that holds the config files.
+    private static final String CONFIG_DIR = "config";
+
+    // Starts as <user.home>/store/config/broker.properties; replaceable via the setter.
+    private static String brokerConfigPath =
+            inConfigDir(System.getProperty("user.home") + File.separator + "store", "broker.properties");
+
+
+    /**
+     * @return the currently configured path of the broker properties file
+     */
+    public static String getBrokerConfigPath() {
+        return brokerConfigPath;
+    }
+
+
+    /**
+     * Overrides the path returned by {@link #getBrokerConfigPath()}.
+     *
+     * @param path the new broker properties path, stored as given
+     */
+    public static void setBrokerConfigPath(String path) {
+        brokerConfigPath = path;
+    }
+
+
+    /**
+     * @param rootDir the root directory the path is built under
+     * @return {@code <rootDir>/config/topics.json}
+     */
+    public static String getTopicConfigPath(final String rootDir) {
+        return inConfigDir(rootDir, "topics.json");
+    }
+
+
+    /**
+     * @param rootDir the root directory the path is built under
+     * @return {@code <rootDir>/config/consumerOffset.json}
+     */
+    public static String getConsumerOffsetPath(final String rootDir) {
+        return inConfigDir(rootDir, "consumerOffset.json");
+    }
+
+
+    /**
+     * @param rootDir the root directory the path is built under
+     * @return {@code <rootDir>/config/subscriptionGroup.json}
+     */
+    public static String getSubscriptionGroupPath(final String rootDir) {
+        return inConfigDir(rootDir, "subscriptionGroup.json");
+    }
+
+
+    // Joins root, the config directory name and a file name with the platform separator.
+    private static String inConfigDir(final String rootDir, final String fileName) {
+        return rootDir + File.separator + CONFIG_DIR + File.separator + fileName;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java
new file mode 100644
index 0000000..7e81117
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/BrokerStartup.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker;
+
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.joran.JoranConfigurator;
+import com.alibaba.rocketmq.common.BrokerConfig;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.MixAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
+import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
+import com.alibaba.rocketmq.remoting.netty.NettySystemConfig;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.srvutil.ServerUtil;
+import com.alibaba.rocketmq.store.config.BrokerRole;
+import com.alibaba.rocketmq.store.config.MessageStoreConfig;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * Command-line bootstrap for the broker. {@link #createBrokerController(String[])} assembles
+ * the configuration objects and an initialized {@link BrokerController};
+ * {@link #start(BrokerController)} then starts it.
+ *
+ * @author shijia.wxr
+ */
+public class BrokerStartup {
+    // Populated by createBrokerController(); kept static so other code can read them.
+    public static Properties properties = null;
+    public static CommandLine commandLine = null;
+    public static String configFile = null;
+    // Assigned only after logback has been configured inside createBrokerController().
+    public static Logger log;
+
+    public static void main(String[] args) {
+        start(createBrokerController(args));
+    }
+
+    /**
+     * Starts the given controller and reports a success message.
+     *
+     * @param controller the controller to start
+     * @return the same controller on success; on any {@link Throwable} the stack trace is
+     *         printed and the JVM exits with status -1
+     */
+    public static BrokerController start(BrokerController controller) {
+        try {
+            controller.start();
+            String tip = "The broker[" + controller.getBrokerConfig().getBrokerName() + ", "
+                    + controller.getBrokerAddr() + "] boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
+
+            if (null != controller.getBrokerConfig().getNamesrvAddr()) {
+                tip += " and name server is " + controller.getBrokerConfig().getNamesrvAddr();
+            }
+
+            // The logger is only set by createBrokerController(); a controller built elsewhere
+            // must not turn a successful start into an NPE followed by System.exit.
+            if (log != null) {
+                log.info(tip);
+            } else {
+                System.out.printf("%s%n", tip);
+            }
+            return controller;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Parses the command line, loads the optional properties file, validates the resulting
+     * configuration, configures logback, and builds and initializes a {@link BrokerController}
+     * with a JVM shutdown hook attached.
+     * <p>
+     * Exit statuses used by this method: 0 after printing configuration for {@code -p}/{@code -m};
+     * -1 when the command line cannot be parsed or any {@link Throwable} is caught; -2 when the
+     * RocketMQ home is not set; -3 for an illegal name server address, a slave broker id that is
+     * not positive, or a failed {@code controller.initialize()}.
+     *
+     * @param args the raw command-line arguments
+     * @return the initialized (not yet started) controller
+     */
+    public static BrokerController createBrokerController(String[] args) {
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        // Apply 128 KiB socket buffers unless the corresponding system property was supplied.
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
+            NettySystemConfig.socketSndbufSize = 131072;
+        }
+
+        if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
+            NettySystemConfig.socketRcvbufSize = 131072;
+        }
+
+        try {
+            //PackageConflictDetect.detectFastjson();
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
+                    new PosixParser());
+            if (null == commandLine) {
+                System.exit(-1);
+            }
+
+            final BrokerConfig brokerConfig = new BrokerConfig();
+            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
+            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
+            nettyServerConfig.setListenPort(10911);
+            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+            // NOTE(review): messageStoreConfig was constructed on the line above and no
+            // properties have been applied yet, so this check only sees the default broker
+            // role. Presumably it was meant to run after the config file is loaded - confirm
+            // before moving it, since that would change the ratio used by slaves.
+            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
+                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
+                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
+            }
+
+            if (commandLine.hasOption('p')) {
+                MixAll.printObjectProperties(null, brokerConfig);
+                MixAll.printObjectProperties(null, nettyServerConfig);
+                MixAll.printObjectProperties(null, nettyClientConfig);
+                MixAll.printObjectProperties(null, messageStoreConfig);
+                System.exit(0);
+            } else if (commandLine.hasOption('m')) {
+                MixAll.printObjectProperties(null, brokerConfig, true);
+                MixAll.printObjectProperties(null, nettyServerConfig, true);
+                MixAll.printObjectProperties(null, nettyClientConfig, true);
+                MixAll.printObjectProperties(null, messageStoreConfig, true);
+                System.exit(0);
+            }
+
+            if (commandLine.hasOption('c')) {
+                String file = commandLine.getOptionValue('c');
+                if (file != null) {
+                    configFile = file;
+                    InputStream in = new BufferedInputStream(new FileInputStream(file));
+                    try {
+                        properties = new Properties();
+                        properties.load(in);
+                    } finally {
+                        // Release the file handle even when loading fails.
+                        in.close();
+                    }
+
+                    parsePropertie2SystemEnv(properties);
+                    MixAll.properties2Object(properties, brokerConfig);
+                    MixAll.properties2Object(properties, nettyServerConfig);
+                    MixAll.properties2Object(properties, nettyClientConfig);
+                    MixAll.properties2Object(properties, messageStoreConfig);
+
+                    BrokerPathConfigHelper.setBrokerConfigPath(file);
+                }
+            }
+
+            // Command-line options are applied after the file, so they win for brokerConfig.
+            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
+
+            if (null == brokerConfig.getRocketmqHome()) {
+                System.out.printf(
+                        "Please set the %s variable in your environment to match the location of the RocketMQ installation%n",
+                        MixAll.ROCKETMQ_HOME_ENV);
+                System.exit(-2);
+            }
+
+            String namesrvAddr = brokerConfig.getNamesrvAddr();
+            if (null != namesrvAddr) {
+                try {
+                    // Parse every address purely to validate it; the results are discarded.
+                    for (String addr : namesrvAddr.split(";")) {
+                        RemotingUtil.string2SocketAddress(addr);
+                    }
+                } catch (Exception e) {
+                    System.out.printf(
+                            "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
+                            namesrvAddr);
+                    System.exit(-3);
+                }
+            }
+
+
+            switch (messageStoreConfig.getBrokerRole()) {
+                case ASYNC_MASTER:
+                case SYNC_MASTER:
+                    // Masters always get the master id, whatever was configured.
+                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
+                    break;
+                case SLAVE:
+                    if (brokerConfig.getBrokerId() <= 0) {
+                        System.out.printf("Slave's brokerId must be > 0%n");
+                        System.exit(-3);
+                    }
+
+                    break;
+                default:
+                    break;
+            }
+
+            // The HA port is always derived from the listen port.
+            messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
+            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+            JoranConfigurator configurator = new JoranConfigurator();
+            configurator.setContext(lc);
+            lc.reset();
+            configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
+            log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+            MixAll.printObjectProperties(log, brokerConfig);
+            MixAll.printObjectProperties(log, nettyServerConfig);
+            MixAll.printObjectProperties(log, nettyClientConfig);
+            MixAll.printObjectProperties(log, messageStoreConfig);
+
+            final BrokerController controller = new BrokerController(//
+                    brokerConfig, //
+                    nettyServerConfig, //
+                    nettyClientConfig, //
+                    messageStoreConfig);
+            // remember all configs to prevent discard
+            controller.getConfiguration().registerConfig(properties);
+
+            boolean initResult = controller.initialize();
+            if (!initResult) {
+                controller.shutdown();
+                System.exit(-3);
+            }
+
+            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+                private volatile boolean hasShutdown = false;
+                private AtomicInteger shutdownTimes = new AtomicInteger(0);
+
+                @Override
+                public void run() {
+                    synchronized (this) {
+                        log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet());
+                        // Shut the controller down at most once, however often run() is invoked.
+                        if (!this.hasShutdown) {
+                            this.hasShutdown = true;
+                            long beginTime = System.currentTimeMillis();
+                            controller.shutdown();
+                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
+                            log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal);
+                        }
+                    }
+                }
+            }, "ShutdownHook"));
+
+            return controller;
+        } catch (Throwable e) {
+            e.printStackTrace();
+            System.exit(-1);
+        }
+
+        return null;
+    }
+
+    /**
+     * Copies the two address-server settings from the broker properties into system
+     * properties, falling back to built-in defaults when a key is absent.
+     *
+     * @param properties the loaded broker properties; {@code null} is ignored
+     */
+    private static void parsePropertie2SystemEnv(Properties properties) {
+        if (properties == null) {
+            return;
+        }
+        String rmqAddressServerDomain = properties.getProperty("rmqAddressServerDomain", "jmenv.tbsite.net");
+        String rmqAddressServerSubGroup = properties.getProperty("rmqAddressServerSubGroup", "nsaddr");
+        System.setProperty("rocketmq.namesrv.domain", rmqAddressServerDomain);
+        System.setProperty("rocketmq.namesrv.domain.subgroup", rmqAddressServerSubGroup);
+    }
+
+    /**
+     * Adds the broker-specific options ({@code -c}, {@code -p}, {@code -m}), all optional,
+     * to the given option set.
+     *
+     * @param options the option set to extend
+     * @return the same {@code options} instance
+     */
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("c", "configFile", true, "Broker config properties file");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("p", "printConfigItem", false, "Print all config item");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("m", "printImportantConfig", false, "Print important config item");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java
new file mode 100644
index 0000000..babf4b7
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientChannelInfo.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client;
+
+import com.alibaba.rocketmq.remoting.protocol.LanguageCode;
+import io.netty.channel.Channel;
+
+
+/**
+ * Describes one client connection: its channel, the client id, language and version
+ * supplied at construction, and the time the entry was last refreshed.
+ * <p>
+ * Two instances are equal when they reference the same channel object; the other
+ * fields do not take part in {@link #equals(Object)} or {@link #hashCode()}.
+ *
+ * @author shijia.wxr
+ */
+public class ClientChannelInfo {
+    private final Channel channel;
+    private final String clientId;
+    private final LanguageCode language;
+    private final int version;
+    // Mutable; initialised to the construction time.
+    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+
+    /**
+     * Creates an entry that carries only a channel: client id and language are
+     * {@code null} and the version is 0.
+     *
+     * @param channel the client's channel
+     */
+    public ClientChannelInfo(Channel channel) {
+        this(channel, null, null, 0);
+    }
+
+
+    /**
+     * @param channel  the client's channel
+     * @param clientId the client id, stored as given
+     * @param language the client language, stored as given
+     * @param version  the client version, stored as given
+     */
+    public ClientChannelInfo(Channel channel, String clientId, LanguageCode language, int version) {
+        this.channel = channel;
+        this.clientId = clientId;
+        this.language = language;
+        this.version = version;
+    }
+
+
+    public Channel getChannel() {
+        return channel;
+    }
+
+
+    public String getClientId() {
+        return clientId;
+    }
+
+
+    public LanguageCode getLanguage() {
+        return language;
+    }
+
+
+    public int getVersion() {
+        return version;
+    }
+
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+
+    /**
+     * Hash derived from the channel only, so that it agrees with {@link #equals(Object)}
+     * and stays stable while {@code lastUpdateTimestamp} is being updated.
+     *
+     * @return the channel's hash code, or 0 when the channel is {@code null}
+     */
+    @Override
+    public int hashCode() {
+        return (channel == null) ? 0 : channel.hashCode();
+    }
+
+
+    /**
+     * @param obj the object to compare with
+     * @return {@code true} when {@code obj} is a {@code ClientChannelInfo} of the same class
+     *         that references the very same channel object (or both channels are {@code null})
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        ClientChannelInfo other = (ClientChannelInfo) obj;
+        // Reference comparison on purpose: identity of the channel defines equality.
+        return this.channel == other.channel;
+    }
+
+
+    @Override
+    public String toString() {
+        return "ClientChannelInfo [channel=" + channel + ", clientId=" + clientId + ", language=" + language
+                + ", version=" + version + ", lastUpdateTimestamp=" + lastUpdateTimestamp + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java
new file mode 100644
index 0000000..4ac7532
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ClientHousekeepingService.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.ChannelEventListener;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Keeps the broker's producer, consumer and filter-server managers free of dead channels.
+ * It does so in two ways: a periodic scan started by {@link #start()}, and the
+ * {@link ChannelEventListener} callbacks, which forward close, exception and idle events.
+ *
+ * @author shijia.wxr
+ */
+public class ClientHousekeepingService implements ChannelEventListener {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+
+    private ScheduledExecutorService scheduledExecutorService = Executors
+            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ClientHousekeepingScheduledThread"));
+
+
+    /**
+     * @param brokerController the controller whose managers are scanned and notified
+     */
+    public ClientHousekeepingService(final BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    /**
+     * Schedules the inactive-channel scan to run every 10 seconds, first after 10 seconds.
+     */
+    public void start() {
+
+        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    ClientHousekeepingService.this.scanExceptionChannel();
+                } catch (Throwable e) {
+                    // Anything escaping run() would make the executor suppress every later
+                    // execution, so log and swallow all throwables.
+                    log.error("Error occurred when scan not active client channels.", e);
+                }
+            }
+        }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
+    }
+
+    // Asks each manager in turn to scan for channels that are no longer active.
+    private void scanExceptionChannel() {
+        this.brokerController.getProducerManager().scanNotActiveChannel();
+        this.brokerController.getConsumerManager().scanNotActiveChannel();
+        this.brokerController.getFilterServerManager().scanNotActiveChannel();
+    }
+
+    /**
+     * Shuts down the scheduler; the call does not wait for a running scan to finish.
+     */
+    public void shutdown() {
+        this.scheduledExecutorService.shutdown();
+    }
+
+    @Override
+    public void onChannelConnect(String remoteAddr, Channel channel) {
+        // Nothing to do on connect.
+    }
+
+
+    @Override
+    public void onChannelClose(String remoteAddr, Channel channel) {
+        notifyChannelClosed(remoteAddr, channel);
+    }
+
+
+    @Override
+    public void onChannelException(String remoteAddr, Channel channel) {
+        notifyChannelClosed(remoteAddr, channel);
+    }
+
+
+    @Override
+    public void onChannelIdle(String remoteAddr, Channel channel) {
+        notifyChannelClosed(remoteAddr, channel);
+    }
+
+
+    // Close, exception and idle events are all handled as a channel-close event by each manager.
+    private void notifyChannelClosed(String remoteAddr, Channel channel) {
+        this.brokerController.getProducerManager().doChannelCloseEvent(remoteAddr, channel);
+        this.brokerController.getConsumerManager().doChannelCloseEvent(remoteAddr, channel);
+        this.brokerController.getFilterServerManager().doChannelCloseEvent(remoteAddr, channel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java
new file mode 100644
index 0000000..410b703
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerGroupInfo {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final String groupName;
+    private final ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable =
+            new ConcurrentHashMap<String, SubscriptionData>();
+    private final ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+            new ConcurrentHashMap<Channel, ClientChannelInfo>(16);
+    private volatile ConsumeType consumeType;
+    private volatile MessageModel messageModel;
+    private volatile ConsumeFromWhere consumeFromWhere;
+    private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+
+    public ConsumerGroupInfo(String groupName, ConsumeType consumeType, MessageModel messageModel,
+                             ConsumeFromWhere consumeFromWhere) {
+        this.groupName = groupName;
+        this.consumeType = consumeType;
+        this.messageModel = messageModel;
+        this.consumeFromWhere = consumeFromWhere;
+    }
+
+
+    public ClientChannelInfo findChannel(final String clientId) {
+        Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<Channel, ClientChannelInfo> next = it.next();
+            if (next.getValue().getClientId().equals(clientId)) {
+                return next.getValue();
+            }
+        }
+
+        return null;
+    }
+
+
+    public ConcurrentHashMap<String, SubscriptionData> getSubscriptionTable() {
+        return subscriptionTable;
+    }
+
+
+    public ConcurrentHashMap<Channel, ClientChannelInfo> getChannelInfoTable() {
+        return channelInfoTable;
+    }
+
+
+    public List<Channel> getAllChannel() {
+        List<Channel> result = new ArrayList<Channel>();
+
+        result.addAll(this.channelInfoTable.keySet());
+
+        return result;
+    }
+
+
+    public List<String> getAllClientId() {
+        List<String> result = new ArrayList<String>();
+
+        Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
+
+        while (it.hasNext()) {
+            Entry<Channel, ClientChannelInfo> entry = it.next();
+            ClientChannelInfo clientChannelInfo = entry.getValue();
+            result.add(clientChannelInfo.getClientId());
+        }
+
+        return result;
+    }
+
+
+    public void unregisterChannel(final ClientChannelInfo clientChannelInfo) {
+        ClientChannelInfo old = this.channelInfoTable.remove(clientChannelInfo.getChannel());
+        if (old != null) {
+            log.info("unregister a consumer[{}] from consumerGroupInfo {}", this.groupName, old.toString());
+        }
+    }
+
+
+    public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+        final ClientChannelInfo info = this.channelInfoTable.remove(channel);
+        if (info != null) {
+            log.warn(
+                    "NETTY EVENT: remove not active channel[{}] from ConsumerGroupInfo groupChannelTable, consumer group: {}",
+                    info.toString(), groupName);
+            return true;
+        }
+
+        return false;
+    }
+
+    public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,
+                                 MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {
+        boolean updated = false;
+        this.consumeType = consumeType;
+        this.messageModel = messageModel;
+        this.consumeFromWhere = consumeFromWhere;
+
+        ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel());
+        if (null == infoOld) {
+            ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+            if (null == prev) {
+                log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,
+                        messageModel, infoNew.toString());
+                updated = true;
+            }
+
+            infoOld = infoNew;
+        } else {
+            if (!infoOld.getClientId().equals(infoNew.getClientId())) {
+                log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",
+                        this.groupName,
+                        infoOld.toString(),
+                        infoNew.toString());
+                this.channelInfoTable.put(infoNew.getChannel(), infoNew);
+            }
+        }
+
+        this.lastUpdateTimestamp = System.currentTimeMillis();
+        infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);
+
+        return updated;
+    }
+
+
+    public boolean updateSubscription(final Set<SubscriptionData> subList) {
+        boolean updated = false;
+
+        for (SubscriptionData sub : subList) {
+            SubscriptionData old = this.subscriptionTable.get(sub.getTopic());
+            if (old == null) {
+                SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub);
+                if (null == prev) {
+                    updated = true;
+                    log.info("subscription changed, add new topic, group: {} {}",
+                            this.groupName,
+                            sub.toString());
+                }
+            } else if (sub.getSubVersion() > old.getSubVersion()) {
+                if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {
+                    log.info("subscription changed, group: {} OLD: {} NEW: {}",
+                            this.groupName,
+                            old.toString(),
+                            sub.toString()
+                    );
+                }
+
+                this.subscriptionTable.put(sub.getTopic(), sub);
+            }
+        }
+
+
+        Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, SubscriptionData> next = it.next();
+            String oldTopic = next.getKey();
+
+            boolean exist = false;
+            for (SubscriptionData sub : subList) {
+                if (sub.getTopic().equals(oldTopic)) {
+                    exist = true;
+                    break;
+                }
+            }
+
+            if (!exist) {
+                log.warn("subscription changed, group: {} remove topic {} {}",
+                        this.groupName,
+                        oldTopic,
+                        next.getValue().toString()
+                );
+
+                it.remove();
+                updated = true;
+            }
+        }
+
+        this.lastUpdateTimestamp = System.currentTimeMillis();
+
+        return updated;
+    }
+
+
+    public Set<String> getSubscribeTopics() {
+        return subscriptionTable.keySet();
+    }
+
+
+    public SubscriptionData findSubscriptionData(final String topic) {
+        return this.subscriptionTable.get(topic);
+    }
+
+
+    public ConsumeType getConsumeType() {
+        return consumeType;
+    }
+
+
+    public void setConsumeType(ConsumeType consumeType) {
+        this.consumeType = consumeType;
+    }
+
+
+    public MessageModel getMessageModel() {
+        return messageModel;
+    }
+
+
+    public void setMessageModel(MessageModel messageModel) {
+        this.messageModel = messageModel;
+    }
+
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+
+    public long getLastUpdateTimestamp() {
+        return lastUpdateTimestamp;
+    }
+
+
+    public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+        this.lastUpdateTimestamp = lastUpdateTimestamp;
+    }
+
+
+    public ConsumeFromWhere getConsumeFromWhere() {
+        return consumeFromWhere;
+    }
+
+
+    public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
+        this.consumeFromWhere = consumeFromWhere;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java
new file mode 100644
index 0000000..4da2eb3
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerIdsChangeListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client;
+
+import io.netty.channel.Channel;
+
+import java.util.List;
+
+
+/**
+ * Callback used to report a consumer group together with a list of its channels.
+ *
+ * @author shijia.wxr
+ */
+public interface ConsumerIdsChangeListener {
+    /**
+     * Reports the given group along with a list of channels belonging to it.
+     *
+     * @param group    name of the consumer group being reported
+     * @param channels channels supplied by the caller for that group
+     */
+    void consumerIdsChanged(String group, List<Channel> channels);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java
new file mode 100644
index 0000000..48e9673
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ConsumerManager.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
+import com.alibaba.rocketmq.common.protocol.heartbeat.ConsumeType;
+import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ConsumerManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+    private final ConcurrentHashMap<String/* Group */, ConsumerGroupInfo> consumerTable =
+            new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
+    private final ConsumerIdsChangeListener consumerIdsChangeListener;
+
+    public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
+        this.consumerIdsChangeListener = consumerIdsChangeListener;
+    }
+
+    public ClientChannelInfo findChannel(final String group, final String clientId) {
+        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+        if (consumerGroupInfo != null) {
+            return consumerGroupInfo.findChannel(clientId);
+        }
+        return null;
+    }
+
+    public SubscriptionData findSubscriptionData(final String group, final String topic) {
+        ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+        if (consumerGroupInfo != null) {
+            return consumerGroupInfo.findSubscriptionData(topic);
+        }
+
+        return null;
+    }
+
+    public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
+        return this.consumerTable.get(group);
+    }
+
+    public int findSubscriptionDataCount(final String group) {
+        ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+        if (consumerGroupInfo != null) {
+            return consumerGroupInfo.getSubscriptionTable().size();
+        }
+
+        return 0;
+    }
+
+    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConsumerGroupInfo> next = it.next();
+            ConsumerGroupInfo info = next.getValue();
+            boolean removed = info.doChannelCloseEvent(remoteAddr, channel);
+            if (removed) {
+                if (info.getChannelInfoTable().isEmpty()) {
+                    ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
+                    if (remove != null) {
+                        log.info("unregister consumer ok, no any connection, and remove consumer group, {}",
+                                next.getKey());
+                    }
+                }
+
+                this.consumerIdsChangeListener.consumerIdsChanged(next.getKey(), info.getAllChannel());
+            }
+        }
+    }
+
+    public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
+                                    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
+                                    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
+
+        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+        if (null == consumerGroupInfo) {
+            ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
+            ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
+            consumerGroupInfo = prev != null ? prev : tmp;
+        }
+
+        boolean r1 =
+                consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
+                        consumeFromWhere);
+        boolean r2 = consumerGroupInfo.updateSubscription(subList);
+
+        if (r1 || r2) {
+            if (isNotifyConsumerIdsChangedEnable) {
+                this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+            }
+        }
+
+        return r1 || r2;
+    }
+
+    public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) {
+        ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
+        if (null != consumerGroupInfo) {
+            consumerGroupInfo.unregisterChannel(clientChannelInfo);
+            if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
+                ConsumerGroupInfo remove = this.consumerTable.remove(group);
+                if (remove != null) {
+                    log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
+                }
+            }
+            if (isNotifyConsumerIdsChangedEnable) {
+                this.consumerIdsChangeListener.consumerIdsChanged(group, consumerGroupInfo.getAllChannel());
+            }
+        }
+    }
+
+    public void scanNotActiveChannel() {
+        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConsumerGroupInfo> next = it.next();
+            String group = next.getKey();
+            ConsumerGroupInfo consumerGroupInfo = next.getValue();
+            ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+                    consumerGroupInfo.getChannelInfoTable();
+
+            Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();
+            while (itChannel.hasNext()) {
+                Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
+                ClientChannelInfo clientChannelInfo = nextChannel.getValue();
+                long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
+                if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+                    log.warn(
+                            "SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
+                            RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
+                    RemotingUtil.closeChannel(clientChannelInfo.getChannel());
+                    itChannel.remove();
+                }
+            }
+
+            if (channelInfoTable.isEmpty()) {
+                log.warn(
+                        "SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",
+                        group);
+                it.remove();
+            }
+        }
+    }
+
+    public HashSet<String> queryTopicConsumeByWho(final String topic) {
+        HashSet<String> groups = new HashSet<String>();
+        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConsumerGroupInfo> entry = it.next();
+            ConcurrentHashMap<String, SubscriptionData> subscriptionTable =
+                    entry.getValue().getSubscriptionTable();
+            if (subscriptionTable.containsKey(topic)) {
+                groups.add(entry.getKey());
+            }
+        }
+        return groups;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
new file mode 100644
index 0000000..0095913
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import io.netty.channel.Channel;
+
+import java.util.List;
+
+
+/**
+ * {@link ConsumerIdsChangeListener} that notifies each supplied channel through
+ * the broker's {@code Broker2Client}, provided the broker config enables it.
+ *
+ * @author shijia.wxr
+ */
+public class DefaultConsumerIdsChangeListener implements ConsumerIdsChangeListener {
+    private final BrokerController brokerController;
+
+
+    /**
+     * @param brokerController controller supplying the broker config and the client notifier
+     */
+    public DefaultConsumerIdsChangeListener(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    /**
+     * Sends a consumer-ids-changed notification for the group to every channel in the list.
+     * Nothing is sent when the list is {@code null} or the broker config disables notification.
+     *
+     * @param group    consumer group name included in each notification
+     * @param channels channels to notify; may be {@code null}
+     */
+    @Override
+    public void consumerIdsChanged(String group, List<Channel> channels) {
+        if (channels == null) {
+            return;
+        }
+
+        if (!this.brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
+            return;
+        }
+
+        for (Channel target : channels) {
+            this.brokerController.getBroker2Client().notifyConsumerIdsChanged(target, group);
+        }
+    }
+}