You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:31 UTC

[40/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
new file mode 100644
index 0000000..7a0ddae
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -0,0 +1,497 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
+import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
+import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.*;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+import java.net.SocketAddress;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
+
+    private List<ConsumeMessageHook> consumeMessageHookList;
+
+    /**
+     * Creates the processor; the broker controller is handed straight to the
+     * parent send-message processor.
+     *
+     * @param brokerController controller forwarded to the superclass constructor
+     */
+    public SendMessageProcessor(final BrokerController brokerController) {
+        super(brokerController);
+    }
+
+    /**
+     * Entry point for requests routed to this processor.
+     *
+     * <p>{@code CONSUMER_SEND_MSG_BACK} requests are delegated to
+     * {@link #consumerSendMsgBack}. Every other request code is handled as a
+     * send-message request: the header is parsed, the send hooks run before and
+     * after {@link #sendMessage}, and that method's result is returned.
+     *
+     * @param ctx     channel context the request arrived on
+     * @param request the decoded remoting command
+     * @return the response command; {@code null} when the request header cannot
+     *         be parsed or when {@code sendMessage} itself returns {@code null}
+     * @throws RemotingCommandException propagated from header decoding
+     */
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+        if (RequestCode.CONSUMER_SEND_MSG_BACK == request.getCode()) {
+            return this.consumerSendMsgBack(ctx, request);
+        }
+
+        final SendMessageRequestHeader header = parseRequestHeader(request);
+        if (null == header) {
+            return null;
+        }
+
+        final SendMessageContext traceContext = buildMsgContext(ctx, header);
+        this.executeSendMessageHookBefore(ctx, request, traceContext);
+
+        final RemotingCommand result = this.sendMessage(ctx, request, traceContext, header);
+
+        this.executeSendMessageHookAfter(result, traceContext);
+        return result;
+    }
+
+    /**
+     * Tells the caller whether requests should currently be rejected.
+     *
+     * @return {@code true} when the message store reports a busy OS page cache
+     *         or a deficient transient store pool
+     */
+    @Override
+    public boolean rejectRequest() {
+        if (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
+            return true;
+        }
+        return this.brokerController.getMessageStore().isTransientStorePoolDeficient();
+    }
+
+    /**
+     * Handles a consumer's "send message back" request: the original message is
+     * looked up by the offset in the request header and re-stored either in the
+     * consumer group's retry topic (with a delay level) or, once the reconsume
+     * limit is reached or a negative delay level is requested, in the group's
+     * dead-letter topic.
+     *
+     * @param ctx     channel context of the request (not used by this method)
+     * @param request command carrying a {@link ConsumerSendMsgBackRequestHeader}
+     * @return a response whose code is {@code SUCCESS}, {@code SUBSCRIPTION_GROUP_NOT_EXIST},
+     *         {@code NO_PERMISSION} or {@code SYSTEM_ERROR}; never {@code null}
+     * @throws RemotingCommandException if the request header cannot be decoded
+     */
+    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
+            throws RemotingCommandException {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        final ConsumerSendMsgBackRequestHeader requestHeader =
+                (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
+
+        // Consume hooks are notified first, before any validation below, and only
+        // when the request names an origin message id.
+        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
+
+            ConsumeMessageContext context = new ConsumeMessageContext();
+            context.setConsumerGroup(requestHeader.getGroup());
+            context.setTopic(requestHeader.getOriginTopic());
+            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
+            context.setCommercialRcvTimes(1);
+            context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
+
+            this.executeConsumeMessageHookAfter(context);
+        }
+
+
+        // The consumer group must be known to this broker.
+        SubscriptionGroupConfig subscriptionGroupConfig =
+                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
+        if (null == subscriptionGroupConfig) {
+            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
+                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+            return response;
+        }
+
+
+        // The broker itself must be writable.
+        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
+            return response;
+        }
+
+
+        // A group configured with no retry queues: report success without storing anything.
+        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            return response;
+        }
+
+        // Default target: the group's retry topic, on a randomly chosen retry queue.
+        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
+        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
+
+
+        int topicSysFlag = 0;
+        if (requestHeader.isUnitMode()) {
+            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
+        }
+
+
+        // Obtain the retry topic's config through the topic config manager
+        // (by its name, this creates the topic when missing).
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
+                newTopic, //
+                subscriptionGroupConfig.getRetryQueueNums(), //
+                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
+        if (null == topicConfig) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("topic[" + newTopic + "] not exist");
+            return response;
+        }
+
+
+        if (!PermName.isWriteable(topicConfig.getPerm())) {
+            response.setCode(ResponseCode.NO_PERMISSION);
+            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
+            return response;
+        }
+
+        // Fetch the original message from the store using the offset supplied by the consumer.
+        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
+        if (null == msgExt) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
+            return response;
+        }
+
+
+        // Remember the topic the message came from, unless an earlier pass already recorded it.
+        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+        if (null == retryTopic) {
+            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
+        }
+        msgExt.setWaitStoreMsgOK(false);
+
+
+        int delayLevel = requestHeader.getDelayLevel();
+
+
+        // The reconsume limit comes from the group config, but clients at protocol
+        // version V3_4_9 or later supply their own limit in the header.
+        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+        }
+
+
+        // Limit reached or negative delay level requested: redirect to the dead-letter topic.
+        // Otherwise keep the retry topic and apply a delay level.
+        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
+                || delayLevel < 0) {
+            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
+            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+
+            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+                    DLQ_NUMS_PER_GROUP, //
+                    PermName.PERM_WRITE, 0
+            );
+            if (null == topicConfig) {
+                response.setCode(ResponseCode.SYSTEM_ERROR);
+                response.setRemark("topic[" + newTopic + "] not exist");
+                return response;
+            }
+        } else {
+            // A delay level of 0 means "let the broker decide": 3 plus the number of
+            // times the message has already been reconsumed.
+            if (0 == delayLevel) {
+                delayLevel = 3 + msgExt.getReconsumeTimes();
+            }
+
+            msgExt.setDelayTimeLevel(delayLevel);
+        }
+
+        // Build the message that will actually be stored, copying body, flags and
+        // properties from the original and bumping the reconsume counter.
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(newTopic);
+        msgInner.setBody(msgExt.getBody());
+        msgInner.setFlag(msgExt.getFlag());
+        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
+
+        msgInner.setQueueId(queueIdInt);
+        msgInner.setSysFlag(msgExt.getSysFlag());
+        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
+        msgInner.setBornHost(msgExt.getBornHost());
+        msgInner.setStoreHost(this.getStoreHost());
+        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
+
+        // Keep the id of the very first copy: reuse the recorded origin id if present,
+        // otherwise the id of the message being sent back.
+        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
+        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
+
+        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+        if (putMessageResult != null) {
+            switch (putMessageResult.getPutMessageStatus()) {
+                case PUT_OK:
+                    // Account the send-back against the original topic when it is known.
+                    String backTopic = msgExt.getTopic();
+                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
+                    if (correctTopic != null) {
+                        backTopic = correctTopic;
+                    }
+
+                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
+
+                    response.setCode(ResponseCode.SUCCESS);
+                    response.setRemark(null);
+
+                    return response;
+                default:
+                    break;
+            }
+
+            // Any status other than PUT_OK is reported as a system error carrying the status name.
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(putMessageResult.getPutMessageStatus().name());
+            return response;
+        }
+
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark("putMessageResult is null");
+        return response;
+    }
+
+    /**
+     * Validates a producer's send request, stores the message and reports the outcome.
+     *
+     * <p>When the store accepts the message, the response is handed to
+     * {@code doResponse} inside this method and {@code null} is returned; in every
+     * other case the populated response is returned to the caller.
+     *
+     * @param ctx                channel context; supplies the born host and is passed to {@code doResponse}
+     * @param request            the send-message command (body and ext fields are read)
+     * @param sendMessageContext hook context that is filled in when send hooks are registered
+     * @param requestHeader      the already-parsed send-message header
+     * @return {@code null} after a successful store (response already passed to
+     *         {@code doResponse}), otherwise the response describing the failure
+     * @throws RemotingCommandException declared for callers; thrown only by callees
+     */
+    private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
+                                        final RemotingCommand request, //
+                                        final SendMessageContext sendMessageContext, //
+                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+
+        response.setOpaque(request.getOpaque());
+
+        // Every response carries the broker's region id and trace switch as ext fields.
+        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+        if (log.isDebugEnabled()) {
+            log.debug("receive SendMessage request command, " + request);
+        }
+
+        // Refuse sends until the configured "start accepting" timestamp has passed.
+        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+        if (this.brokerController.getMessageStore().now() < startTimstamp) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
+            return response;
+        }
+
+        // -1 is a sentinel: if msgCheck changes the code, the check failed and the
+        // response it prepared is returned as-is.
+        response.setCode(-1);
+        super.msgCheck(ctx, requestHeader, response);
+        if (response.getCode() != -1) {
+            return response;
+        }
+
+        final byte[] body = request.getBody();
+
+        int queueIdInt = requestHeader.getQueueId();
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+
+        // A negative queue id means the producer did not pick one: choose a write queue at random.
+        if (queueIdInt < 0) {
+            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+        }
+
+        int sysFlag = requestHeader.getSysFlag();
+
+        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
+            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
+        }
+
+        // Messages sent directly to a retry topic: the group name is the topic name
+        // without the retry prefix, and the message is redirected to the group's
+        // dead-letter topic once the reconsume limit is reached.
+        String newTopic = requestHeader.getTopic();
+        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
+            SubscriptionGroupConfig subscriptionGroupConfig =
+                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
+            if (null == subscriptionGroupConfig) {
+                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+                response.setRemark(
+                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
+                return response;
+            }
+
+
+            // Group-configured limit, overridden by the header for clients at version V3_4_9 or later.
+            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
+            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
+                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+            }
+            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
+            if (reconsumeTimes >= maxReconsumeTimes) {
+                newTopic = MixAll.getDLQTopic(groupName);
+                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
+                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
+                        DLQ_NUMS_PER_GROUP, //
+                        PermName.PERM_WRITE, 0
+                );
+                if (null == topicConfig) {
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("topic[" + newTopic + "] not exist");
+                    return response;
+                }
+            }
+        }
+        // Assemble the message handed to the store from the request body and header.
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(newTopic);
+        msgInner.setBody(body);
+        msgInner.setFlag(requestHeader.getFlag());
+        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+        msgInner.setPropertiesString(requestHeader.getProperties());
+        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
+
+        msgInner.setQueueId(queueIdInt);
+        msgInner.setSysFlag(sysFlag);
+        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
+        msgInner.setBornHost(ctx.channel().remoteAddress());
+        msgInner.setStoreHost(this.getStoreHost());
+        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+
+        // Optionally refuse messages that carry the "transaction prepared" property.
+        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
+            String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
+            if (traFlag != null) {
+                response.setCode(ResponseCode.NO_PERMISSION);
+                response.setRemark(
+                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
+                return response;
+            }
+        }
+
+        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+        if (putMessageResult != null) {
+            boolean sendOK = false;
+
+            // Translate the store status into a response code. The first four statuses
+            // are treated as "sent" (sendOK) even though three of them report a
+            // flush/slave problem through the response code.
+            switch (putMessageResult.getPutMessageStatus()) {
+                // Success
+                case PUT_OK:
+                    sendOK = true;
+                    response.setCode(ResponseCode.SUCCESS);
+                    break;
+                case FLUSH_DISK_TIMEOUT:
+                    response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
+                    sendOK = true;
+                    break;
+                case FLUSH_SLAVE_TIMEOUT:
+                    response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
+                    sendOK = true;
+                    break;
+                case SLAVE_NOT_AVAILABLE:
+                    response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
+                    sendOK = true;
+                    break;
+
+                // Failed
+                case CREATE_MAPEDFILE_FAILED:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("create mapped file failed, server is busy or broken.");
+                    break;
+                case MESSAGE_ILLEGAL:
+                case PROPERTIES_SIZE_EXCEEDED:
+                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                    response.setRemark(
+                            "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
+                    break;
+                case SERVICE_NOT_AVAILABLE:
+                    response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
+                    response.setRemark(
+                            "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
+                    break;
+                case OS_PAGECACHE_BUSY:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+                    break;
+                case UNKNOWN_ERROR:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("UNKNOWN_ERROR");
+                    break;
+                default:
+                    response.setCode(ResponseCode.SYSTEM_ERROR);
+                    response.setRemark("UNKNOWN_ERROR DEFAULT");
+                    break;
+            }
+
+            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+            if (sendOK) {
+
+                // Update per-topic and broker-wide put statistics.
+                this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
+                this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
+                        putMessageResult.getAppendMessageResult().getWroteBytes());
+                this.brokerController.getBrokerStatsManager().incBrokerPutNums();
+
+                response.setRemark(null);
+
+                responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+                responseHeader.setQueueId(queueIdInt);
+                responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+
+                // The response is handed off here, before the hook context is filled in.
+                doResponse(ctx, request, response);
+
+
+                if (hasSendMessageHook()) {
+                    sendMessageContext.setMsgId(responseHeader.getMsgId());
+                    sendMessageContext.setQueueId(responseHeader.getQueueId());
+                    sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+
+                    // NOTE(review): the ceil only has an effect if SIZE_PER_COUNT is a
+                    // floating-point constant; with an integral type the division would
+                    // truncate first - confirm its declaration.
+                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                    int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+
+                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+                    sendMessageContext.setCommercialSendTimes(incValue);
+                    sendMessageContext.setCommercialSendSize(wroteSize);
+                    sendMessageContext.setCommercialOwner(owner);
+                }
+                // null is returned because doResponse was already invoked above -
+                // presumably so the remoting layer does not answer a second time
+                // (TODO confirm against the caller).
+                return null;
+            } else {
+                if (hasSendMessageHook()) {
+                    // Failure accounting uses the request body size and, unlike the
+                    // success path, is not scaled by commercialBaseCount.
+                    // NOTE(review): request.getBody() is dereferenced without a null
+                    // check - confirm msgCheck/the store guarantee a non-null body here.
+                    int wroteSize = request.getBody().length;
+                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+                    sendMessageContext.setCommercialSendTimes(incValue);
+                    sendMessageContext.setCommercialSendSize(wroteSize);
+                    sendMessageContext.setCommercialOwner(owner);
+                }
+            }
+        } else {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("store putMessage return null");
+        }
+
+        return response;
+    }
+
+    /**
+     * Reports whether at least one consume-message hook has been registered.
+     *
+     * @return {@code false} when no hook list was registered or the list is empty
+     */
+    public boolean hasConsumeMessageHook() {
+        final List<ConsumeMessageHook> hooks = this.consumeMessageHookList;
+        if (null == hooks) {
+            return false;
+        }
+        return !hooks.isEmpty();
+    }
+
+    /**
+     * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
+     *
+     * <p>Hook execution is best effort: a failing hook never prevents the remaining
+     * hooks from running and never propagates to the caller, but the failure is
+     * logged so that a broken hook does not go unnoticed.
+     *
+     * @param context consume context handed to each hook
+     */
+    public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
+        if (!hasConsumeMessageHook()) {
+            return;
+        }
+
+        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
+            try {
+                hook.consumeMessageAfter(context);
+            } catch (Throwable e) {
+                // Deliberately not rethrown: hooks must not break request processing.
+                log.warn("consumeMessageAfter hook failed, hook: " + hook, e);
+            }
+        }
+    }
+
+    /**
+     * Returns the store host address recorded on messages written by this processor.
+     *
+     * @return the {@code storeHost} field
+     */
+    public SocketAddress getStoreHost() {
+        return this.storeHost;
+    }
+
+    /**
+     * Builds a short summary of disk usage for the commit log, consume queue and
+     * index store paths, used in the "service not available" remark.
+     *
+     * @return text of the form {@code "CL: x CQ: y INDEX: z"}, each value formatted with {@code %5.2f}
+     */
+    private String diskUtil() {
+        final double commitLogUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
+                this.brokerController.getMessageStoreConfig().getStorePathCommitLog());
+
+        // Consume queue and index paths are both derived from the store root directory.
+        final String rootDir = this.brokerController.getMessageStoreConfig().getStorePathRootDir();
+
+        final double consumeQueueUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
+                StorePathConfigHelper.getStorePathConsumeQueue(rootDir));
+
+        final double indexUsed = UtilAll.getDiskPartitionSpaceUsedPercent(
+                StorePathConfigHelper.getStorePathIndex(rootDir));
+
+        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", commitLogUsed, consumeQueueUsed, indexUsed);
+    }
+
+    /**
+     * Replaces the list of consume-message hooks used by this processor.
+     * The list reference is stored as given; it is not copied.
+     *
+     * @param consumeMessageHookList hooks to invoke from {@code executeConsumeMessageHookAfter}
+     */
+    public void registerConsumeMessageHook(List<ConsumeMessageHook> consumeMessageHookList) {
+        this.consumeMessageHookList = consumeMessageHookList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
new file mode 100644
index 0000000..45914d7
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.rocketmq.broker.slave;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * Copies broker metadata (topic configs, consumer offsets, delay offsets and
+ * subscription group configs) from the master broker to this broker.
+ *
+ * <p>Nothing is synchronized until a master address has been set through
+ * {@link #setMasterAddr(String)}. Each sync step catches and logs its own
+ * exceptions, so one failing step does not stop the others.
+ *
+ * @author shijia.wxr
+ * @author manhong.yqd
+ */
+public class SlaveSynchronize {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private final BrokerController brokerController;
+    /** Address of the master to pull from; {@code null} disables synchronization. */
+    private volatile String masterAddr = null;
+
+
+    /**
+     * @param brokerController controller giving access to the outer API and the local managers
+     */
+    public SlaveSynchronize(BrokerController brokerController) {
+        this.brokerController = brokerController;
+    }
+
+
+    /**
+     * @return the master address currently used for synchronization, or {@code null}
+     */
+    public String getMasterAddr() {
+        return masterAddr;
+    }
+
+
+    /**
+     * @param masterAddr master address to synchronize from; {@code null} turns syncing off
+     */
+    public void setMasterAddr(String masterAddr) {
+        this.masterAddr = masterAddr;
+    }
+
+
+    /**
+     * Runs every synchronization step once, in a fixed order.
+     */
+    public void syncAll() {
+        this.syncTopicConfig();
+        this.syncConsumerOffset();
+        this.syncDelayOffset();
+        this.syncSubscriptionGroupConfig();
+    }
+
+
+    /**
+     * Fetches all topic configs from the master and, when the data version differs
+     * from the local one, replaces the local table and persists it.
+     */
+    private void syncTopicConfig() {
+        // Read the volatile field once so the whole step uses one consistent address.
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                TopicConfigSerializeWrapper topicWrapper =
+                        this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
+                if (!this.brokerController.getTopicConfigManager().getDataVersion()
+                        .equals(topicWrapper.getDataVersion())) {
+
+                    this.brokerController.getTopicConfigManager().getDataVersion()
+                            .assignNewOne(topicWrapper.getDataVersion());
+                    // Remove topics the master no longer has, then add/overwrite the rest.
+                    // Unlike clear() followed by putAll(), the shared table is never
+                    // observed empty by other threads while it is being replaced.
+                    this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()
+                            .retainAll(topicWrapper.getTopicConfigTable().keySet());
+                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
+                            .putAll(topicWrapper.getTopicConfigTable());
+                    this.brokerController.getTopicConfigManager().persist();
+
+                    log.info("update slave topic config from master, {}", masterAddrBak);
+                }
+            } catch (Exception e) {
+                log.error("syncTopicConfig Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+
+
+    /**
+     * Fetches all consumer offsets from the master, merges them into the local
+     * offset table and persists it. Existing local entries that the master does
+     * not report are left untouched.
+     */
+    private void syncConsumerOffset() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                ConsumerOffsetSerializeWrapper offsetWrapper =
+                        this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
+                this.brokerController.getConsumerOffsetManager().getOffsetTable()
+                        .putAll(offsetWrapper.getOffsetTable());
+                this.brokerController.getConsumerOffsetManager().persist();
+                log.info("update slave consumer offset from master, {}", masterAddrBak);
+            } catch (Exception e) {
+                log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+
+
+    /**
+     * Fetches the delay-offset content from the master and writes it to the local
+     * delay-offset store file. Success is logged only when the file was written.
+     */
+    private void syncDelayOffset() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                String delayOffset =
+                        this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
+                if (delayOffset != null) {
+
+                    String fileName =
+                            StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
+                                    .getMessageStoreConfig().getStorePathRootDir());
+                    try {
+                        MixAll.string2File(delayOffset, fileName);
+                        // Logged here so a missing payload or a failed write is not
+                        // reported as a successful update.
+                        log.info("update slave delay offset from master, {}", masterAddrBak);
+                    } catch (IOException e) {
+                        log.error("persist file Exception, " + fileName, e);
+                    }
+                }
+            } catch (Exception e) {
+                log.error("syncDelayOffset Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+
+
+    /**
+     * Fetches all subscription group configs from the master and, when the data
+     * version differs from the local one, replaces the local table and persists it.
+     */
+    private void syncSubscriptionGroupConfig() {
+        String masterAddrBak = this.masterAddr;
+        if (masterAddrBak != null) {
+            try {
+                SubscriptionGroupWrapper subscriptionWrapper =
+                        this.brokerController.getBrokerOuterAPI()
+                                .getAllSubscriptionGroupConfig(masterAddrBak);
+
+                if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
+                        .equals(subscriptionWrapper.getDataVersion())) {
+                    SubscriptionGroupManager subscriptionGroupManager =
+                            this.brokerController.getSubscriptionGroupManager();
+                    subscriptionGroupManager.getDataVersion().assignNewOne(
+                            subscriptionWrapper.getDataVersion());
+                    // Same replacement strategy as for topic configs: drop stale groups,
+                    // then add/overwrite, so the table is never transiently empty.
+                    subscriptionGroupManager.getSubscriptionGroupTable().keySet().retainAll(
+                            subscriptionWrapper.getSubscriptionGroupTable().keySet());
+                    subscriptionGroupManager.getSubscriptionGroupTable().putAll(
+                            subscriptionWrapper.getSubscriptionGroupTable());
+                    subscriptionGroupManager.persist();
+                    log.info("update slave Subscription Group from master, {}", masterAddrBak);
+                }
+            } catch (Exception e) {
+                log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
new file mode 100644
index 0000000..364d5c8
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.subscription;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * Keeps the broker's subscription group configurations in memory, keyed by group name,
+ * together with a {@link DataVersion} that the mutating methods of this class advance.
+ * A fixed set of built-in groups is registered on construction.
+ *
+ * @author shijia.wxr
+ */
+public class SubscriptionGroupManager extends ConfigManager {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    private final ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptionGroupTable =
+            new ConcurrentHashMap<String, SubscriptionGroupConfig>(1024);
+    private final DataVersion dataVersion = new DataVersion();
+    private transient BrokerController brokerController;
+
+
+    /**
+     * Creates a manager without a broker controller; only the built-in groups are registered.
+     * NOTE(review): {@link #findSubscriptionGroupConfig(String)} dereferences the controller,
+     * so it must not be called for unknown groups on an instance created this way.
+     */
+    public SubscriptionGroupManager() {
+        this.init();
+    }
+
+
+    /**
+     * Creates a manager bound to the given broker controller and registers the built-in groups.
+     *
+     * @param brokerController controller whose broker config is consulted when auto-creating groups
+     */
+    public SubscriptionGroupManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        this.init();
+    }
+
+    /**
+     * Registers the built-in consumer groups. The last four are created with
+     * broadcast consumption enabled; the others keep the config's defaults.
+     */
+    private void init() {
+        this.registerBuiltInGroup(MixAll.TOOLS_CONSUMER_GROUP, false);
+        this.registerBuiltInGroup(MixAll.FILTERSRV_CONSUMER_GROUP, false);
+        this.registerBuiltInGroup(MixAll.SELF_TEST_CONSUMER_GROUP, false);
+        this.registerBuiltInGroup(MixAll.ONS_HTTP_PROXY_GROUP, true);
+        this.registerBuiltInGroup(MixAll.CID_ONSAPI_PULL_GROUP, true);
+        this.registerBuiltInGroup(MixAll.CID_ONSAPI_PERMISSION_GROUP, true);
+        this.registerBuiltInGroup(MixAll.CID_ONSAPI_OWNER_GROUP, true);
+    }
+
+    /**
+     * Puts a fresh config for {@code groupName} into the table.
+     *
+     * @param groupName       name of the built-in group
+     * @param enableBroadcast when true, broadcast consumption is switched on explicitly;
+     *                        when false the setter is not called at all
+     */
+    private void registerBuiltInGroup(final String groupName, final boolean enableBroadcast) {
+        SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+        subscriptionGroupConfig.setGroupName(groupName);
+        if (enableBroadcast) {
+            subscriptionGroupConfig.setConsumeBroadcastEnable(true);
+        }
+        this.subscriptionGroupTable.put(groupName, subscriptionGroupConfig);
+    }
+
+
+    /**
+     * Inserts or replaces the config stored under {@code config.getGroupName()},
+     * advances the data version and persists.
+     *
+     * @param config the configuration to store
+     */
+    public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config);
+        if (old != null) {
+            log.info("update subscription group config, old: " + old + " new: " + config);
+        } else {
+            log.info("create new subscription group, " + config);
+        }
+
+        this.dataVersion.nextVersion();
+
+        this.persist();
+    }
+
+    /**
+     * Marks an existing group as not consumable and advances the data version.
+     * Unknown groups are ignored. This method does not call {@code persist()}.
+     *
+     * @param groupName name of the group to disable
+     */
+    public void disableConsume(final String groupName) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);
+        if (old != null) {
+            old.setConsumeEnable(false);
+            this.dataVersion.nextVersion();
+        }
+    }
+
+
+    /**
+     * Looks up the config of {@code group}. When none exists and either the broker allows
+     * auto-creation or the group is a system consumer group, a default config is registered.
+     *
+     * @param group name of the subscription group
+     * @return the registered config, or {@code null} when the group is unknown and may not
+     * be auto-created
+     */
+    public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {
+        SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);
+        if (null == subscriptionGroupConfig) {
+            if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {
+                SubscriptionGroupConfig created = new SubscriptionGroupConfig();
+                created.setGroupName(group);
+                SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, created);
+                if (null == preConfig) {
+                    log.info("auto create a subscription group, {}", created.toString());
+                    // Only a real insertion changes the table, so only then version and persist.
+                    this.dataVersion.nextVersion();
+                    this.persist();
+                    subscriptionGroupConfig = created;
+                } else {
+                    // Another thread registered the group first; hand out the instance
+                    // that is actually stored in the table rather than our discarded one.
+                    subscriptionGroupConfig = preConfig;
+                }
+            }
+        }
+
+        return subscriptionGroupConfig;
+    }
+
+
+    @Override
+    public String encode() {
+        return this.encode(false);
+    }
+
+    /**
+     * Returns the subscription group file path derived from {@code <user.home>/store}.
+     * NOTE(review): the broker's configured store root (see the commented-out line) is
+     * not used here - confirm this is intended.
+     */
+    @Override
+    public String configFilePath() {
+        //return BrokerPathConfigHelper.getSubscriptionGroupPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
+        return BrokerPathConfigHelper.getSubscriptionGroupPath(System.getProperty("user.home") + File.separator + "store");
+    }
+
+    /**
+     * Merges the groups contained in {@code jsonString} into the table and adopts the
+     * decoded data version. A {@code null} string or a {@code null} decode result is ignored.
+     *
+     * @param jsonString JSON produced by {@link #encode(boolean)}, may be {@code null}
+     */
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            SubscriptionGroupManager obj = RemotingSerializable.fromJson(jsonString, SubscriptionGroupManager.class);
+            if (obj != null) {
+                this.subscriptionGroupTable.putAll(obj.subscriptionGroupTable);
+                this.dataVersion.assignNewOne(obj.dataVersion);
+                this.printLoadDataWhenFirstBoot(obj);
+            }
+        }
+    }
+
+    /**
+     * Serializes this manager to JSON.
+     *
+     * @param prettyFormat passed through to the JSON serializer
+     * @return the JSON text
+     */
+    public String encode(final boolean prettyFormat) {
+        return RemotingSerializable.toJson(this, prettyFormat);
+    }
+
+    /** Logs every group contained in the freshly decoded manager. */
+    private void printLoadDataWhenFirstBoot(final SubscriptionGroupManager sgm) {
+        Iterator<Entry<String, SubscriptionGroupConfig>> it = sgm.getSubscriptionGroupTable().entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, SubscriptionGroupConfig> next = it.next();
+            log.info("load exist subscription group, {}", next.getValue().toString());
+        }
+    }
+
+    /** @return the live (not copied) group table */
+    public ConcurrentHashMap<String, SubscriptionGroupConfig> getSubscriptionGroupTable() {
+        return subscriptionGroupTable;
+    }
+
+
+    /** @return the live data version object of the table */
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+
+    /**
+     * Removes a group. On success the data version is advanced and the table persisted;
+     * when the group does not exist only a warning is logged.
+     *
+     * @param groupName name of the group to delete
+     */
+    public void deleteSubscriptionGroupConfig(final String groupName) {
+        SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName);
+        if (old != null) {
+            log.info("delete subscription group OK, subscription group: " + old);
+            this.dataVersion.nextVersion();
+            this.persist();
+        } else {
+            // 'old' is null on this path, so report the requested name instead.
+            log.warn("delete subscription group failed, subscription group: " + groupName + " not exist");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
new file mode 100644
index 0000000..40fdd68
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -0,0 +1,440 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.topic;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.BrokerPathConfigHelper;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.DataVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.PermName;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * Keeps the broker's topic configurations in memory, keyed by topic name, together with a
+ * {@link DataVersion} that the mutating methods of this class advance. Topics registered by
+ * the constructor are additionally remembered as system topics.
+ *
+ * @author shijia.wxr
+ */
+public class TopicConfigManager extends ConfigManager {
+    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+    private static final long LOCK_TIMEOUT_MILLIS = 3000;
+    private transient final Lock lockTopicConfigTable = new ReentrantLock();
+
+    private final ConcurrentHashMap<String, TopicConfig> topicConfigTable =
+            new ConcurrentHashMap<String, TopicConfig>(1024);
+    private final DataVersion dataVersion = new DataVersion();
+    private final Set<String> systemTopicList = new HashSet<String>();
+    private transient BrokerController brokerController;
+
+
+    /** Creates an empty manager that is not attached to a broker controller. */
+    public TopicConfigManager() {
+    }
+
+
+    /**
+     * Creates a manager bound to {@code brokerController} and registers the system topics.
+     *
+     * @param brokerController controller whose broker config drives the system topic setup
+     */
+    public TopicConfigManager(BrokerController brokerController) {
+        this.brokerController = brokerController;
+        {
+            // MixAll.SELF_TEST_TOPIC
+            String topic = MixAll.SELF_TEST_TOPIC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // MixAll.DEFAULT_TOPIC (only registered when auto topic creation is enabled)
+            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+                String topic = MixAll.DEFAULT_TOPIC;
+                TopicConfig topicConfig = new TopicConfig(topic);
+                this.systemTopicList.add(topic);
+                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
+                        .getDefaultTopicQueueNums());
+                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
+                        .getDefaultTopicQueueNums());
+                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
+                topicConfig.setPerm(perm);
+                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+            }
+        }
+        {
+            // MixAll.BENCHMARK_TOPIC
+            String topic = MixAll.BENCHMARK_TOPIC;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            topicConfig.setReadQueueNums(1024);
+            topicConfig.setWriteQueueNums(1024);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // Topic named after the broker cluster; read/write only when cluster topic is enabled
+            String topic = this.brokerController.getBrokerConfig().getBrokerClusterName();
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            int perm = PermName.PERM_INHERIT;
+            if (this.brokerController.getBrokerConfig().isClusterTopicEnable()) {
+                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+            }
+            topicConfig.setPerm(perm);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // Topic named after the broker; read/write only when broker topic is enabled
+            String topic = this.brokerController.getBrokerConfig().getBrokerName();
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            int perm = PermName.PERM_INHERIT;
+            if (this.brokerController.getBrokerConfig().isBrokerTopicEnable()) {
+                perm |= PermName.PERM_READ | PermName.PERM_WRITE;
+            }
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            topicConfig.setPerm(perm);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+        {
+            // MixAll.OFFSET_MOVED_EVENT
+            String topic = MixAll.OFFSET_MOVED_EVENT;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
+    }
+
+
+    /** @return true when {@code topic} was registered as a system topic by the constructor */
+    public boolean isSystemTopic(final String topic) {
+        return this.systemTopicList.contains(topic);
+    }
+
+
+    /** @return the live (not copied) set of system topic names */
+    public Set<String> getSystemTopic() {
+        return this.systemTopicList;
+    }
+
+
+    /** @return false only for {@code MixAll.DEFAULT_TOPIC}; {@code topic} must not be null */
+    public boolean isTopicCanSendMessage(final String topic) {
+        return !topic.equals(MixAll.DEFAULT_TOPIC);
+    }
+
+
+    /** @return the config stored for {@code topic}, or {@code null} when there is none */
+    public TopicConfig selectTopicConfig(final String topic) {
+        return this.topicConfigTable.get(topic);
+    }
+
+
+    /**
+     * Returns the config of {@code topic}, creating it from the {@code defaultTopic} template
+     * when it does not exist yet.
+     * <p>
+     * A new topic is only created when the template exists and {@code PermName.isInherited}
+     * accepts its perm. The new topic gets the smaller of the client's and the template's
+     * write queue count (never below zero), the template's perm without the
+     * {@code PERM_INHERIT} bit, and the template's filter type. After a creation the table is
+     * versioned and persisted under the lock, and the broker is re-registered outside of it.
+     *
+     * @param topic                       topic to look up or create
+     * @param defaultTopic                name of the template topic
+     * @param remoteAddress               producer address, used for logging only
+     * @param clientDefaultTopicQueueNums queue count requested by the client
+     * @param topicSysFlag                system flag stored on a newly created topic
+     * @return the existing or new config; {@code null} when the topic could not be created,
+     * the lock was not obtained within {@code LOCK_TIMEOUT_MILLIS}, or the thread was interrupted
+     */
+    public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
+                                                      final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
+        TopicConfig topicConfig = null;
+        boolean createNew = false;
+
+        try {
+            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    topicConfig = this.topicConfigTable.get(topic);
+                    if (topicConfig != null)
+                        return topicConfig;
+
+                    TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
+                    if (defaultTopicConfig != null) {
+                        if (defaultTopic.equals(MixAll.DEFAULT_TOPIC)) {
+                            if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
+                                // Overwrites the template's perm with READ|WRITE, i.e. without PERM_INHERIT.
+                                defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
+                            }
+                        }
+
+                        if (PermName.isInherited(defaultTopicConfig.getPerm())) {
+                            topicConfig = new TopicConfig(topic);
+
+                            // The client may ask for fewer queues than the template, never more.
+                            int queueNums = Math.max(0,
+                                    Math.min(clientDefaultTopicQueueNums, defaultTopicConfig.getWriteQueueNums()));
+
+                            topicConfig.setReadQueueNums(queueNums);
+                            topicConfig.setWriteQueueNums(queueNums);
+                            int perm = defaultTopicConfig.getPerm();
+                            perm &= ~PermName.PERM_INHERIT;
+                            topicConfig.setPerm(perm);
+                            topicConfig.setTopicSysFlag(topicSysFlag);
+                            topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
+                        } else {
+                            LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+                                    + "] no perm, " + defaultTopicConfig.getPerm() + " producer: "
+                                    + remoteAddress);
+                        }
+                    } else {
+                        LOG.warn("create new topic failed, because the default topic[" + defaultTopic
+                                + "] not exist." + " producer: " + remoteAddress);
+                    }
+
+                    if (topicConfig != null) {
+                        LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig
+                                + " producer: " + remoteAddress);
+
+                        this.topicConfigTable.put(topic, topicConfig);
+
+                        this.dataVersion.nextVersion();
+
+                        createNew = true;
+
+                        this.persist();
+                    }
+                } finally {
+                    this.lockTopicConfigTable.unlock();
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("createTopicInSendMessageMethod exception", e);
+            // Keep the interruption visible to the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+
+        if (createNew) {
+            this.brokerController.registerBrokerAll(false, true);
+        }
+
+        return topicConfig;
+    }
+
+    /**
+     * Returns the config of {@code topic}, creating it with the given settings when it does
+     * not exist yet. The table is checked once without the lock and once more under it.
+     * After a creation the table is versioned and persisted under the lock, and the broker
+     * is re-registered outside of it.
+     *
+     * @param topic                       topic to look up or create
+     * @param clientDefaultTopicQueueNums read and write queue count of a newly created topic
+     * @param perm                        perm of a newly created topic
+     * @param topicSysFlag                system flag of a newly created topic
+     * @return the existing or new config; {@code null} when the lock was not obtained within
+     * {@code LOCK_TIMEOUT_MILLIS} or the thread was interrupted
+     */
+    public TopicConfig createTopicInSendMessageBackMethod(
+            final String topic,
+            final int clientDefaultTopicQueueNums,
+            final int perm,
+            final int topicSysFlag) {
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig != null)
+            return topicConfig;
+
+        boolean createNew = false;
+
+        try {
+            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+                try {
+                    topicConfig = this.topicConfigTable.get(topic);
+                    if (topicConfig != null)
+                        return topicConfig;
+
+                    topicConfig = new TopicConfig(topic);
+                    topicConfig.setReadQueueNums(clientDefaultTopicQueueNums);
+                    topicConfig.setWriteQueueNums(clientDefaultTopicQueueNums);
+                    topicConfig.setPerm(perm);
+                    topicConfig.setTopicSysFlag(topicSysFlag);
+
+                    LOG.info("create new topic {}", topicConfig);
+                    this.topicConfigTable.put(topic, topicConfig);
+                    createNew = true;
+                    this.dataVersion.nextVersion();
+                    this.persist();
+                } finally {
+                    this.lockTopicConfigTable.unlock();
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("createTopicInSendMessageBackMethod exception", e);
+            // Keep the interruption visible to the caller instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+
+        if (createNew) {
+            this.brokerController.registerBrokerAll(false, true);
+        }
+
+        return topicConfig;
+    }
+
+    /**
+     * Sets or clears the unit flag in the system flag of an existing topic, then versions,
+     * persists and re-registers the broker. Unknown topics are ignored.
+     *
+     * @param topic topic to update
+     * @param unit  true to set the unit flag, false to clear it
+     */
+    public void updateTopicUnitFlag(final String topic, final boolean unit) {
+
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig != null) {
+            int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+            if (unit) {
+                topicConfig.setTopicSysFlag(TopicSysFlag.setUnitFlag(oldTopicSysFlag));
+            } else {
+                topicConfig.setTopicSysFlag(TopicSysFlag.clearUnitFlag(oldTopicSysFlag));
+            }
+
+            LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
+                    topicConfig.getTopicSysFlag());
+
+            this.topicConfigTable.put(topic, topicConfig);
+
+            this.dataVersion.nextVersion();
+
+            this.persist();
+            this.brokerController.registerBrokerAll(false, true);
+        }
+    }
+
+    /**
+     * Sets the unit-sub flag in the system flag of an existing topic when {@code hasUnitSub}
+     * is true, then versions, persists and re-registers the broker. Unknown topics are ignored.
+     * NOTE(review): unlike {@link #updateTopicUnitFlag(String, boolean)} nothing is cleared
+     * when {@code hasUnitSub} is false, yet the version is still advanced - confirm intent.
+     *
+     * @param topic      topic to update
+     * @param hasUnitSub true to set the unit-sub flag
+     */
+    public void updateTopicUnitSubFlag(final String topic, final boolean hasUnitSub) {
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig != null) {
+            int oldTopicSysFlag = topicConfig.getTopicSysFlag();
+            if (hasUnitSub) {
+                topicConfig.setTopicSysFlag(TopicSysFlag.setUnitSubFlag(oldTopicSysFlag));
+            }
+
+            LOG.info("update topic sys flag. oldTopicSysFlag={}, newTopicSysFlag={}", oldTopicSysFlag,
+                    topicConfig.getTopicSysFlag());
+
+            this.topicConfigTable.put(topic, topicConfig);
+
+            this.dataVersion.nextVersion();
+
+            this.persist();
+            this.brokerController.registerBrokerAll(false, true);
+        }
+    }
+
+    /**
+     * Inserts or replaces the config stored under {@code topicConfig.getTopicName()},
+     * advances the data version and persists.
+     *
+     * @param topicConfig the configuration to store
+     */
+    public void updateTopicConfig(final TopicConfig topicConfig) {
+        TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        if (old != null) {
+            LOG.info("update topic config, old: " + old + " new: " + topicConfig);
+        } else {
+            LOG.info("create new topic, " + topicConfig);
+        }
+
+        this.dataVersion.nextVersion();
+
+        this.persist();
+    }
+
+
+    /**
+     * Aligns the {@code order} attribute of all known topics with the key set of the given
+     * table: topics listed there become ordered, all others become unordered. The data
+     * version is advanced and the table persisted only when at least one topic changed.
+     * A {@code null} argument or a {@code null} inner table is ignored.
+     *
+     * @param orderKVTableFromNs table whose keys are the names of the ordered topics
+     */
+    public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {
+
+        if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) {
+            boolean isChange = false;
+            Set<String> orderTopics = orderKVTableFromNs.getTable().keySet();
+            for (String topic : orderTopics) {
+                TopicConfig topicConfig = this.topicConfigTable.get(topic);
+                if (topicConfig != null && !topicConfig.isOrder()) {
+                    topicConfig.setOrder(true);
+                    isChange = true;
+                    LOG.info("update order topic config, topic={}, order={}", topic, true);
+                }
+            }
+
+            for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) {
+                String topic = entry.getKey();
+                if (!orderTopics.contains(topic)) {
+                    TopicConfig topicConfig = entry.getValue();
+                    if (topicConfig.isOrder()) {
+                        topicConfig.setOrder(false);
+                        isChange = true;
+                        LOG.info("update order topic config, topic={}, order={}", topic, false);
+                    }
+                }
+            }
+
+            if (isChange) {
+                this.dataVersion.nextVersion();
+                this.persist();
+            }
+        }
+    }
+
+    /** @return the {@code order} attribute of the topic, or false when the topic is unknown */
+    public boolean isOrderTopic(final String topic) {
+        TopicConfig topicConfig = this.topicConfigTable.get(topic);
+        if (topicConfig == null) {
+            return false;
+        } else {
+            return topicConfig.isOrder();
+        }
+    }
+
+    /**
+     * Removes a topic. On success the data version is advanced and the table persisted;
+     * when the topic does not exist only a warning is logged.
+     *
+     * @param topic name of the topic to delete
+     */
+    public void deleteTopicConfig(final String topic) {
+        TopicConfig old = this.topicConfigTable.remove(topic);
+        if (old != null) {
+            LOG.info("delete topic config OK, topic: " + old);
+            this.dataVersion.nextVersion();
+            this.persist();
+        } else {
+            LOG.warn("delete topic config failed, topic: " + topic + " not exist");
+        }
+    }
+
+    /**
+     * @return a new wrapper that references (does not copy) the live topic table and data version
+     */
+    public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
+        TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
+        topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
+        topicConfigSerializeWrapper.setDataVersion(this.dataVersion);
+        return topicConfigSerializeWrapper;
+    }
+
+    @Override
+    public String encode() {
+        return encode(false);
+    }
+
+    /**
+     * Returns the topic config file path derived from {@code <user.home>/store}.
+     * NOTE(review): the broker's configured store root (see the commented-out lines) is
+     * not used here - confirm this is intended.
+     */
+    @Override
+    public String configFilePath() {
+//        return BrokerPathConfigHelper.getTopicConfigPath(this.brokerController.getMessageStoreConfig()
+//                .getStorePathRootDir());
+        return BrokerPathConfigHelper.getTopicConfigPath(System.getProperty("user.home") + File.separator + "store");
+    }
+
+    /**
+     * Merges the topics contained in {@code jsonString} into the table and adopts the
+     * decoded data version. A {@code null} string or a {@code null} decode result is ignored.
+     *
+     * @param jsonString JSON produced by {@link #encode(boolean)}, may be {@code null}
+     */
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            TopicConfigSerializeWrapper topicConfigSerializeWrapper =
+                    TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
+            if (topicConfigSerializeWrapper != null) {
+                this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
+                this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
+                this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
+            }
+        }
+    }
+
+    /**
+     * Serializes the topic table and data version to JSON.
+     *
+     * @param prettyFormat passed through to the JSON serializer
+     * @return the JSON text
+     */
+    public String encode(final boolean prettyFormat) {
+        return this.buildTopicConfigSerializeWrapper().toJson(prettyFormat);
+    }
+
+    /** Logs every topic contained in the freshly decoded wrapper. */
+    private void printLoadDataWhenFirstBoot(final TopicConfigSerializeWrapper tcs) {
+        Iterator<Entry<String, TopicConfig>> it = tcs.getTopicConfigTable().entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, TopicConfig> next = it.next();
+            LOG.info("load exist local topic, {}", next.getValue().toString());
+        }
+    }
+
+    /** @return the live data version object of the table */
+    public DataVersion getDataVersion() {
+        return dataVersion;
+    }
+
+    /** @return the live (not copied) topic table */
+    public ConcurrentHashMap<String, TopicConfig> getTopicConfigTable() {
+        return topicConfigTable;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
new file mode 100644
index 0000000..68256d9
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionRecord.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction;
+
+/**
+ * Mutable value holder that pairs a commit log offset with a producer group name.
+ */
+public class TransactionRecord {
+    /** Commit Log Offset. */
+    private long offset;
+    /** Producer group name; {@code null} until set. */
+    private String producerGroup;
+
+    /**
+     * @return the commit log offset held by this record
+     */
+    public long getOffset() {
+        return this.offset;
+    }
+
+    /**
+     * @param offset the commit log offset to store
+     */
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * @return the producer group name held by this record, possibly {@code null}
+     */
+    public String getProducerGroup() {
+        return this.producerGroup;
+    }
+
+    /**
+     * @param producerGroup the producer group name to store
+     */
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
new file mode 100644
index 0000000..758eeed
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionStore.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction;
+
+import java.util.List;
+
+
+/**
+ * Storage contract for {@link TransactionRecord}s addressed by a {@code long} key.
+ * NOTE(review): the method descriptions below are inferred from the signatures only;
+ * verify against the implementations.
+ */
+public interface TransactionStore {
+    /**
+     * Opens the store.
+     *
+     * @return presumably true when the store is ready for use - TODO confirm
+     */
+    boolean open();
+
+    /** Closes the store. */
+    void close();
+
+    /**
+     * Adds the given records.
+     *
+     * @param trs records to add
+     * @return presumably true when the records were stored - TODO confirm
+     */
+    boolean put(List<TransactionRecord> trs);
+
+    /**
+     * Removes the records identified by the given keys.
+     *
+     * @param pks keys of the records to remove
+     */
+    void remove(List<Long> pks);
+
+    /**
+     * Reads a batch of records.
+     *
+     * @param pk   key at which the traversal is positioned - exact semantics not visible here
+     * @param nums presumably the maximum number of records to return - TODO confirm
+     * @return the records read
+     */
+    List<TransactionRecord> traverse(long pk, int nums);
+
+    /** @return the number of records in the store */
+    long totalRecords();
+
+    /** @return the smallest key in the store */
+    long minPK();
+
+    /** @return the largest key in the store */
+    long maxPK();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
new file mode 100644
index 0000000..4bf73d2
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStore.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction.jdbc;
+
+import org.apache.rocketmq.broker.transaction.TransactionRecord;
+import org.apache.rocketmq.broker.transaction.TransactionStore;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.sql.*;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * {@link TransactionStore} that keeps records in the {@code t_transaction} table
+ * through a single JDBC {@link Connection}.
+ *
+ * <p>The connection runs with auto-commit disabled: each batch is committed
+ * explicitly and rolled back when it fails. This class does no synchronization
+ * of its own around the shared connection.
+ */
+public class JDBCTransactionStore implements TransactionStore {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
+    private final JDBCTransactionStoreConfig jdbcTransactionStoreConfig;
+    /** Row count of t_transaction as last read by open() and adjusted by put()/remove(). */
+    private final AtomicLong totalRecordsValue = new AtomicLong(0);
+    private Connection connection;
+
+    public JDBCTransactionStore(JDBCTransactionStoreConfig jdbcTransactionStoreConfig) {
+        this.jdbcTransactionStoreConfig = jdbcTransactionStoreConfig;
+    }
+
+    /**
+     * Loads the configured driver, connects, and reads the current record count;
+     * if the count cannot be read, runs the table-creation script instead.
+     *
+     * @return {@code true} if the count was read or the table was created;
+     *         {@code false} otherwise, in which case a connection that was
+     *         obtained is closed again
+     */
+    @Override
+    public boolean open() {
+        if (!this.loadDriver()) {
+            return false;
+        }
+
+        Properties props = new Properties();
+        props.put("user", jdbcTransactionStoreConfig.getJdbcUser());
+        props.put("password", jdbcTransactionStoreConfig.getJdbcPassword());
+
+        boolean opened = false;
+        try {
+            this.connection =
+                    DriverManager.getConnection(this.jdbcTransactionStoreConfig.getJdbcURL(), props);
+            this.connection.setAutoCommit(false);
+
+            // A failed count query is treated as "table missing": try to create it.
+            opened = this.computeTotalRecords() || this.createDB();
+        } catch (SQLException e) {
+            log.warn("Create JDBC Connection Exception", e);
+        } finally {
+            if (!opened) {
+                // Do not keep a connection around after reporting failure.
+                this.close();
+            }
+        }
+
+        return opened;
+    }
+
+    /**
+     * Loads and instantiates the configured JDBC driver class.
+     *
+     * @return {@code true} on success, {@code false} if loading threw
+     */
+    private boolean loadDriver() {
+        String driverClass = null;
+        try {
+            driverClass = this.jdbcTransactionStoreConfig.getJdbcDriverClass();
+            Class.forName(driverClass).newInstance();
+            log.info("Loaded the appropriate driver, {}", driverClass);
+            return true;
+        } catch (Exception e) {
+            log.warn("Load JDBC driver " + driverClass + " Exception", e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Refreshes the cached record count from the database.
+     *
+     * @return {@code true} if the count was read, {@code false} if the query
+     *         failed or returned no row
+     */
+    private boolean computeTotalRecords() {
+        Statement statement = null;
+        ResultSet resultSet = null;
+        try {
+            statement = this.connection.createStatement();
+            resultSet = statement.executeQuery("select count(offset) as total from t_transaction");
+            if (!resultSet.next()) {
+                log.warn("computeTotalRecords ResultSet is empty");
+                return false;
+            }
+
+            this.totalRecordsValue.set(resultSet.getLong(1));
+            return true;
+        } catch (Exception e) {
+            log.warn("computeTotalRecords Exception", e);
+            return false;
+        } finally {
+            // Close in reverse order of acquisition.
+            closeQuietly(resultSet);
+            closeQuietly(statement);
+        }
+    }
+
+    /**
+     * Executes the script from {@code transaction.sql} and commits.
+     *
+     * @return {@code true} if the script ran and was committed, {@code false} otherwise
+     */
+    private boolean createDB() {
+        Statement statement = null;
+        try {
+            String sql = this.createTableSql();
+            if (null == sql) {
+                log.warn("createDB failed, transaction.sql could not be read");
+                return false;
+            }
+
+            statement = this.connection.createStatement();
+            log.info("createDB SQL:\n {}", sql);
+            statement.execute(sql);
+            this.connection.commit();
+            return true;
+        } catch (Exception e) {
+            log.warn("createDB Exception", e);
+            this.rollbackQuietly();
+            return false;
+        } finally {
+            closeQuietly(statement);
+        }
+    }
+
+    /**
+     * Reads {@code transaction.sql} from the classpath.
+     *
+     * @return the file content, or {@code null} if the resource is not on the classpath
+     */
+    private String createTableSql() {
+        URL resource = JDBCTransactionStore.class.getClassLoader().getResource("transaction.sql");
+        if (null == resource) {
+            return null;
+        }
+
+        return MixAll.file2String(resource);
+    }
+
+    @Override
+    public void close() {
+        if (null == this.connection) {
+            return;
+        }
+
+        try {
+            this.connection.close();
+        } catch (SQLException e) {
+            log.warn("Close connection exception", e);
+        }
+    }
+
+    /**
+     * Inserts the records as one batch and commits; on failure the batch is rolled back.
+     *
+     * @param trs records to insert
+     * @return {@code true} if the batch was committed, {@code false} on any exception
+     */
+    @Override
+    public boolean put(List<TransactionRecord> trs) {
+        PreparedStatement statement = null;
+        try {
+            this.connection.setAutoCommit(false);
+            statement = this.connection.prepareStatement("insert into t_transaction values (?, ?)");
+            for (TransactionRecord tr : trs) {
+                statement.setLong(1, tr.getOffset());
+                statement.setString(2, tr.getProducerGroup());
+                statement.addBatch();
+            }
+            int[] executeBatch = statement.executeBatch();
+            this.connection.commit();
+            this.totalRecordsValue.addAndGet(updatedRows(executeBatch));
+            return true;
+        } catch (Exception e) {
+            log.warn("put Exception", e);
+            // Discard the partial batch so a later commit cannot persist it.
+            this.rollbackQuietly();
+            return false;
+        } finally {
+            closeQuietly(statement);
+        }
+    }
+
+    /**
+     * Sums the per-statement update counts of a batch.
+     *
+     * <p>Negative entries ({@link Statement#SUCCESS_NO_INFO},
+     * {@link Statement#EXECUTE_FAILED}) are status codes, not counts, and are skipped.
+     */
+    private long updatedRows(int[] rows) {
+        long res = 0;
+        for (int i : rows) {
+            if (i > 0) {
+                res += i;
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Deletes the records with the given offsets as one batch and commits; on
+     * failure the batch is rolled back. Failures are logged, not reported.
+     *
+     * @param pks offsets of the records to delete
+     */
+    @Override
+    public void remove(List<Long> pks) {
+        PreparedStatement statement = null;
+        try {
+            this.connection.setAutoCommit(false);
+            statement = this.connection.prepareStatement("DELETE FROM t_transaction WHERE offset = ?");
+            for (long pk : pks) {
+                statement.setLong(1, pk);
+                statement.addBatch();
+            }
+            int[] executeBatch = statement.executeBatch();
+            this.connection.commit();
+            // Keep totalRecords() in step with the table, mirroring put().
+            this.totalRecordsValue.addAndGet(-updatedRows(executeBatch));
+        } catch (Exception e) {
+            log.warn("remove Exception", e);
+            this.rollbackQuietly();
+        } finally {
+            closeQuietly(statement);
+        }
+    }
+
+    /**
+     * Not implemented: always returns {@code null}.
+     */
+    @Override
+    public List<TransactionRecord> traverse(long pk, int nums) {
+        return null;
+    }
+
+    @Override
+    public long totalRecords() {
+        return this.totalRecordsValue.get();
+    }
+
+    /**
+     * Not implemented: always returns 0.
+     */
+    @Override
+    public long minPK() {
+        return 0;
+    }
+
+    /**
+     * Not implemented: always returns 0.
+     */
+    @Override
+    public long maxPK() {
+        return 0;
+    }
+
+    /** Rolls back the current transaction, logging instead of throwing. */
+    private void rollbackQuietly() {
+        if (null == this.connection) {
+            return;
+        }
+
+        try {
+            this.connection.rollback();
+        } catch (SQLException e) {
+            log.warn("Rollback exception", e);
+        }
+    }
+
+    /** Closes a statement if present, logging instead of throwing. */
+    private static void closeQuietly(Statement statement) {
+        if (null == statement) {
+            return;
+        }
+
+        try {
+            statement.close();
+        } catch (SQLException e) {
+            log.warn("Close statement exception", e);
+        }
+    }
+
+    /** Closes a result set if present, logging instead of throwing. */
+    private static void closeQuietly(ResultSet resultSet) {
+        if (null == resultSet) {
+            return;
+        }
+
+        try {
+            resultSet.close();
+        } catch (SQLException e) {
+            log.warn("Close result set exception", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
new file mode 100644
index 0000000..5789329
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/jdbc/JDBCTransactionStoreConfig.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.transaction.jdbc;
+
+/**
+ * Connection settings consumed by {@code JDBCTransactionStore}: driver class,
+ * JDBC URL, user and password. The URL, user and password defaults are
+ * placeholder values.
+ */
+public class JDBCTransactionStoreConfig {
+    private String jdbcDriverClass = "com.mysql.jdbc.Driver";
+    private String jdbcURL = "jdbc:mysql://xxx.xxx.xxx.xxx:1000/xxx?useUnicode=true&characterEncoding=UTF-8";
+    private String jdbcUser = "xxx";
+    private String jdbcPassword = "xxx";
+
+    /** @return fully-qualified name of the JDBC driver class */
+    public String getJdbcDriverClass() {
+        return this.jdbcDriverClass;
+    }
+
+    public void setJdbcDriverClass(String jdbcDriverClass) {
+        this.jdbcDriverClass = jdbcDriverClass;
+    }
+
+    /** @return JDBC connection URL */
+    public String getJdbcURL() {
+        return this.jdbcURL;
+    }
+
+    public void setJdbcURL(String jdbcURL) {
+        this.jdbcURL = jdbcURL;
+    }
+
+    /** @return database user name */
+    public String getJdbcUser() {
+        return this.jdbcUser;
+    }
+
+    public void setJdbcUser(String jdbcUser) {
+        this.jdbcUser = jdbcUser;
+    }
+
+    /** @return database password */
+    public String getJdbcPassword() {
+        return this.jdbcPassword;
+    }
+
+    public void setJdbcPassword(String jdbcPassword) {
+        this.jdbcPassword = jdbcPassword;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
deleted file mode 100644
index b661385..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerControllerTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker;
-
-import com.alibaba.rocketmq.common.BrokerConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author shtykh_roman
- */
-public class BrokerControllerTest {
-    protected Logger logger = LoggerFactory.getLogger(BrokerControllerTest.class);
-
-    private static final int RESTART_NUM = 3;
-
-    /**
-     * Tests if the controller can be properly stopped and started.
-     *
-     * @throws Exception If fails.
-     */
-    @Test
-    public void testRestart() throws Exception {
-
-        for (int i = 0; i < RESTART_NUM; i++) {
-            BrokerController brokerController = new BrokerController(//
-                new BrokerConfig(), //
-                new NettyServerConfig(), //
-                new NettyClientConfig(), //
-                new MessageStoreConfig());
-            boolean initResult = brokerController.initialize();
-            Assert.assertTrue(initResult);
-            logger.info("Broker is initialized " + initResult);
-            brokerController.start();
-            logger.info("Broker is started");
-
-            brokerController.shutdown();
-            logger.info("Broker is stopped");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java b/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java
deleted file mode 100644
index ca6f17b..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/BrokerTestHarness.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/**
- * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker;
-
-import com.alibaba.rocketmq.common.BrokerConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyServerConfig;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Random;
-
-/**
- * @author zander
- */
-public class BrokerTestHarness {
-
-    protected BrokerController brokerController = null;
-
-    protected Random random = new Random();
-    public final String BROKER_NAME = "TestBrokerName";
-    protected String brokerAddr = "";
-    protected Logger logger = LoggerFactory.getLogger(BrokerTestHarness.class);
-    protected BrokerConfig brokerConfig = new BrokerConfig();
-    protected NettyServerConfig nettyServerConfig = new NettyServerConfig();
-    protected NettyClientConfig nettyClientConfig = new NettyClientConfig();
-    protected MessageStoreConfig storeConfig = new MessageStoreConfig();
-
-    @Before
-    public void startup() throws Exception {
-        brokerConfig.setBrokerName(BROKER_NAME);
-        brokerConfig.setBrokerIP1("127.0.0.1");
-        storeConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "unitteststore");
-        storeConfig.setStorePathCommitLog(System.getProperty("user.home") + File.separator + "unitteststore" + File.separator + "commitlog");
-        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
-        brokerAddr = brokerConfig.getBrokerIP1() + ":" + nettyServerConfig.getListenPort();
-        brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
-        boolean initResult = brokerController.initialize();
-        Assert.assertTrue(initResult);
-        logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
-        brokerController.start();
-    }
-
-    @After
-    public void shutdown() throws Exception {
-        if (brokerController != null) {
-            brokerController.shutdown();
-        }
-        //maybe need to clean the file store. But we do not suggest deleting anything.
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
deleted file mode 100644
index cf97876..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/api/SendMessageTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/**
- * $Id: SendMessageTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker.api;
-
-import com.alibaba.rocketmq.broker.BrokerTestHarness;
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.hook.SendMessageContext;
-import com.alibaba.rocketmq.client.impl.CommunicationMode;
-import com.alibaba.rocketmq.client.impl.MQClientAPIImpl;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.client.producer.SendStatus;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.message.Message;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-
-/**
- * @author zander
- */
-public class SendMessageTest extends BrokerTestHarness{
-
-    MQClientAPIImpl client = new MQClientAPIImpl(new NettyClientConfig(), null, null, new ClientConfig());
-    String topic = "UnitTestTopic";
-
-    @Before
-    @Override
-    public void startup() throws Exception {
-        super.startup();
-        client.start();
-
-    }
-
-    @After
-    @Override
-    public void shutdown() throws Exception {
-        client.shutdown();
-        super.shutdown();
-    }
-
-    @Test
-    public void testSendSingle() throws Exception{
-        Message msg = new Message(topic, "TAG1 TAG2", "100200300", "body".getBytes());
-        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
-        requestHeader.setProducerGroup("abc");
-        requestHeader.setTopic(msg.getTopic());
-        requestHeader.setDefaultTopic(MixAll.DEFAULT_TOPIC);
-        requestHeader.setDefaultTopicQueueNums(4);
-        requestHeader.setQueueId(0);
-        requestHeader.setSysFlag(0);
-        requestHeader.setBornTimestamp(System.currentTimeMillis());
-        requestHeader.setFlag(msg.getFlag());
-        requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
-
-        SendResult result = client.sendMessage(brokerAddr, BROKER_NAME, msg, requestHeader, 1000 * 5,
-                CommunicationMode.SYNC, new SendMessageContext(), null);
-        assertEquals(result.getSendStatus(), SendStatus.SEND_OK);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
deleted file mode 100644
index 94504a4..0000000
--- a/broker/src/test/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/**
- * $Id: ConsumerOffsetManagerTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
- */
-package com.alibaba.rocketmq.broker.offset;
-
-import com.alibaba.rocketmq.broker.BrokerTestHarness;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-/**
- * @author zander
- */
-public class ConsumerOffsetManagerTest extends BrokerTestHarness{
-
-    @Test
-    public void testFlushConsumerOffset() throws Exception {
-        ConsumerOffsetManager consumerOffsetManager = new ConsumerOffsetManager(brokerController);
-        for (int i = 0; i < 10; i++) {
-            String group = "UNIT_TEST_GROUP_" + i;
-            for (int id = 0; id < 10; id++) {
-                consumerOffsetManager.commitOffset(null, group, "TOPIC_A", id, id + 100);
-                consumerOffsetManager.commitOffset(null, group, "TOPIC_B", id, id + 100);
-                consumerOffsetManager.commitOffset(null, group, "TOPIC_C", id, id + 100);
-            }
-        }
-        consumerOffsetManager.persist();
-        consumerOffsetManager.getOffsetTable().clear();
-        for (int i = 0; i < 10; i++) {
-            String group = "UNIT_TEST_GROUP_" + i;
-            for (int id = 0; id < 10; id++) {
-                assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), -1);
-                assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
-                assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), -1);
-            }
-        }
-        consumerOffsetManager.load();
-        for (int i = 0; i < 10; i++) {
-            String group = "UNIT_TEST_GROUP_" + i;
-            for (int id = 0; id < 10; id++) {
-                assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_A", id), id + 100);
-                assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
-                assertEquals(consumerOffsetManager.queryOffset(group, "TOPIC_B", id), id + 100);
-            }
-        }
-    }
-}