You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:36 UTC

[45/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
deleted file mode 100644
index d954a46..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.subscription;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
-import com.alibaba.rocketmq.common.ConfigManager;
-import com.alibaba.rocketmq.common.DataVersion;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class SubscriptionGroupManager extends ConfigManager {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
-    private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
-            new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
-    private final DataVersion dataVersion = new DataVersion();
-    private transient BrokerController brokerController;
-
-
-    public SubscriptionGroupManager() {
-        this.init();
-    }
-
-    /**
-     * Pre-registers the built-in consumer groups in the subscription group table.
-     */
-    private void init() {
-        // Registered with the defaults of a freshly constructed SubscriptionGroupConfig.
-        this.registerBuiltinGroup(MixAll.TOOLS_CONSUMER_GROUP, false);
-        this.registerBuiltinGroup(MixAll.FILTERSRV_CONSUMER_GROUP, false);
-        this.registerBuiltinGroup(MixAll.SELF_TEST_CONSUMER_GROUP, false);
-
-        // Registered with broadcast consumption explicitly switched on.
-        this.registerBuiltinGroup(MixAll.ONS_HTTP_PROXY_GROUP, true);
-        this.registerBuiltinGroup(MixAll.CID_ONSAPI_PULL_GROUP, true);
-        this.registerBuiltinGroup(MixAll.CID_ONSAPI_PERMISSION_GROUP, true);
-        this.registerBuiltinGroup(MixAll.CID_ONSAPI_OWNER_GROUP, true);
-    }
-
-    /**
-     * Creates a config for {@code groupName} and stores it in the table under that name.
-     *
-     * @param groupName name of the built-in group, also used as the table key
-     * @param enableBroadcast when {@code true}, broadcast consumption is explicitly
-     *        enabled on the config; when {@code false}, the setter is not called and
-     *        the config keeps whatever default it was constructed with
-     */
-    private void registerBuiltinGroup(final String groupName, final boolean enableBroadcast) {
-        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
-        subscriptionGroupConfig.setGroupName(groupName);
-        if (enableBroadcast) {
-            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
-        }
-        this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
-    }
-
-
-    public SubscriptionGroupManager(BrokerController brokerController) {
-        this.brokerController = brokerController;
-        this.init();
-    }
-
-
-    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
-        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
-        if (old != null) {
-            log.info("update subscription group config, old: " + old + " new: " + config);
-        } else {
-            log.info("create new subscription group, " + config);
-        }
-
-        this.dataVersion.nextVersion();
-
-        this.persist();
-    }
-
-    public void disableConsume(final String groupName) {
-        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
-        if (old != null) {
-            old.setConsumeEnable(false);
-            this.dataVersion.nextVersion();
-        }
-    }
-
-
-    /**
-     * Returns the config registered for {@code group}, creating one on demand.
-     * <p>
-     * A missing group is created only when the broker config enables automatic
-     * subscription group creation or {@code MixAll.isSysConsumerGroup(group)} is true.
-     * The data version is advanced and the table persisted only when this call
-     * actually inserted the new entry.
-     *
-     * @param group name of the consumer group to look up
-     * @return the config stored in the table for {@code group}, or {@code null} when
-     *         the group is unknown and may not be auto-created
-     */
-    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
-        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
-        if (subscriptionGroupConfig != null) {
-            return subscriptionGroupConfig;
-        }
-
-        if (!brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() && !MixAll.isSysConsumerGroup(group)) {
-            return null;
-        }
-
-        SubscriptionGroupConfig created = new SubscriptionGroupConfig();
-        created.setGroupName(group);
-        SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, created);
-        if (preConfig != null) {
-            // Another thread registered the group first: hand out the instance that is
-            // really in the table, and leave version/persistence to that thread.
-            return preConfig;
-        }
-
-        log.info("auto create a subscription group, {}", created.toString());
-        this.dataVersion.nextVersion();
-        this.persist();
-        return created;
-    }
-
-
-    @Override
-    public String encode() {
-        return this.encode(false);
-    }
-
-    /**
-     * Returns the path of the file that stores the subscription group configuration.
-     */
-    @Override
-    public String configFilePath() {
-        // NOTE(review): the store root is built from the "user.home" system property; the
-        // variant that reads it from the broker's message store config is disabled below.
-        //return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
-        final String storeRootDir = System.getProperty("user.home") + File.separator + "store";
-        return BrokerPathConfigHelper.getSubscriptionGroupPath(storeRootDir);
-    }
-
-    @Override
-    public void decode(String jsonString) {
-        if (jsonString != null) {
-            SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
-            if (obj != null) {
-                this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
-                this.dataVersion.assignNewOne(obj.dataVersion);
-                this.printLoadDataWhenFirstBoot(obj);
-            }
-        }
-    }
-
-    public String encode(final boolean prettyFormat) {
-        return RemotingSerializable.toJson(this, prettyFormat);
-    }
-
-    /**
-     * Logs every subscription group held by {@code sgm} at info level.
-     *
-     * @param sgm manager whose subscription group table is listed
-     */
-    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
-        for (Entry<String, SubscriptionGroupConfig> entry : sgm.getSubscriptionGroupTable().entrySet()) {
-            log.info("load exist subscription group, {}", entry.getValue().toString());
-        }
-    }
-
-    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
-        return subscriptionGroupTable;
-    }
-
-
-    public DataVersion getDataVersion() {
-        return dataVersion;
-    }
-
-
-    /**
-     * Removes the config registered for {@code groupName}.
-     * <p>
-     * When an entry was removed, the data version is advanced and the table is
-     * persisted; otherwise only a warning is logged.
-     *
-     * @param groupName name of the consumer group to delete
-     */
-    public void deleteSubscriptionGroupConfig(final String groupName) {
-        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
-        if (old != null) {
-            log.info("delete subscription group OK, subscription group: " + old);
-            this.dataVersion.nextVersion();
-            this.persist();
-        } else {
-            // 'old' is always null on this branch, so report the requested name instead.
-            log.warn("delete subscription group failed, subscription group: " + groupName + " not exist");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
deleted file mode 100644
index 94d7e9f..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/topic/TopicConfigManager.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.topic;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
-import com.alibaba.rocketmq.common.ConfigManager;
-import com.alibaba.rocketmq.common.DataVersion;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.protocol.body.KVTable;
-import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * @author shijia.wxr
- */
-public class TopicConfigManager extends ConfigManager {
-    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final long LOCK_TIMEOUT_MILLIS = 3000;
-    private transient final Lock lockTopicConfigTable = new ReentrantLock();
-
-    private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
-            new ConcurrentHashMap<String, TopicConfig>(1024);
-    private final DataVersion dataVersion = new DataVersion();
-    private final Set<String> systemTopicList = new HashSet<String>();
-    private transient BrokerController brokerController;
-
-
-    public TopicConfigManager() {
-    }
-
-
-    public TopicConfigManager(BrokerController brokerController) {
-        this.brokerController = brokerController;
-        {
-            // MixAll.SELF_TEST_TOPIC
-            String topic = MixAll.SELF_TEST_TOPIC;
-            TopicConfig topicConfig = new TopicConfig(topic);
-            this.systemTopicList.add(topic);
-            topicConfig.setReadQueueNums(1);
-            topicConfig.setWriteQueueNums(1);
-            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-        }
-        {
-            // MixAll.DEFAULT_TOPIC
-            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
-                String topic = MixAll.DEFAULT_TOPIC;
-                TopicConfig topicConfig = new TopicConfig(topic);
-                this.systemTopicList.add(topic);
-                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
-                        .getDefaultTopicQueueNums());
-                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
-                        .getDefaultTopicQueueNums());
-                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
-                topicConfig.setPerm(perm);
-                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-            }
-        }
-        {
-            // MixAll.BENCHMARK_TOPIC
-            String topic = MixAll.BENCHMARK_TOPIC;
-            TopicConfig topicConfig = new TopicConfig(topic);
-            this.systemTopicList.add(topic);
-            topicConfig.setReadQueueNums(1024);
-            topicConfig.setWriteQueueNums(1024);
-            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-        }
-        {
-
-            String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
-            TopicConfig topicConfig = new TopicConfig(topic);
-            this.systemTopicList.add(topic);
-            int perm = PermName.PERM_INHERIT;
-            if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
-                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
-            }
-            topicConfig.setPerm(perm);
-            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-        }
-        {
-
-            String topic = this.brokerController.getBrokerConfig().getBrokerName();
-            TopicConfig topicConfig = new TopicConfig(topic);
-            this.systemTopicList.add(topic);
-            int perm = PermName.PERM_INHERIT;
-            if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
-                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
-            }
-            topicConfig.setReadQueueNums(1);
-            topicConfig.setWriteQueueNums(1);
-            topicConfig.setPerm(perm);
-            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-        }
-        {
-            // MixAll.OFFSET_MOVED_EVENT
-            String topic = MixAll.OFFSET_MOVED_EVENT;
-            TopicConfig topicConfig = new TopicConfig(topic);
-            this.systemTopicList.add(topic);
-            topicConfig.setReadQueueNums(1);
-            topicConfig.setWriteQueueNums(1);
-            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-        }
-    }
-
-
-    public boolean isSystemTopic(final String topic) {
-        return this.systemTopicList.contains(topic);
-    }
-
-
-    public Set<String> getSystemTopic() {
-        return this.systemTopicList;
-    }
-
-
-    public boolean isTopicCanSendMessage(final String topic) {
-        return !topic.equals(MixAll.DEFAULT_TOPIC);
-    }
-
-
-    public TopicConfig selectTopicConfig(final String topic) {
-        return this.topicConfigTable.get(topic);
-    }
-
-
-    /**
-     * Looks up {@code topic} and, if it is not registered yet, tries to create it from
-     * the configuration of {@code defaultTopic}.
-     * <p>
-     * Lookup and creation run under {@code lockTopicConfigTable}, acquired with a
-     * {@code LOCK_TIMEOUT_MILLIS} timeout. A newly created topic is stored, the data
-     * version is advanced and the configuration is persisted; after the lock has been
-     * released the broker is re-registered.
-     *
-     * @param topic name of the topic to look up or create
-     * @param defaultTopic name of the topic whose config serves as the template
-     * @param remoteAddress producer address, used in log messages only
-     * @param clientDefaultTopicQueueNums queue count requested by the client; capped at
-     *        the template's write queue count and floored at 0
-     * @param topicSysFlag system flag assigned to a newly created topic
-     * @return the existing or newly created config, or {@code null} when the lock was
-     *         not acquired in time, the thread was interrupted, the template topic is
-     *         missing, or {@code PermName.isInherited} rejects the template's permission
-     */
-    public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
-                                                      final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
-        TopicConfig topicConfig = null;
-        boolean createNew = false;
-
-        try {
-            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    topicConfig = this.topicConfigTable.get(topic);
-                    if (topicConfig != null) {
-                        return topicConfig;
-                    }
-
-                    TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
-                    if (defaultTopicConfig != null) {
-                        if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) {
-                            if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
-                                // Overwrites the template's permission with read/write only
-                                // (no PERM_INHERIT) while auto-creation is switched off.
-                                defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
-                            }
-                        }
-
-                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
-                            topicConfig = new TopicConfig(topic);
-
-                            // Never more queues than the template offers, never negative.
-                            int queueNums = Math.max(0,
-                                    Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums()));
-
-                            topicConfig.setReadQueueNums(queueNums);
-                            topicConfig.setWriteQueueNums(queueNums);
-                            // The new topic takes over the template's permission minus the inherit bit.
-                            int perm = defaultTopicConfig.getPerm();
-                            perm &= ~PermName.PERM_INHERIT;
-                            topicConfig.setPerm(perm);
-                            topicConfig.setTopicSysFlag(topicSysFlag);
-                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
-                        } else {
-                            LOG.warn("create new topic failed, because the default topic[" + defaultTopic
-                                    + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
-                                    + remoteAddress);
-                        }
-                    } else {
-                        LOG.warn("create new topic failed, because the default topic[" + defaultTopic
-                                + "] not exist." + " producer: " + remoteAddress);
-                    }
-
-                    if (topicConfig != null) {
-                        LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
-                                + " producer: " + remoteAddress);
-
-                        this.topicConfigTable.put(topic, topicConfig);
-
-                        this.dataVersion.nextVersion();
-
-                        createNew = true;
-
-                        this.persist();
-                    }
-                } finally {
-                    this.lockTopicConfigTable.unlock();
-                }
-            }
-        } catch (InterruptedException e) {
-            LOG.error("createTopicInSendMessageMethod exception", e);
-            // Restore the interrupt status so code further up the stack can still observe it.
-            Thread.currentThread().interrupt();
-        }
-
-        if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
-        }
-
-        return topicConfig;
-    }
-
-    /**
-     * Returns the config of {@code topic}, creating it with the given settings when it
-     * is not registered yet.
-     * <p>
-     * The table is checked once without the lock and again after acquiring
-     * {@code lockTopicConfigTable} (with a {@code LOCK_TIMEOUT_MILLIS} timeout). A newly
-     * created topic is stored, the data version is advanced and the configuration is
-     * persisted; after the lock has been released the broker is re-registered.
-     *
-     * @param topic name of the topic to look up or create
-     * @param clientDefaultTopicQueueNums read and write queue count for a new topic
-     * @param perm permission assigned to a new topic
-     * @param topicSysFlag system flag assigned to a new topic
-     * @return the existing or newly created config, or {@code null} when the lock was
-     *         not acquired in time or the thread was interrupted
-     */
-    public TopicConfig createTopicInSendMessageBackMethod(
-            final String topic,
-            final int clientDefaultTopicQueueNums,
-            final int perm,
-            final int topicSysFlag) {
-        TopicConfig topicConfig = this.topicConfigTable.get(topic);
-        if (topicConfig != null) {
-            return topicConfig;
-        }
-
-        boolean createNew = false;
-
-        try {
-            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    // Re-check under the lock: another thread may have created the topic meanwhile.
-                    topicConfig = this.topicConfigTable.get(topic);
-                    if (topicConfig != null) {
-                        return topicConfig;
-                    }
-
-                    topicConfig = new TopicConfig(topic);
-                    topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
-                    topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
-                    topicConfig.setPerm(perm);
-                    topicConfig.setTopicSysFlag(topicSysFlag);
-
-                    LOG.info("create new topic {}", topicConfig);
-                    this.topicConfigTable.put(topic, topicConfig);
-                    createNew = true;
-                    this.dataVersion.nextVersion();
-                    this.persist();
-                } finally {
-                    this.lockTopicConfigTable.unlock();
-                }
-            }
-        } catch (InterruptedException e) {
-            LOG.error("createTopicInSendMessageBackMethod exception", e);
-            // Restore the interrupt status so code further up the stack can still observe it.
-            Thread.currentThread().interrupt();
-        }
-
-        if (createNew) {
-            this.brokerController.registerBrokerAll(false, true);
-        }
-
-        return topicConfig;
-    }
-
-    /**
-     * Sets or clears the unit flag in the system flag of {@code topic}.
-     * <p>
-     * Does nothing when the topic is not registered. Otherwise the config is written
-     * back to the table, the data version is advanced, the configuration is persisted
-     * and the broker is re-registered.
-     *
-     * @param topic name of the topic to update
-     * @param unit {@code true} to set the unit flag, {@code false} to clear it
-     */
-    public void updateTopicUnitFlag(final String topic, final boolean unit) {
-
-        TopicConfig topicConfig = this.topicConfigTable.get(topic);
-        if (topicConfig != null) {
-            int oldTopicSysFlag = topicConfig.getTopicSysFlag();
-            if (unit) {
-                topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
-            } else {
-                topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
-            }
-
-            // Both values need their own placeholder, otherwise the new flag is dropped from the log.
-            LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
-                    topicConfig.getTopicSysFlag());
-
-            this.topicConfigTable.put(topic, topicConfig);
-
-            this.dataVersion.nextVersion();
-
-            this.persist();
-            this.brokerController.registerBrokerAll(false, true);
-        }
-    }
-
-    /**
-     * Sets the unit-sub flag in the system flag of {@code topic}.
-     * <p>
-     * Does nothing when the topic is not registered. Otherwise the config is written
-     * back to the table, the data version is advanced, the configuration is persisted
-     * and the broker is re-registered.
-     *
-     * @param topic name of the topic to update
-     * @param hasUnitSub {@code true} to set the unit-sub flag; {@code false} leaves the
-     *        flag untouched
-     */
-    public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
-        TopicConfig topicConfig = this.topicConfigTable.get(topic);
-        if (topicConfig != null) {
-            int oldTopicSysFlag = topicConfig.getTopicSysFlag();
-            // NOTE(review): there is no clearing branch for hasUnitSub == false, yet the
-            // version bump, persist and re-registration below still run - confirm intended.
-            if (hasUnitSub) {
-                topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
-            }
-
-            // Both values need their own placeholder, otherwise the new flag is dropped from the log.
-            LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
-                    topicConfig.getTopicSysFlag());
-
-            this.topicConfigTable.put(topic, topicConfig);
-
-            this.dataVersion.nextVersion();
-
-            this.persist();
-            this.brokerController.registerBrokerAll(false, true);
-        }
-    }
-
-    public void updateTopicConfig(final TopicConfig topicConfig) {
-        TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
-        if (old != null) {
-            LOG.info("update topic config, old: " + old + " new: " + topicConfig);
-        } else {
-            LOG.info("create new topic, " + topicConfig);
-        }
-
-        this.dataVersion.nextVersion();
-
-        this.persist();
-    }
-
-
-    public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
-
-        if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
-            boolean isChange = false;
-            Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
-            for (String topic : orderTopics) {
-                TopicConfig topicConfig = this.topicConfigTable.get(topic);
-                if (topicConfig != null && !topicConfig.isOrder()) {
-                    topicConfig.setOrder(true);
-                    isChange = true;
-                    LOG.info("update order topic config, topic={}, order={}", topic, true);
-                }
-            }
-
-            for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
-                String topic = entry.getKey();
-                if (!orderTopics.contains(topic)) {
-                    TopicConfig topicConfig = entry.getValue();
-                    if (topicConfig.isOrder()) {
-                        topicConfig.setOrder(false);
-                        isChange = true;
-                        LOG.info("update order topic config, topic={}, order={}", topic, false);
-                    }
-                }
-            }
-
-            if (isChange) {
-                this.dataVersion.nextVersion();
-                this.persist();
-            }
-        }
-    }
-
-    /**
-     * Tells whether {@code topic} is registered and marked as an order topic.
-     *
-     * @param topic name of the topic to check
-     * @return {@code true} only when the topic exists and its config reports order;
-     *         {@code false} for an unknown topic
-     */
-    public boolean isOrderTopic(final String topic) {
-        final TopicConfig found = this.topicConfigTable.get(topic);
-        return found != null && found.isOrder();
-    }
-
-    public void deleteTopicConfig(final String topic) {
-        TopicConfig old = this.topicConfigTable.remove(topic);
-        if (old != null) {
-            LOG.info("delete topic config OK, topic: " + old);
-            this.dataVersion.nextVersion();
-            this.persist();
-        } else {
-            LOG.warn("delete topic config failed, topic: " + topic + " not exist");
-        }
-    }
-
-    public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
-        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
-        topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
-        topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
-        return topicConfigSerializeWrapper;
-    }
-
-    @Override
-    public String encode() {
-        return encode(false);
-    }
-
-    /**
-     * Returns the path of the file that stores the topic configuration.
-     */
-    @Override
-    public String configFilePath() {
-        // NOTE(review): the store root is built from the "user.home" system property; the
-        // variant that reads it from the broker's message store config is disabled below.
-//        return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig()
-//                .getStorePathRootDir());
-        final String storeRootDir = System.getProperty("user.home") + File.separator + "store";
-        return BrokerPathConfigHelper.getTopicConfigPath(storeRootDir);
-    }
-
-    @Override
-    public void decode(String jsonString) {
-        if (jsonString != null) {
-            TopicConfigSerializeWrapper topicConfigSerializeWrapper =
-                    TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
-            if (topicConfigSerializeWrapper != null) {
-                this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
-                this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
-                this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
-            }
-        }
-    }
-
-    public String encode(final boolean prettyFormat) {
-        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
-        topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
-        topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
-        return topicConfigSerializeWrapper.toJson(prettyFormat);
-    }
-
-    /**
-     * Logs every topic config contained in {@code tcs} at info level.
-     *
-     * @param tcs wrapper whose topic config table is listed
-     */
-    private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
-        for (Entry<String, TopicConfig> entry : tcs.getTopicConfigTable().entrySet()) {
-            LOG.info("load exist local topic, {}", entry.getValue().toString());
-        }
-    }
-
-    public DataVersion getDataVersion() {
-        return dataVersion;
-    }
-
-    public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
-        return topicConfigTable;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
deleted file mode 100644
index 4328cf8..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionRecord.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.transaction;
-
-/**
- * Mutable value holder pairing a commit log offset with a producer group name.
- */
-public class TransactionRecord {
-    /** Commit log offset. */
-    private long offset;
-    /** Name of the producer group. */
-    private String producerGroup;
-
-
-    /** @return the commit log offset */
-    public long getOffset() {
-        return this.offset;
-    }
-
-
-    /** @return the producer group name, or {@code null} if none has been set */
-    public String getProducerGroup() {
-        return this.producerGroup;
-    }
-
-
-    /** @param offset the commit log offset to store */
-    public void setOffset(long offset) {
-        this.offset = offset;
-    }
-
-
-    /** @param producerGroup the producer group name to store */
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
deleted file mode 100644
index 9d977ab..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/TransactionStore.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.transaction;
-
-import java.util.List;
-
-
-/**
- * Storage abstraction for {@link TransactionRecord} entries.
- * <p>
- * NOTE(review): the per-method notes below are inferred from names and signatures
- * only; confirm them against the implementations.
- */
-public interface TransactionStore {
-    /** Presumably prepares the store for use; the boolean presumably reports success. */
-    boolean open();
-
-    /** Presumably releases whatever {@link #open()} acquired. */
-    void close();
-
-    /** Presumably stores the given records; the boolean presumably reports success. */
-    boolean put(final List<TransactionRecord> trs);
-
-    /** Presumably deletes the records identified by the given primary keys. */
-    void remove(final List<Long> pks);
-
-    /** Presumably returns up to {@code nums} records starting from primary key {@code pk}. */
-    List<TransactionRecord> traverse(final long pk, final int nums);
-
-    /** Presumably the number of records currently stored. */
-    long totalRecords();
-
-    /** Presumably the smallest primary key currently stored. */
-    long minPK();
-
-    /** Presumably the largest primary key currently stored. */
-    long maxPK();
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
deleted file mode 100644
index 47de33b..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.transaction.jdbc;
-
-import com.alibaba.rocketmq.broker.transaction.TransactionRecord;
-import com.alibaba.rocketmq.broker.transaction.TransactionStore;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URL;
-import java.sql.*;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-
-public class JDBCTransactionStore implements TransactionStore {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
-    private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
-    private Connection connection;
-    private AtomicLong totalRecordsValue = new AtomicLong(0);
-
-    public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) {
-        this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig;
-    }
-
-    @Override
-    public boolean open() {
-        if (this.loadDriver()) {
-            Properties props = new Properties();
-            props.put("user", jdbcTransactionStoreConfig.getJdbcUser());
-            props.put("password", jdbcTransactionStoreConfig.getJdbcPassword());
-
-            try {
-                this.connection =
-                        DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
-
-                this.connection.setAutoCommit(false);
-
-
-                if (!this.computeTotalRecords()) {
-                    return this.createDB();
-                }
-
-                return true;
-            } catch (SQLException e) {
-                log.info("Create JDBC Connection Exeption", e);
-            }
-        }
-
-        return false;
-    }
-
-    private boolean loadDriver() {
-        try {
-            Class.forName(this.jdbcTransactionStoreConfig.getJdbcDriverClass()).newInstance();
-            log.info("Loaded the appropriate driver, {}",
-                    this.jdbcTransactionStoreConfig.getJdbcDriverClass());
-            return true;
-        } catch (Exception e) {
-            log.info("Loaded the appropriate driver Exception", e);
-        }
-
-        return false;
-    }
-
-    private boolean computeTotalRecords() {
-        Statement statement = null;
-        ResultSet resultSet = null;
-        try {
-            statement = this.connection.createStatement();
-
-            resultSet = statement.executeQuery("select count(offset) as total from t_transaction");
-            if (!resultSet.next()) {
-                log.warn("computeTotalRecords ResultSet is empty");
-                return false;
-            }
-
-            this.totalRecordsValue.set(resultSet.getLong(1));
-        } catch (Exception e) {
-            log.warn("computeTotalRecords Exception", e);
-            return false;
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                }
-            }
-
-            if (null != resultSet) {
-                try {
-                    resultSet.close();
-                } catch (SQLException e) {
-                }
-            }
-        }
-
-        return true;
-    }
-
-    private boolean createDB() {
-        Statement statement = null;
-        try {
-            statement = this.connection.createStatement();
-
-            String sql = this.createTableSql();
-            log.info("createDB SQL:\n {}", sql);
-            statement.execute(sql);
-            this.connection.commit();
-            return true;
-        } catch (Exception e) {
-            log.warn("createDB Exception", e);
-            return false;
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    log.warn("Close statement exception", e);
-                }
-            }
-        }
-    }
-
-    private String createTableSql() {
-        URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql");
-        String fileContent = MixAll.file2String(resource);
-        return fileContent;
-    }
-
-    @Override
-    public void close() {
-        try {
-            if (this.connection != null) {
-                this.connection.close();
-            }
-        } catch (SQLException e) {
-        }
-    }
-
-    @Override
-    public boolean put(List<TransactionRecord> trs) {
-        PreparedStatement statement = null;
-        try {
-            this.connection.setAutoCommit(false);
-            statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)");
-            for (TransactionRecord tr : trs) {
-                statement.setLong(1, tr.getOffset());
-                statement.setString(2, tr.getProducerGroup());
-                statement.addBatch();
-            }
-            int[] executeBatch = statement.executeBatch();
-            this.connection.commit();
-            this.totalRecordsValue.addAndGet(updatedRows(executeBatch));
-            return true;
-        } catch (Exception e) {
-            log.warn("createDB Exception", e);
-            return false;
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                    log.warn("Close statement exception", e);
-                }
-            }
-        }
-    }
-
-    private long updatedRows(int[] rows) {
-        long res = 0;
-        for (int i : rows) {
-            res += i;
-        }
-
-        return res;
-    }
-
-    @Override
-    public void remove(List<Long> pks) {
-        PreparedStatement statement = null;
-        try {
-            this.connection.setAutoCommit(false);
-            statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?");
-            for (long pk : pks) {
-                statement.setLong(1, pk);
-                statement.addBatch();
-            }
-            int[] executeBatch = statement.executeBatch();
-            this.connection.commit();
-        } catch (Exception e) {
-            log.warn("createDB Exception", e);
-        } finally {
-            if (null != statement) {
-                try {
-                    statement.close();
-                } catch (SQLException e) {
-                }
-            }
-        }
-    }
-
-    @Override
-    public List<TransactionRecord> traverse(long pk, int nums) {
-        return null;
-    }
-
-    @Override
-    public long totalRecords() {
-        return this.totalRecordsValue.get();
-    }
-
-    @Override
-    public long minPK() {
-        return 0;
-    }
-
-    @Override
-    public long maxPK() {
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
deleted file mode 100644
index 1244cfc..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.transaction.jdbc;
-
-public class JDBCTransactionStoreConfig {
-    private String jdbcDriverClass = "com.mysql.jdbc.Driver";
-    private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8";
-    private String jdbcUser = "xxx";
-    private String jdbcPassword = "xxx";
-
-
-    public String getJdbcDriverClass() {
-        return jdbcDriverClass;
-    }
-
-
-    public void setJdbcDriverClass(String jdbcDriverClass) {
-        this.jdbcDriverClass = jdbcDriverClass;
-    }
-
-
-    public String getJdbcURL() {
-        return jdbcURL;
-    }
-
-
-    public void setJdbcURL(String jdbcURL) {
-        this.jdbcURL = jdbcURL;
-    }
-
-
-    public String getJdbcUser() {
-        return jdbcUser;
-    }
-
-
-    public void setJdbcUser(String jdbcUser) {
-        this.jdbcUser = jdbcUser;
-    }
-
-
-    public String getJdbcPassword() {
-        return jdbcPassword;
-    }
-
-
-    public void setJdbcPassword(String jdbcPassword) {
-        this.jdbcPassword = jdbcPassword;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
new file mode 100644
index 0000000..c5c05f4
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker;
+
+import org.apache.rocketmq.broker.client.*;
+import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager;
+import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
+import org.apache.rocketmq.broker.latency.BrokerFastFailure;
+import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
+import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
+import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
+import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
+import org.apache.rocketmq.broker.processor.*;
+import org.apache.rocketmq.broker.slave.SlaveSynchronize;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.stats.MomentStatsItem;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.RemotingServer;
+import org.apache.rocketmq.remoting.netty.*;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageArrivingListener;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.stats.BrokerStats;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerController {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final Logger LOG_PROTECTION = LoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
+    private static final Logger LOG_WATER_MARK = LoggerFactory.getLogger(LoggerName.WATER_MARK_LOGGER_NAME);
+    private final BrokerConfig brokerConfig;
+    private final NettyServerConfig nettyServerConfig;
+    private final NettyClientConfig nettyClientConfig;
+    private final MessageStoreConfig messageStoreConfig;
+    private final ConsumerOffsetManager consumerOffsetManager;
+    private final ConsumerManager consumerManager;
+    private final ProducerManager producerManager;
+    private final ClientHousekeepingService clientHousekeepingService;
+    private final PullMessageProcessor pullMessageProcessor;
+    private final PullRequestHoldService pullRequestHoldService;
+    private final MessageArrivingListener messageArrivingListener;
+    private final Broker2Client broker2Client;
+    private final SubscriptionGroupManager subscriptionGroupManager;
+    private final ConsumerIdsChangeListener consumerIdsChangeListener;
+    private final RebalanceLockManager rebalanceLockManager = new RebalanceLockManager();
+    private final BrokerOuterAPI brokerOuterAPI;
+    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+            "BrokerControllerScheduledThread"));
+    private final SlaveSynchronize slaveSynchronize;
+    private final BlockingQueue<Runnable> sendThreadPoolQueue;
+    private final BlockingQueue<Runnable> pullThreadPoolQueue;
+    private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
+    private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
+    private final FilterServerManager filterServerManager;
+    private final BrokerStatsManager brokerStatsManager;
+    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
+    private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+    private MessageStore messageStore;
+    private RemotingServer remotingServer;
+    private RemotingServer fastRemotingServer;
+    private TopicConfigManager topicConfigManager;
+    private ExecutorService sendMessageExecutor;
+    private ExecutorService pullMessageExecutor;
+    private ExecutorService adminBrokerExecutor;
+    private ExecutorService clientManageExecutor;
+    private ExecutorService consumerManageExecutor;
+    private boolean updateMasterHAServerAddrPeriodically = false;
+    private BrokerStats brokerStats;
+    private InetSocketAddress storeHost;
+    private BrokerFastFailure brokerFastFailure;
+    private Configuration configuration;
+
+    public BrokerController(//
+                            final BrokerConfig brokerConfig, //
+                            final NettyServerConfig nettyServerConfig, //
+                            final NettyClientConfig nettyClientConfig, //
+                            final MessageStoreConfig messageStoreConfig //
+    ) {
+        this.brokerConfig = brokerConfig;
+        this.nettyServerConfig = nettyServerConfig;
+        this.nettyClientConfig = nettyClientConfig;
+        this.messageStoreConfig = messageStoreConfig;
+        this.consumerOffsetManager = new ConsumerOffsetManager(this);
+        this.topicConfigManager = new TopicConfigManager(this);
+        this.pullMessageProcessor = new PullMessageProcessor(this);
+        this.pullRequestHoldService = new PullRequestHoldService(this);
+        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
+        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
+        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
+        this.producerManager = new ProducerManager();
+        this.clientHousekeepingService = new ClientHousekeepingService(this);
+        this.broker2Client = new Broker2Client(this);
+        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
+        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+        this.filterServerManager = new FilterServerManager(this);
+
+        if (this.brokerConfig.getNamesrvAddr() != null) {
+            this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+            log.info("user specfied name server address: {}", this.brokerConfig.getNamesrvAddr());
+        }
+
+        this.slaveSynchronize = new SlaveSynchronize(this);
+
+        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+
+        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
+        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
+
+        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
+        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
+
+        this.brokerFastFailure = new BrokerFastFailure(this);
+        this.configuration = new Configuration(
+                log,
+                BrokerPathConfigHelper.getBrokerConfigPath(),
+                this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
+        );
+    }
+
+    public BrokerConfig getBrokerConfig() {
+        return brokerConfig;
+    }
+
+    public NettyServerConfig getNettyServerConfig() {
+        return nettyServerConfig;
+    }
+
+    public BlockingQueue<Runnable> getPullThreadPoolQueue() {
+        return pullThreadPoolQueue;
+    }
+
+    public boolean initialize() throws CloneNotSupportedException {
+        boolean result = true;
+
+        result = result && this.topicConfigManager.load();
+
+        result = result && this.consumerOffsetManager.load();
+        result = result && this.subscriptionGroupManager.load();
+
+        if (result) {
+            try {
+                this.messageStore =
+                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
+                                this.brokerConfig);
+                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
+                //load plugin
+                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
+                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
+            } catch (IOException e) {
+                result = false;
+                e.printStackTrace();
+            }
+        }
+
+        result = result && this.messageStore.load();
+
+        if (result) {
+            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
+            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
+            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
+            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
+            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                    this.brokerConfig.getSendMessageThreadPoolNums(),
+                    this.brokerConfig.getSendMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.sendThreadPoolQueue,
+                    new ThreadFactoryImpl("SendMessageThread_"));
+
+            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                    this.brokerConfig.getPullMessageThreadPoolNums(),
+                    this.brokerConfig.getPullMessageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.pullThreadPoolQueue,
+                    new ThreadFactoryImpl("PullMessageThread_"));
+
+            this.adminBrokerExecutor =
+                    Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
+                            "AdminBrokerThread_"));
+
+            this.clientManageExecutor = new ThreadPoolExecutor(
+                    this.brokerConfig.getClientManageThreadPoolNums(),
+                    this.brokerConfig.getClientManageThreadPoolNums(),
+                    1000 * 60,
+                    TimeUnit.MILLISECONDS,
+                    this.clientManagerThreadPoolQueue,
+                    new ThreadFactoryImpl("ClientManageThread_"));
+
+            this.consumerManageExecutor =
+                    Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
+                            "ConsumerManageThread_"));
+
+            this.registerProcessor();
+
+
+            // TODO remove in future
+            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
+            final long period = 1000 * 60 * 60 * 24;
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.getBrokerStats().record();
+                    } catch (Throwable e) {
+                        log.error("schedule record error.", e);
+                    }
+                }
+            }, initialDelay, period, TimeUnit.MILLISECONDS);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.consumerOffsetManager.persist();
+                    } catch (Throwable e) {
+                        log.error("schedule persist consumerOffset error.", e);
+                    }
+                }
+            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
+
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.protectBroker();
+                    } catch (Exception e) {
+                        log.error("protectBroker error.", e);
+                    }
+                }
+            }, 3, 3, TimeUnit.MINUTES);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        BrokerController.this.printWaterMark();
+                    } catch (Exception e) {
+                        log.error("printWaterMark error.", e);
+                    }
+                }
+            }, 10, 1, TimeUnit.SECONDS);
+
+            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
+                    } catch (Throwable e) {
+                        log.error("schedule dispatchBehindBytes error.", e);
+                    }
+                }
+            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
+
+            if (this.brokerConfig.getNamesrvAddr() != null) {
+                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
+            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
+                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
+                        } catch (Throwable e) {
+                            log.error("ScheduledTask fetchNameServerAddr exception", e);
+                        }
+                    }
+                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
+            }
+
+            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
+                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
+                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
+                    this.updateMasterHAServerAddrPeriodically = false;
+                } else {
+                    this.updateMasterHAServerAddrPeriodically = true;
+                }
+
+                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            BrokerController.this.slaveSynchronize.syncAll();
+                        } catch (Throwable e) {
+                            log.error("ScheduledTask syncAll slave exception", e);
+                        }
+                    }
+                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
+            } else {
+                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+
+                    @Override
+                    public void run() {
+                        try {
+                            BrokerController.this.printMasterAndSlaveDiff();
+                        } catch (Throwable e) {
+                            log.error("schedule printMasterAndSlaveDiff error.", e);
+                        }
+                    }
+                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
+            }
+        }
+
+        return result;
+    }
+
+    public void registerProcessor() {
+        /**
+         * SendMessageProcessor
+         */
+        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
+        sendProcessor.registerSendMessageHook(sendMessageHookList);
+        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
+
+        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
+        /**
+         * PullMessageProcessor
+         */
+        this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
+        this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
+
+        /**
+         * QueryMessageProcessor
+         */
+        NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+
+        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.pullMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.pullMessageExecutor);
+
+        /**
+         * ClientManageProcessor
+         */
+        ClientManageProcessor clientProcessor = new ClientManageProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+
+        this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.clientManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);
+
+        /**
+         * ConsumerManageProcessor
+         */
+        ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);
+        this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+
+        this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+
+
+        /**
+         * EndTransactionProcessor
+         */
+        this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
+
+        /**
+         * Default
+         */
+        AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);
+        this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
+        this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);
+    }
+
+    /**
+     * Returns the {@code BrokerStats} object held by this controller.
+     *
+     * @return the current value of the {@code brokerStats} field
+     */
+    public BrokerStats getBrokerStats() {
+        return this.brokerStats;
+    }
+
+    /**
+     * Replaces the {@code BrokerStats} object held by this controller.
+     *
+     * @param brokerStats the new value for the {@code brokerStats} field
+     */
+    public void setBrokerStats(final BrokerStats brokerStats) {
+        this.brokerStats = brokerStats;
+    }
+
+    public void protectBroker() {
+        if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {
+            final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();
+            while (it.hasNext()) {
+                final Map.Entry<String, MomentStatsItem> next = it.next();
+                final long fallBehindBytes = next.getValue().getValue().get();
+                if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {
+                    final String[] split = next.getValue().getStatsKey().split("@");
+                    final String group = split[2];
+                    LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);
+                    this.subscriptionGroupManager.disableConsume(group);
+                }
+            }
+        }
+    }
+
+    /**
+     * Reports how long the task at the head of the given queue has been waiting.
+     * <p>
+     * The head is peeked (not removed), converted with {@code BrokerFastFailure.castRunnable}, and its creation
+     * timestamp is subtracted from {@code messageStore.now()}.
+     *
+     * @param q the queue to inspect
+     * @return the waiting time of the head task in milliseconds, never negative; {@code 0} when the queue is
+     *         empty or the head cannot be converted to a {@code RequestTask}
+     */
+    public long headSlowTimeMills(BlockingQueue<Runnable> q) {
+        final Runnable head = q.peek();
+        if (head == null) {
+            return 0;
+        }
+
+        // NOTE(review): castRunnable's contract is not visible here; guard defensively so that a null
+        // result yields 0 instead of a NullPointerException in the caller's monitoring path.
+        final RequestTask task = BrokerFastFailure.castRunnable(head);
+        if (task == null) {
+            return 0;
+        }
+
+        // Clamp to zero so the result is never negative.
+        return Math.max(0L, this.messageStore.now() - task.getCreateTimestamp());
+    }
+
+    /**
+     * Reports the waiting time of the head task of the send thread pool queue.
+     *
+     * @return the result of {@code headSlowTimeMills} applied to {@code sendThreadPoolQueue}
+     */
+    public long headSlowTimeMills4SendThreadPoolQueue() {
+        return headSlowTimeMills(sendThreadPoolQueue);
+    }
+
+    /**
+     * Reports the waiting time of the head task of the pull thread pool queue.
+     *
+     * @return the result of {@code headSlowTimeMills} applied to {@code pullThreadPoolQueue}
+     */
+    public long headSlowTimeMills4PullThreadPoolQueue() {
+        return headSlowTimeMills(pullThreadPoolQueue);
+    }
+
+    /**
+     * Logs the current size and head waiting time of the send and pull thread pool queues
+     * to the water-mark logger.
+     */
+    public void printWaterMark() {
+        final int sendQueueSize = this.sendThreadPoolQueue.size();
+        final long sendHeadWait = headSlowTimeMills4SendThreadPoolQueue();
+        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", sendQueueSize, sendHeadWait);
+
+        final int pullQueueSize = this.pullThreadPoolQueue.size();
+        final long pullHeadWait = headSlowTimeMills4PullThreadPoolQueue();
+        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", pullQueueSize, pullHeadWait);
+    }
+
+    /**
+     * Returns the message store used by this controller.
+     *
+     * @return the current value of the {@code messageStore} field
+     */
+    public MessageStore getMessageStore() {
+        return this.messageStore;
+    }
+
+    /**
+     * Replaces the message store used by this controller.
+     *
+     * @param messageStore the new value for the {@code messageStore} field
+     */
+    public void setMessageStore(final MessageStore messageStore) {
+        this.messageStore = messageStore;
+    }
+
+    /**
+     * Logs the value reported by {@code messageStore.slaveFallBehindMuch()} at info level.
+     */
+    private void printMasterAndSlaveDiff() {
+        // XXX: warn and notify me
+        log.info("slave fall behind master, how much, {} bytes", this.messageStore.slaveFallBehindMuch());
+    }
+
+    /**
+     * Returns the {@code Broker2Client} helper held by this controller.
+     *
+     * @return the current value of the {@code broker2Client} field
+     */
+    public Broker2Client getBroker2Client() {
+        return this.broker2Client;
+    }
+
+    /**
+     * Returns the consumer manager held by this controller.
+     *
+     * @return the current value of the {@code consumerManager} field
+     */
+    public ConsumerManager getConsumerManager() {
+        return this.consumerManager;
+    }
+
+    /**
+     * Returns the consumer offset manager held by this controller.
+     *
+     * @return the current value of the {@code consumerOffsetManager} field
+     */
+    public ConsumerOffsetManager getConsumerOffsetManager() {
+        return this.consumerOffsetManager;
+    }
+
+    /**
+     * Returns the message store configuration held by this controller.
+     *
+     * @return the current value of the {@code messageStoreConfig} field
+     */
+    public MessageStoreConfig getMessageStoreConfig() {
+        return this.messageStoreConfig;
+    }
+
+    /**
+     * Returns the producer manager held by this controller.
+     *
+     * @return the current value of the {@code producerManager} field
+     */
+    public ProducerManager getProducerManager() {
+        return this.producerManager;
+    }
+
+    /**
+     * Replaces the second ("fast") remoting server held by this controller.
+     *
+     * @param fastRemotingServer the new value for the {@code fastRemotingServer} field
+     */
+    public void setFastRemotingServer(final RemotingServer fastRemotingServer) {
+        this.fastRemotingServer = fastRemotingServer;
+    }
+
+    /**
+     * Returns the pull message processor held by this controller.
+     *
+     * @return the current value of the {@code pullMessageProcessor} field
+     */
+    public PullMessageProcessor getPullMessageProcessor() {
+        return this.pullMessageProcessor;
+    }
+
+    /**
+     * Returns the pull request hold service held by this controller.
+     *
+     * @return the current value of the {@code pullRequestHoldService} field
+     */
+    public PullRequestHoldService getPullRequestHoldService() {
+        return this.pullRequestHoldService;
+    }
+
+    /**
+     * Returns the subscription group manager held by this controller.
+     *
+     * @return the current value of the {@code subscriptionGroupManager} field
+     */
+    public SubscriptionGroupManager getSubscriptionGroupManager() {
+        return this.subscriptionGroupManager;
+    }
+
+    public void shutdown() {
+        if (this.brokerStatsManager != null) {
+            this.brokerStatsManager.shutdown();
+        }
+
+        if (this.clientHousekeepingService != null) {
+            this.clientHousekeepingService.shutdown();
+        }
+
+        if (this.pullRequestHoldService != null) {
+            this.pullRequestHoldService.shutdown();
+        }
+
+        if (this.remotingServer != null) {
+            this.remotingServer.shutdown();
+        }
+
+        if (this.fastRemotingServer != null) {
+            this.fastRemotingServer.shutdown();
+        }
+
+        if (this.messageStore != null) {
+            this.messageStore.shutdown();
+        }
+
+        this.scheduledExecutorService.shutdown();
+        try {
+            this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+        }
+
+        this.unregisterBrokerAll();
+
+        if (this.sendMessageExecutor != null) {
+            this.sendMessageExecutor.shutdown();
+        }
+
+        if (this.pullMessageExecutor != null) {
+            this.pullMessageExecutor.shutdown();
+        }
+
+        if (this.adminBrokerExecutor != null) {
+            this.adminBrokerExecutor.shutdown();
+        }
+
+        if (this.brokerOuterAPI != null) {
+            this.brokerOuterAPI.shutdown();
+        }
+
+        this.consumerOffsetManager.persist();
+
+        if (this.filterServerManager != null) {
+            this.filterServerManager.shutdown();
+        }
+
+        if (this.brokerFastFailure != null) {
+            this.brokerFastFailure.shutdown();
+        }
+    }
+
+    private void unregisterBrokerAll() {
+        this.brokerOuterAPI.unregisterBrokerAll(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId());
+    }
+
+    /**
+     * Builds this broker's address from {@code brokerConfig.getBrokerIP1()} and
+     * {@code nettyServerConfig.getListenPort()}.
+     *
+     * @return the two values joined by a colon
+     */
+    public String getBrokerAddr() {
+        final StringBuilder addr = new StringBuilder();
+        addr.append(this.brokerConfig.getBrokerIP1());
+        addr.append(":");
+        addr.append(this.nettyServerConfig.getListenPort());
+        return addr.toString();
+    }
+
+    /**
+     * Starts the components owned by this controller.
+     * <p>
+     * The message store, both remoting servers, the outer API, the pull request hold service, the client
+     * housekeeping service and the filter server manager are started in that order (each only if non-null).
+     * The broker is then registered once synchronously, and a re-registration task is scheduled with a
+     * 10 second initial delay and a 30 second period. Finally the stats manager and the fast-failure
+     * service are started.
+     *
+     * @throws Exception if any of the started components fails to start
+     */
+    public void start() throws Exception {
+        if (this.messageStore != null) {
+            this.messageStore.start();
+        }
+        if (this.remotingServer != null) {
+            this.remotingServer.start();
+        }
+        if (this.fastRemotingServer != null) {
+            this.fastRemotingServer.start();
+        }
+        if (this.brokerOuterAPI != null) {
+            this.brokerOuterAPI.start();
+        }
+        if (this.pullRequestHoldService != null) {
+            this.pullRequestHoldService.start();
+        }
+        if (this.clientHousekeepingService != null) {
+            this.clientHousekeepingService.start();
+        }
+        if (this.filterServerManager != null) {
+            this.filterServerManager.start();
+        }
+
+        // Register immediately, then keep re-registering in the background.
+        this.registerBrokerAll(true, false);
+
+        final Runnable periodicRegistration = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    BrokerController.this.registerBrokerAll(true, false);
+                } catch (Throwable e) {
+                    // Catch everything: an exception escaping the task would cancel its future runs.
+                    log.error("registerBrokerAll Exception", e);
+                }
+            }
+        };
+        final long initialDelayMillis = 1000 * 10;
+        final long periodMillis = 1000 * 30;
+        this.scheduledExecutorService.scheduleAtFixedRate(
+                periodicRegistration, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
+
+        if (this.brokerStatsManager != null) {
+            this.brokerStatsManager.start();
+        }
+        if (this.brokerFastFailure != null) {
+            this.brokerFastFailure.start();
+        }
+    }
+
+    /**
+     * Registers this broker through {@code brokerOuterAPI.registerBrokerAll} and applies the result.
+     * <p>
+     * When the broker permission is not both writeable and readable, the topic table sent with the
+     * registration is replaced by copies of each topic config that carry the broker permission.
+     * If a result comes back, the HA master address is updated (only when
+     * {@code updateMasterHAServerAddrPeriodically} is set and the result has an HA address), the master
+     * address is handed to {@code slaveSynchronize}, and, if requested, the order topic config is
+     * updated from the returned KV table.
+     *
+     * @param checkOrderConfig whether to update the order topic config from the registration result
+     * @param oneway passed through to {@code brokerOuterAPI.registerBrokerAll}
+     */
+    public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
+        final TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
+
+        final boolean readWrite = PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
+                && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
+        if (!readWrite) {
+            // Advertise every topic with the (restricted) broker-level permission.
+            final ConcurrentHashMap<String, TopicConfig> restricted = new ConcurrentHashMap<String, TopicConfig>();
+            for (TopicConfig source : wrapper.getTopicConfigTable().values()) {
+                final TopicConfig copy = new TopicConfig(
+                        source.getTopicName(),
+                        source.getReadQueueNums(),
+                        source.getWriteQueueNums(),
+                        this.brokerConfig.getBrokerPermission());
+                restricted.put(source.getTopicName(), copy);
+            }
+            wrapper.setTopicConfigTable(restricted);
+        }
+
+        final RegisterBrokerResult result = this.brokerOuterAPI.registerBrokerAll(
+                this.brokerConfig.getBrokerClusterName(),
+                this.getBrokerAddr(),
+                this.brokerConfig.getBrokerName(),
+                this.brokerConfig.getBrokerId(),
+                this.getHAServerAddr(),
+                wrapper,
+                this.filterServerManager.buildNewFilterServerList(),
+                oneway,
+                this.brokerConfig.getRegisterBrokerTimeoutMills());
+        if (result == null) {
+            return;
+        }
+
+        if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) {
+            this.messageStore.updateHaMasterAddress(result.getHaServerAddr());
+        }
+
+        this.slaveSynchronize.setMasterAddr(result.getMasterAddr());
+
+        if (checkOrderConfig) {
+            this.getTopicConfigManager().updateOrderTopicConfig(result.getKvTable());
+        }
+    }
+
+    /**
+     * Returns the topic config manager held by this controller.
+     *
+     * @return the current value of the {@code topicConfigManager} field
+     */
+    public TopicConfigManager getTopicConfigManager() {
+        return this.topicConfigManager;
+    }
+
+    /**
+     * Replaces the topic config manager held by this controller.
+     *
+     * @param topicConfigManager the new value for the {@code topicConfigManager} field
+     */
+    public void setTopicConfigManager(final TopicConfigManager topicConfigManager) {
+        this.topicConfigManager = topicConfigManager;
+    }
+
+    /**
+     * Builds the HA server address from {@code brokerConfig.getBrokerIP2()} and
+     * {@code messageStoreConfig.getHaListenPort()}.
+     *
+     * @return the two values joined by a colon
+     */
+    public String getHAServerAddr() {
+        final StringBuilder addr = new StringBuilder();
+        addr.append(this.brokerConfig.getBrokerIP2());
+        addr.append(":");
+        addr.append(this.messageStoreConfig.getHaListenPort());
+        return addr.toString();
+    }
+
+    /**
+     * Returns the rebalance lock manager held by this controller.
+     *
+     * @return the current value of the {@code rebalanceLockManager} field
+     */
+    public RebalanceLockManager getRebalanceLockManager() {
+        return this.rebalanceLockManager;
+    }
+
+    /**
+     * Returns the {@code SlaveSynchronize} object held by this controller.
+     *
+     * @return the current value of the {@code slaveSynchronize} field
+     */
+    public SlaveSynchronize getSlaveSynchronize() {
+        return this.slaveSynchronize;
+    }
+
+    /**
+     * Returns the executor used for pull message requests.
+     *
+     * @return the current value of the {@code pullMessageExecutor} field
+     */
+    public ExecutorService getPullMessageExecutor() {
+        return this.pullMessageExecutor;
+    }
+
+    /**
+     * Replaces the executor used for pull message requests.
+     *
+     * @param pullMessageExecutor the new value for the {@code pullMessageExecutor} field
+     */
+    public void setPullMessageExecutor(final ExecutorService pullMessageExecutor) {
+        this.pullMessageExecutor = pullMessageExecutor;
+    }
+
+    /**
+     * Returns the work queue of the send thread pool.
+     *
+     * @return the current value of the {@code sendThreadPoolQueue} field
+     */
+    public BlockingQueue<Runnable> getSendThreadPoolQueue() {
+        return this.sendThreadPoolQueue;
+    }
+
+    /**
+     * Returns the filter server manager held by this controller.
+     *
+     * @return the current value of the {@code filterServerManager} field
+     */
+    public FilterServerManager getFilterServerManager() {
+        return this.filterServerManager;
+    }
+
+    /**
+     * Returns the broker stats manager held by this controller.
+     *
+     * @return the current value of the {@code brokerStatsManager} field
+     */
+    public BrokerStatsManager getBrokerStatsManager() {
+        return this.brokerStatsManager;
+    }
+
+    /**
+     * Returns the list of registered send-message hooks.
+     *
+     * @return the {@code sendMessageHookList} field itself, not a copy
+     */
+    public List<SendMessageHook> getSendMessageHookList() {
+        return this.sendMessageHookList;
+    }
+
+    /**
+     * Appends a hook to the send-message hook list and logs its name.
+     *
+     * @param hook the hook to register
+     */
+    public void registerSendMessageHook(final SendMessageHook hook) {
+        sendMessageHookList.add(hook);
+        final String name = hook.hookName();
+        log.info("register SendMessageHook Hook, {}", name);
+    }
+
+    /**
+     * Returns the list of registered consume-message hooks.
+     *
+     * @return the {@code consumeMessageHookList} field itself, not a copy
+     */
+    public List<ConsumeMessageHook> getConsumeMessageHookList() {
+        return this.consumeMessageHookList;
+    }
+
+    /**
+     * Appends a hook to the consume-message hook list and logs its name.
+     *
+     * @param hook the hook to register
+     */
+    public void registerConsumeMessageHook(final ConsumeMessageHook hook) {
+        consumeMessageHookList.add(hook);
+        final String name = hook.hookName();
+        log.info("register ConsumeMessageHook Hook, {}", name);
+    }
+
+    /**
+     * Registers an RPC hook on the server side of this broker.
+     * <p>
+     * The hook is installed on the main remoting server and, when one has been set, on the fast remoting
+     * server as well. Both servers are given the same request processors, so a hook installed on only one
+     * of them would not see requests that arrive through the other.
+     *
+     * @param rpcHook the hook to register
+     */
+    public void registerServerRPCHook(RPCHook rpcHook) {
+        getRemotingServer().registerRPCHook(rpcHook);
+
+        if (this.fastRemotingServer != null) {
+            this.fastRemotingServer.registerRPCHook(rpcHook);
+        }
+    }
+
+    /**
+     * Returns the main remoting server held by this controller.
+     *
+     * @return the current value of the {@code remotingServer} field
+     */
+    public RemotingServer getRemotingServer() {
+        return this.remotingServer;
+    }
+
+    /**
+     * Replaces the main remoting server held by this controller.
+     *
+     * @param remotingServer the new value for the {@code remotingServer} field
+     */
+    public void setRemotingServer(final RemotingServer remotingServer) {
+        this.remotingServer = remotingServer;
+    }
+
+    /**
+     * Registers an RPC hook on the broker's outer API, obtained through {@link #getBrokerOuterAPI()}.
+     *
+     * @param rpcHook the hook to register
+     */
+    public void registerClientRPCHook(RPCHook rpcHook) {
+        final BrokerOuterAPI outerAPI = getBrokerOuterAPI();
+        outerAPI.registerRPCHook(rpcHook);
+    }
+
+    /**
+     * Returns the {@code BrokerOuterAPI} object held by this controller.
+     *
+     * @return the current value of the {@code brokerOuterAPI} field
+     */
+    public BrokerOuterAPI getBrokerOuterAPI() {
+        return this.brokerOuterAPI;
+    }
+
+    /**
+     * Returns the store host address held by this controller.
+     *
+     * @return the current value of the {@code storeHost} field
+     */
+    public InetSocketAddress getStoreHost() {
+        return this.storeHost;
+    }
+
+    /**
+     * Replaces the store host address held by this controller.
+     *
+     * @param storeHost the new value for the {@code storeHost} field
+     */
+    public void setStoreHost(final InetSocketAddress storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    /**
+     * Returns the {@code Configuration} object held by this controller.
+     *
+     * @return the current value of the {@code configuration} field
+     */
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
new file mode 100644
index 0000000..dbcd304
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker;
+
+import java.io.File;
+
+
+/**
+ * Static helpers that build the file system paths of the broker's configuration files.
+ * <p>
+ * The broker properties path defaults to {@code <user.home>/store/config/broker.properties} and can be
+ * overridden; the remaining paths are derived from a caller-supplied root directory.
+ */
+public class BrokerPathConfigHelper {
+    /** Name of the sub-directory, below a root directory, that holds the configuration files. */
+    private static final String CONFIG_DIR = "config";
+
+    /** Current location of the broker properties file; replaceable via {@link #setBrokerConfigPath}. */
+    private static String brokerConfigPath =
+            configFile(System.getProperty("user.home") + File.separator + "store", "broker.properties");
+
+    /**
+     * @return the currently configured path of the broker properties file
+     */
+    public static String getBrokerConfigPath() {
+        return brokerConfigPath;
+    }
+
+    /**
+     * Overrides the path of the broker properties file.
+     *
+     * @param path the new path; stored as given
+     */
+    public static void setBrokerConfigPath(String path) {
+        brokerConfigPath = path;
+    }
+
+    /**
+     * @param rootDir the root directory
+     * @return {@code <rootDir>/config/topics.json}, using the platform file separator
+     */
+    public static String getTopicConfigPath(final String rootDir) {
+        return configFile(rootDir, "topics.json");
+    }
+
+    /**
+     * @param rootDir the root directory
+     * @return {@code <rootDir>/config/consumerOffset.json}, using the platform file separator
+     */
+    public static String getConsumerOffsetPath(final String rootDir) {
+        return configFile(rootDir, "consumerOffset.json");
+    }
+
+    /**
+     * @param rootDir the root directory
+     * @return {@code <rootDir>/config/subscriptionGroup.json}, using the platform file separator
+     */
+    public static String getSubscriptionGroupPath(final String rootDir) {
+        return configFile(rootDir, "subscriptionGroup.json");
+    }
+
+    /** Joins {@code rootDir}, the config directory name and {@code fileName} with the platform separator. */
+    private static String configFile(final String rootDir, final String fileName) {
+        return rootDir + File.separator + CONFIG_DIR + File.separator + fileName;
+    }
+}