You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:42 UTC

[46/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
deleted file mode 100644
index ed10f1b..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/EndTransactionProcessor.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.TopicFilterType;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.EndTransactionRequestHeader;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import com.alibaba.rocketmq.store.MessageStore;
-import com.alibaba.rocketmq.store.PutMessageResult;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Broker-side processor for end-transaction requests. It checks the request against the
- * prepared message looked up in the message store and, when they agree, stores a follow-up
- * message whose sys flag carries the commit/rollback value from the request.
- *
- * @author shijia.wxr
- */
-public class EndTransactionProcessor implements NettyRequestProcessor {
-    private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
-    private final BrokerController brokerController;
-
-    public EndTransactionProcessor(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    /**
-     * Handles one end-transaction request.
-     *
-     * @param ctx     channel context; used here only to log the remote address
-     * @param request command whose custom header is decoded as {@link EndTransactionRequestHeader}
-     * @return the response command, or {@code null} when the reported state is
-     *         {@code TRANSACTION_NOT_TYPE} or is not one of the handled values
-     * @throws RemotingCommandException presumably raised while decoding the request header
-     *                                  (the decoder is not visible here -- confirm)
-     */
-    @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final EndTransactionRequestHeader requestHeader =
-                (EndTransactionRequestHeader) request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
-
-        if (requestHeader.getFromTransactionCheck()) {
-            switch (requestHeader.getCommitOrRollback()) {
-                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
-                    LOGGER.warn("check producer[{}] transaction state, but it's pending status."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
-                    return null;
-                }
-
-                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
-                    LOGGER.warn("check producer[{}] transaction state, the producer commit the message."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
-
-                    break;
-                }
-
-                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
-                    LOGGER.warn("check producer[{}] transaction state, the producer rollback the message."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
-                    break;
-                }
-                default:
-                    return null;
-            }
-        } else {
-            switch (requestHeader.getCommitOrRollback()) {
-                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
-                    LOGGER.warn("the producer[{}] end transaction in sending message,  and it's pending status."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
-                    return null;
-                }
-
-                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
-                    break;
-                }
-
-                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
-                    LOGGER.warn("the producer[{}] end transaction in sending message, rollback the message."
-                                    + "RequestHeader: {} Remark: {}",
-                            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
-                            requestHeader.toString(),
-                            request.getRemark());
-                    break;
-                }
-                default:
-                    return null;
-            }
-        }
-
-        final MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getCommitLogOffset());
-        if (msgExt == null) {
-            return systemError(response, "find prepared transaction message failed");
-        }
-
-        // The stored message may lack the producer-group property (assumed: getProperty then
-        // yields null). Treat that as a mismatch instead of failing with a NullPointerException.
-        final String pgroupRead = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
-        if (pgroupRead == null || !pgroupRead.equals(requestHeader.getProducerGroup())) {
-            return systemError(response, "the producer group wrong");
-        }
-
-        if (msgExt.getQueueOffset() != requestHeader.getTranStateTableOffset()) {
-            return systemError(response, "the transaction state table offset wrong");
-        }
-
-        if (msgExt.getCommitLogOffset() != requestHeader.getCommitLogOffset()) {
-            return systemError(response, "the commit log offset wrong");
-        }
-
-        final MessageExtBrokerInner msgInner = this.endMessageTransaction(msgExt);
-        msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
-
-        msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
-        msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
-        msgInner.setStoreTimestamp(msgExt.getStoreTimestamp());
-        if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
-            // A rollback record is stored without a body.
-            msgInner.setBody(null);
-        }
-
-        final MessageStore messageStore = this.brokerController.getMessageStore();
-        applyPutResult(response, messageStore.putMessage(msgInner));
-        return response;
-    }
-
-    @Override
-    public boolean rejectRequest() {
-        return false;
-    }
-
-    /** Marks {@code response} as a SYSTEM_ERROR with the given remark and returns it. */
-    private static RemotingCommand systemError(final RemotingCommand response, final String remark) {
-        response.setCode(ResponseCode.SYSTEM_ERROR);
-        response.setRemark(remark);
-        return response;
-    }
-
-    /** Translates the store's put result (possibly {@code null}) into the response code and remark. */
-    private static void applyPutResult(final RemotingCommand response, final PutMessageResult putMessageResult) {
-        if (putMessageResult == null) {
-            systemError(response, "store putMessage return null");
-            return;
-        }
-
-        switch (putMessageResult.getPutMessageStatus()) {
-            // Success
-            case PUT_OK:
-            case FLUSH_DISK_TIMEOUT:
-            case FLUSH_SLAVE_TIMEOUT:
-            case SLAVE_NOT_AVAILABLE:
-                response.setCode(ResponseCode.SUCCESS);
-                response.setRemark(null);
-                break;
-            // Failed
-            case CREATE_MAPEDFILE_FAILED:
-                systemError(response, "create maped file failed.");
-                break;
-            case MESSAGE_ILLEGAL:
-            case PROPERTIES_SIZE_EXCEEDED:
-                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-                response.setRemark("the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
-                break;
-            case SERVICE_NOT_AVAILABLE:
-                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
-                response.setRemark("service not available now.");
-                break;
-            case OS_PAGECACHE_BUSY:
-                systemError(response, "OS page cache busy, please try another machine");
-                break;
-            case UNKNOWN_ERROR:
-                systemError(response, "UNKNOWN_ERROR");
-                break;
-            default:
-                systemError(response, "UNKNOWN_ERROR DEFAULT");
-                break;
-        }
-    }
-
-    /**
-     * Builds the message to be stored for the end of a transaction from the prepared message:
-     * body, flag, properties, sys flag, born timestamp, born/store host, reconsume times, topic
-     * and queue id are copied, the delay-time-level property is cleared and
-     * {@code waitStoreMsgOK} is set to {@code false}.
-     *
-     * @param msgExt the prepared message read from the store
-     * @return the new inner message; the caller still adjusts sys flag, offsets and timestamp
-     */
-    private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
-        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-        msgInner.setBody(msgExt.getBody());
-        msgInner.setFlag(msgExt.getFlag());
-        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
-
-        // Copy the sys flag first and test the prepared message's value: the freshly created
-        // msgInner has not received any sys flag yet, so testing it could never see MULTI_TAGS_FLAG.
-        final int sysFlag = msgExt.getSysFlag();
-        msgInner.setSysFlag(sysFlag);
-        final boolean multiTags = (sysFlag & MessageSysFlag.MULTI_TAGS_FLAG) == MessageSysFlag.MULTI_TAGS_FLAG;
-        final TopicFilterType topicFilterType = multiTags ? TopicFilterType.MULTI_TAG : TopicFilterType.SINGLE_TAG;
-        long tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-        msgInner.setTagsCode(tagsCodeValue);
-        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
-
-        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-        msgInner.setBornHost(msgExt.getBornHost());
-        msgInner.setStoreHost(msgExt.getStoreHost());
-        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
-
-        msgInner.setWaitStoreMsgOK(false);
-        MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
-
-        msgInner.setTopic(msgExt.getTopic());
-        msgInner.setQueueId(msgExt.getQueueId());
-
-        return msgInner;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
deleted file mode 100644
index a92ead0..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/ForwardRequestProcessor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Request processor stub: it produces no response for any request and never rejects one.
- *
- * @author shijia.wxr
- */
-public class ForwardRequestProcessor implements NettyRequestProcessor {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
-    private final BrokerController brokerController;
-
-    /**
-     * @param brokerController controller kept for later use; nothing in this class reads it yet
-     */
-    public ForwardRequestProcessor(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    /** @return always {@code false}; this processor never rejects a request */
-    @Override
-    public boolean rejectRequest() {
-        return false;
-    }
-
-    /** @return always {@code null}; both arguments are ignored */
-    @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
deleted file mode 100644
index 1257f18..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java
+++ /dev/null
@@ -1,542 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
-import com.alibaba.rocketmq.broker.longpolling.PullRequest;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
-import com.alibaba.rocketmq.broker.pagecache.ManyMessageTransfer;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.TopicFilterType;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.filter.FilterAPI;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.PullMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.PullMessageResponseHeader;
-import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.common.protocol.topic.OffsetMovedEvent;
-import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
-import com.alibaba.rocketmq.common.sysflag.PullSysFlag;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.GetMessageResult;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import com.alibaba.rocketmq.store.PutMessageResult;
-import com.alibaba.rocketmq.store.config.BrokerRole;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-import io.netty.channel.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullMessageProcessor implements NettyRequestProcessor {
-    private static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final BrokerController brokerController;
-    private List<ConsumeMessageHook> consumeMessageHookList;
-
-    /**
-     * Creates the processor.
-     *
-     * @param brokerController controller stored in the {@code brokerController} field
-     */
-    public PullMessageProcessor(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    /**
-     * Entry point for a pull request: hands the request to the channel-based overload with
-     * {@code brokerAllowSuspend} set to {@code true}.
-     *
-     * @param ctx     context whose channel is passed on
-     * @param request the pull request command
-     * @return whatever the channel-based overload returns
-     * @throws RemotingCommandException propagated from the channel-based overload
-     */
-    @Override
-    public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        final Channel channel = ctx.channel();
-        return this.processRequest(channel, request, true);
-    }
-
-    /** @return always {@code false}; this processor never rejects a request */
-    @Override
-    public boolean rejectRequest() {
-        return false;
-    }
-
-    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
-            throws RemotingCommandException {
-        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
-        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
-        final PullMessageRequestHeader requestHeader =
-                (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);
-
-
-        response.setOpaque(request.getOpaque());
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("receive PullMessage request command, " + request);
-        }
-
-
-        if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
-            response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden");
-            return response;
-        }
-
-
-        SubscriptionGroupConfig subscriptionGroupConfig =
-                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
-        if (null == subscriptionGroupConfig) {
-            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
-            response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " "
-                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
-            return response;
-        }
-
-
-        if (!subscriptionGroupConfig.isConsumeEnable()) {
-            response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());
-            return response;
-        }
-
-        final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());
-        final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());
-        final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());
-
-        final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;
-
-
-        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
-        if (null == topicConfig) {
-            LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel));
-            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
-            response.setRemark(
-                    "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
-            return response;
-        }
-
-
-        if (!PermName.isReadable(topicConfig.getPerm())) {
-            response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");
-            return response;
-        }
-
-
-        if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {
-            String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic()
-                    + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress();
-            LOG.warn(errorInfo);
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(errorInfo);
-            return response;
-        }
-
-
-        SubscriptionData subscriptionData = null;
-        if (hasSubscriptionFlag) {
-            try {
-                subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                        requestHeader.getSubscription());
-            } catch (Exception e) {
-                LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), //
-                        requestHeader.getConsumerGroup());
-                response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
-                response.setRemark("parse the consumer's subscription failed");
-                return response;
-            }
-        } else {
-            ConsumerGroupInfo consumerGroupInfo =
-                    this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
-            if (null == consumerGroupInfo) {
-                LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());
-                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
-                response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
-                return response;
-            }
-
-            if (!subscriptionGroupConfig.isConsumeBroadcastEnable() //
-                    && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
-                response.setCode(ResponseCode.NO_PERMISSION);
-                response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");
-                return response;
-            }
-
-            subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
-            if (null == subscriptionData) {
-                LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
-                response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
-                response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
-                return response;
-            }
-
-
-            if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
-                LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),
-                        subscriptionData.getSubString());
-                response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);
-                response.setRemark("the consumer's subscription not latest");
-                return response;
-            }
-        }
-
-        final GetMessageResult getMessageResult =
-                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
-        if (getMessageResult != null) {
-            response.setRemark(getMessageResult.getStatus().name());
-            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
-            responseHeader.setMinOffset(getMessageResult.getMinOffset());
-            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
-
-
-            if (getMessageResult.isSuggestPullingFromSlave()) {
-                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
-            } else {
-                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
-            }
-
-            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
-                case ASYNC_MASTER:
-                case SYNC_MASTER:
-                    break;
-                case SLAVE:
-                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
-                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
-                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
-                    }
-                    break;
-            }
-
-            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
-                // consume too slow ,redirect to another machine
-                if (getMessageResult.isSuggestPullingFromSlave()) {
-                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
-                }
-                // consume ok
-                else {
-                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
-                }
-            } else {
-                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
-            }
-
-            switch (getMessageResult.getStatus()) {
-                case FOUND:
-                    response.setCode(ResponseCode.SUCCESS);
-                    break;
-                case MESSAGE_WAS_REMOVING:
-                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
-                    break;
-                case NO_MATCHED_LOGIC_QUEUE:
-                case NO_MESSAGE_IN_QUEUE:
-                    if (0 != requestHeader.getQueueOffset()) {
-                        response.setCode(ResponseCode.PULL_OFFSET_MOVED);
-
-                        // XXX: warn and notify me
-                        LOG.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}", //
-                                requestHeader.getQueueOffset(), //
-                                getMessageResult.getNextBeginOffset(), //
-                                requestHeader.getTopic(), //
-                                requestHeader.getQueueId(), //
-                                requestHeader.getConsumerGroup()//
-                        );
-                    } else {
-                        response.setCode(ResponseCode.PULL_NOT_FOUND);
-                    }
-                    break;
-                case NO_MATCHED_MESSAGE:
-                    response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
-                    break;
-                case OFFSET_FOUND_NULL:
-                    response.setCode(ResponseCode.PULL_NOT_FOUND);
-                    break;
-                case OFFSET_OVERFLOW_BADLY:
-                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
-                    // XXX: warn and notify me
-                    LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: "
-                            + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress());
-                    break;
-                case OFFSET_OVERFLOW_ONE:
-                    response.setCode(ResponseCode.PULL_NOT_FOUND);
-                    break;
-                case OFFSET_TOO_SMALL:
-                    response.setCode(ResponseCode.PULL_OFFSET_MOVED);
-                    LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
-                            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),
-                            getMessageResult.getMinOffset(), channel.remoteAddress());
-                    break;
-                default:
-                    assert false;
-                    break;
-            }
-
-            if (this.hasConsumeMessageHook()) {
-                ConsumeMessageContext context = new ConsumeMessageContext();
-                context.setConsumerGroup(requestHeader.getConsumerGroup());
-                context.setTopic(requestHeader.getTopic());
-                context.setQueueId(requestHeader.getQueueId());
-
-                String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
-
-                switch (response.getCode()) {
-                    case ResponseCode.SUCCESS:
-                        int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
-                        int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;
-
-                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);
-                        context.setCommercialRcvTimes(incValue);
-                        context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());
-                        context.setCommercialOwner(owner);
-
-                        break;
-                    case ResponseCode.PULL_NOT_FOUND:
-                        if (!brokerAllowSuspend) {
-
-
-                            context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
-                            context.setCommercialRcvTimes(1);
-                            context.setCommercialOwner(owner);
-
-                        }
-                        break;
-                    case ResponseCode.PULL_RETRY_IMMEDIATELY:
-                    case ResponseCode.PULL_OFFSET_MOVED:
-                        context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);
-                        context.setCommercialRcvTimes(1);
-                        context.setCommercialOwner(owner);
-                        break;
-                    default:
-                        assert false;
-                        break;
-                }
-
-                this.executeConsumeMessageHookBefore(context);
-            }
-
-            switch (response.getCode()) {
-                case ResponseCode.SUCCESS:
-
-                    this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                            getMessageResult.getMessageCount());
-
-                    this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
-                            getMessageResult.getBufferTotalSize());
-
-                    this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
-                    if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
-                        final long beginTimeMills = this.brokerController.getMessageStore().now();
-                        final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
-                        this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
-                                requestHeader.getTopic(), requestHeader.getQueueId(),
-                                (int) (this.brokerController.getMessageStore().now() - beginTimeMills));
-                        response.setBody(r);
-                    } else {
-                        try {
-                            FileRegion fileRegion =
-                                    new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);
-                            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                                @Override
-                                public void operationComplete(ChannelFuture future) throws Exception {
-                                    getMessageResult.release();
-                                    if (!future.isSuccess()) {
-                                        LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause());
-                                    }
-                                }
-                            });
-                        } catch (Throwable e) {
-                            LOG.error("transfer many message by pagecache exception", e);
-                            getMessageResult.release();
-                        }
-
-                        response = null;
-                    }
-                    break;
-                case ResponseCode.PULL_NOT_FOUND:
-
-                    if (brokerAllowSuspend && hasSuspendFlag) {
-                        long pollingTimeMills = suspendTimeoutMillisLong;
-                        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
-                            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
-                        }
-
-                        String topic = requestHeader.getTopic();
-                        long offset = requestHeader.getQueueOffset();
-                        int queueId = requestHeader.getQueueId();
-                        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
-                                this.brokerController.getMessageStore().now(), offset, subscriptionData);
-                        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
-                        response = null;
-                        break;
-                    }
-
-
-                case ResponseCode.PULL_RETRY_IMMEDIATELY:
-                    break;
-                case ResponseCode.PULL_OFFSET_MOVED:
-                    if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE
-                            || this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {
-                        MessageQueue mq = new MessageQueue();
-                        mq.setTopic(requestHeader.getTopic());
-                        mq.setQueueId(requestHeader.getQueueId());
-                        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-
-                        OffsetMovedEvent event = new OffsetMovedEvent();
-                        event.setConsumerGroup(requestHeader.getConsumerGroup());
-                        event.setMessageQueue(mq);
-                        event.setOffsetRequest(requestHeader.getQueueOffset());
-                        event.setOffsetNew(getMessageResult.getNextBeginOffset());
-                        this.generateOffsetMovedEvent(event);
-                        LOG.warn(
-                                "PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",
-                                requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),
-                                responseHeader.getSuggestWhichBrokerId());
-                    } else {
-                        responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
-                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
-                        LOG.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",
-                                requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),
-                                responseHeader.getSuggestWhichBrokerId());
-                    }
-
-                    break;
-                default:
-                    assert false;
-            }
-        } else {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("store getMessage return null");
-        }
-
-
-        boolean storeOffsetEnable = brokerAllowSuspend;
-        storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
-        storeOffsetEnable = storeOffsetEnable
-                && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
-        if (storeOffsetEnable) {
-            this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
-                    requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
-        }
-        return response;
-    }
-
-
-    /**
-     * Tells whether at least one consume-message hook is currently registered.
-     *
-     * @return {@code true} when the hook list is non-null and not empty
-     */
-    public boolean hasConsumeMessageHook() {
-        if (consumeMessageHookList == null) {
-            return false;
-        }
-        return !this.consumeMessageHookList.isEmpty();
-    }
-
-    /**
-     * Invokes {@code consumeMessageBefore} on every registered consume-message hook.
-     * A hook that throws is logged and skipped, so the remaining hooks still run and
-     * nothing propagates to the caller.
-     *
-     * @param context the consume context handed to each hook
-     */
-    public void executeConsumeMessageHookBefore(final ConsumeMessageContext context) {
-        if (hasConsumeMessageHook()) {
-            for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-                try {
-                    hook.consumeMessageBefore(context);
-                } catch (Throwable e) {
-                    // Hooks stay best effort, but a failure must at least leave a trace in the log.
-                    LOG.warn("executeConsumeMessageHookBefore: consume message hook threw an exception", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Copies every message buffer of a store read result into one heap array and
-     * releases the result afterwards (also when the copy fails).
-     * The store timestamp read from the last copied buffer is subtracted from the
-     * store's current time and handed to {@code recordDiskFallBehindTime}.
-     *
-     * @param getMessageResult the read result to copy and release
-     * @param group            consumer group, used only for the statistic
-     * @param topic            topic, used only for the statistic
-     * @param queueId          queue id, used only for the statistic
-     * @return the backing array of a heap buffer sized by {@code getBufferTotalSize()}
-     */
-    private byte[] readGetMessageResult(final GetMessageResult getMessageResult, final String group, final String topic, final int queueId) {
-        final ByteBuffer heapCopy = ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
-
-        // Stays 0 when the result carries no buffers at all.
-        long lastStoreTimestamp = 0;
-        try {
-            for (final ByteBuffer messageBuffer : getMessageResult.getMessageBufferList()) {
-                heapCopy.put(messageBuffer);
-                // Absolute read: not affected by the position that put() just advanced.
-                lastStoreTimestamp = messageBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
-            }
-        } finally {
-            getMessageResult.release();
-        }
-
-        this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(
-                group, topic, queueId, this.brokerController.getMessageStore().now() - lastStoreTimestamp);
-        return heapCopy.array();
-    }
-
-    /**
-     * Stores an offset-moved event as an internal message on the
-     * {@code MixAll.OFFSET_MOVED_EVENT} topic, using the consumer group as both tag and key.
-     * This is best effort: any exception is logged and swallowed, and a null or
-     * non-{@code PUT_OK} store result is logged instead of being silently dropped.
-     *
-     * @param event the event to encode as the message body
-     */
-    private void generateOffsetMovedEvent(final OffsetMovedEvent event) {
-        try {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setTopic(MixAll.OFFSET_MOVED_EVENT);
-            msgInner.setTags(event.getConsumerGroup());
-            msgInner.setDelayTimeLevel(0);
-            msgInner.setKeys(event.getConsumerGroup());
-            msgInner.setBody(event.encode());
-            msgInner.setFlag(0);
-            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
-            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(TopicFilterType.SINGLE_TAG, msgInner.getTags()));
-
-            msgInner.setQueueId(0);
-            msgInner.setSysFlag(0);
-            msgInner.setBornTimestamp(System.currentTimeMillis());
-            msgInner.setBornHost(RemotingUtil.string2SocketAddress(this.brokerController.getBrokerAddr()));
-            msgInner.setStoreHost(msgInner.getBornHost());
-
-            msgInner.setReconsumeTimes(0);
-
-            final PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
-            if (null == putMessageResult) {
-                LOG.warn("generateOffsetMovedEvent putMessage return null, {}", event);
-            } else {
-                // The result used to be assigned and never read; surface anything other than PUT_OK.
-                switch (putMessageResult.getPutMessageStatus()) {
-                    case PUT_OK:
-                        break;
-                    default:
-                        LOG.warn("generateOffsetMovedEvent putMessage failed, status={}, {}",
-                                putMessageResult.getPutMessageStatus(), event);
-                        break;
-                }
-            }
-        } catch (Exception e) {
-            LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e);
-        }
-    }
-
-    /**
-     * Submits a task to the pull-message executor that processes {@code request} again
-     * and, when a response is produced, writes it back to {@code channel}.
-     * The request is processed with {@code false} as third argument
-     * (presumably the "broker may suspend" switch; confirm against {@code processRequest}).
-     * Write failures and {@link RemotingCommandException}s are logged inside the task and
-     * never reach the submitting thread.
-     *
-     * @param channel the channel the response is written to
-     * @param request the pull request to process again
-     * @throws RemotingCommandException declared for callers; the visible body handles it inside the task
-     */
-    public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException {
-        final Runnable wakeupTask = new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    final RemotingCommand response = PullMessageProcessor.this.processRequest(channel, request, false);
-                    if (null == response) {
-                        // Nothing to send back from here.
-                        return;
-                    }
-
-                    response.setOpaque(request.getOpaque());
-                    response.markResponseType();
-                    try {
-                        channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
-                            @Override
-                            public void operationComplete(ChannelFuture writeFuture) throws Exception {
-                                if (writeFuture.isSuccess()) {
-                                    return;
-                                }
-                                LOG.error("processRequestWrapper response to " + writeFuture.channel().remoteAddress() + " failed",
-                                        writeFuture.cause());
-                                LOG.error(request.toString());
-                                LOG.error(response.toString());
-                            }
-                        });
-                    } catch (Throwable writeError) {
-                        LOG.error("processRequestWrapper process request over, but response failed", writeError);
-                        LOG.error(request.toString());
-                        LOG.error(response.toString());
-                    }
-                } catch (RemotingCommandException commandError) {
-                    LOG.error("excuteRequestWhenWakeup run", commandError);
-                }
-            }
-        };
-
-        this.brokerController.getPullMessageExecutor().submit(wakeupTask);
-    }
-
-    /**
-     * Replaces the list of consume-message hooks used by this processor.
-     * The list is stored by reference, not copied.
-     *
-     * @param sendMessageHookList the hooks to use; despite the parameter name these are
-     *                            {@link ConsumeMessageHook} instances
-     */
-    public void registerConsumeMessageHook(final List<ConsumeMessageHook> sendMessageHookList) {
-        this.consumeMessageHookList = sendMessageHookList;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
deleted file mode 100644
index 738d11f..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/QueryMessageProcessor.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
-import com.alibaba.rocketmq.broker.pagecache.QueryMessageTransfer;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader;
-import com.alibaba.rocketmq.common.protocol.header.ViewMessageRequestHeader;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.QueryMessageResult;
-import com.alibaba.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.FileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Broker-side processor for message lookups. It dispatches {@code QUERY_MESSAGE} and
- * {@code VIEW_MESSAGE_BY_ID} requests. A successful lookup is written directly to the
- * channel as a {@link FileRegion} (response header plus the stored bytes), and the
- * handler then returns {@code null} because the response has already been written here.
- *
- * @author shijia.wxr
- */
-public class QueryMessageProcessor implements NettyRequestProcessor {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
-    private final BrokerController brokerController;
-
-
-    public QueryMessageProcessor(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-
-    /**
-     * Routes the request by its request code.
-     *
-     * @return the handler's response, or {@code null} when the handler already wrote the
-     *         response itself or the request code is not handled by this processor
-     */
-    @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
-        switch (request.getCode()) {
-            case RequestCode.QUERY_MESSAGE:
-                return this.queryMessage(ctx, request);
-            case RequestCode.VIEW_MESSAGE_BY_ID:
-                return this.viewMessageById(ctx, request);
-            default:
-                break;
-        }
-
-        return null;
-    }
-
-    @Override
-    public boolean rejectRequest() {
-        return false;
-    }
-
-
-    /**
-     * Queries the message store by topic, key and time range.
-     *
-     * @return {@code null} when matching data was written to the channel, otherwise a
-     *         {@code QUERY_NOT_FOUND} response, or {@code SYSTEM_ERROR} when the store
-     *         returned no result object
-     */
-    public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
-        final RemotingCommand response =
-                RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);
-        final QueryMessageResponseHeader responseHeader =
-                (QueryMessageResponseHeader) response.readCustomHeader();
-        final QueryMessageRequestHeader requestHeader =
-                (QueryMessageRequestHeader) request
-                        .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
-
-
-        response.setOpaque(request.getOpaque());
-
-
-        // For unique-key queries the client-supplied limit is replaced by the store's configured default.
-        String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);
-        if ("true".equals(isUniqueKey)) {
-            requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());
-        }
-
-        final QueryMessageResult queryMessageResult =
-                this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
-                        requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
-                        requestHeader.getEndTimestamp());
-        if (null == queryMessageResult) {
-            // An assert is skipped unless the JVM runs with -ea, so answer explicitly
-            // instead of failing on the dereference below.
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("store queryMessage return null");
-            return response;
-        }
-
-        responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());
-        responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
-
-
-        if (queryMessageResult.getBufferTotalSize() > 0) {
-            response.setCode(ResponseCode.SUCCESS);
-            response.setRemark(null);
-
-            try {
-                FileRegion fileRegion =
-                        new QueryMessageTransfer(response.encodeHeader(queryMessageResult
-                                .getBufferTotalSize()), queryMessageResult);
-                ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        // Release only once the write has finished, whether it succeeded or not.
-                        queryMessageResult.release();
-                        if (!future.isSuccess()) {
-                            log.error("transfer query message by pagecache failed, ", future.cause());
-                        }
-                    }
-                });
-            } catch (Throwable e) {
-                log.error("transfer query message by pagecache exception", e);
-                queryMessageResult.release();
-            }
-
-            return null;
-        }
-
-        response.setCode(ResponseCode.QUERY_NOT_FOUND);
-        response.setRemark("can not find message, maybe time range not correct");
-        return response;
-    }
-
-
-    /**
-     * Looks up a single message by the offset carried in the request header.
-     *
-     * @return {@code null} when the message was written to the channel, otherwise a
-     *         {@code SYSTEM_ERROR} response naming the offset that could not be resolved
-     */
-    public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)
-            throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final ViewMessageRequestHeader requestHeader =
-                (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);
-
-
-        response.setOpaque(request.getOpaque());
-
-        final SelectMappedBufferResult selectMappedBufferResult =
-                this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());
-        if (selectMappedBufferResult != null) {
-            response.setCode(ResponseCode.SUCCESS);
-            response.setRemark(null);
-
-            try {
-                FileRegion fileRegion =
-                        new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),
-                                selectMappedBufferResult);
-                ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        // Release only once the write has finished, whether it succeeded or not.
-                        selectMappedBufferResult.release();
-                        if (!future.isSuccess()) {
-                            log.error("transfer one message by pagecache failed, ", future.cause());
-                        }
-                    }
-                });
-            } catch (Throwable e) {
-                log.error("transfer one message by pagecache exception", e);
-                selectMappedBufferResult.release();
-            }
-
-            return null;
-        } else {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("can not find message by the offset, " + requestHeader.getOffset());
-        }
-
-        return response;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
deleted file mode 100644
index a375285..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java
+++ /dev/null
@@ -1,497 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageContext;
-import com.alibaba.rocketmq.broker.mqtrace.ConsumeMessageHook;
-import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
-import com.alibaba.rocketmq.common.*;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
-import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import com.alibaba.rocketmq.store.PutMessageResult;
-import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-import io.netty.channel.ChannelHandlerContext;
-
-import java.net.SocketAddress;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
-
-    private List<ConsumeMessageHook> consumeMessageHookList;
-
-    /**
-     * Creates the processor; the broker controller is handed to the superclass constructor.
-     *
-     * @param brokerController the owning broker controller
-     */
-    public SendMessageProcessor(final BrokerController brokerController) {
-        super(brokerController);
-    }
-
-    /**
-     * Entry point for send-type requests. {@code CONSUMER_SEND_MSG_BACK} goes to
-     * {@code consumerSendMsgBack}; every other request code is treated as a message send,
-     * wrapped by the send-message hooks before and after.
-     *
-     * @return the response, or {@code null} when the request header could not be parsed
-     */
-    @Override
-    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
-        if (RequestCode.CONSUMER_SEND_MSG_BACK == request.getCode()) {
-            return this.consumerSendMsgBack(ctx, request);
-        }
-
-        final SendMessageRequestHeader sendHeader = parseRequestHeader(request);
-        if (null == sendHeader) {
-            return null;
-        }
-
-        final SendMessageContext traceContext = buildMsgContext(ctx, sendHeader);
-        this.executeSendMessageHookBefore(ctx, request, traceContext);
-        final RemotingCommand sendResponse = this.sendMessage(ctx, request, traceContext, sendHeader);
-
-        this.executeSendMessageHookAfter(sendResponse, traceContext);
-        return sendResponse;
-    }
-
-    /**
-     * Reports whether requests should be rejected.
-     *
-     * @return {@code true} when the message store reports a busy OS page cache or a
-     *         deficient transient store pool
-     */
-    @Override
-    public boolean rejectRequest() {
-        if (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
-            return true;
-        }
-        return this.brokerController.getMessageStore().isTransientStorePoolDeficient();
-    }
-
-    /**
-     * Handles {@code CONSUMER_SEND_MSG_BACK}: looks the original message up by offset and
-     * stores a copy of it either in the consumer group's retry topic (with a delay level)
-     * or, once the reconsume limit is reached or the requested delay level is negative,
-     * in the group's DLQ topic.
-     *
-     * @param ctx     channel context (not used by the visible body)
-     * @param request the send-back request
-     * @return a response whose code is {@code SUCCESS}, {@code SUBSCRIPTION_GROUP_NOT_EXIST},
-     *         {@code NO_PERMISSION} or {@code SYSTEM_ERROR}; never {@code null}
-     * @throws RemotingCommandException presumably raised while decoding the request header
-     */
-    private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
-            throws RemotingCommandException {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-        final ConsumerSendMsgBackRequestHeader requestHeader =
-                (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
-
-        // Report the send-back to the consume hooks first; this happens before any validation below.
-        if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
-
-            ConsumeMessageContext context = new ConsumeMessageContext();
-            context.setConsumerGroup(requestHeader.getGroup());
-            context.setTopic(requestHeader.getOriginTopic());
-            context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
-            context.setCommercialRcvTimes(1);
-            context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));
-
-            this.executeConsumeMessageHookAfter(context);
-        }
-
-
-        // The consumer group must be known to this broker.
-        SubscriptionGroupConfig subscriptionGroupConfig =
-                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
-        if (null == subscriptionGroupConfig) {
-            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
-            response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
-                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
-            return response;
-        }
-
-
-        // The broker itself must be writable.
-        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
-            response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
-            return response;
-        }
-
-
-        // A group without retry queues: acknowledge with SUCCESS without storing anything.
-        if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
-            response.setCode(ResponseCode.SUCCESS);
-            response.setRemark(null);
-            return response;
-        }
-
-        // Default target: a randomly chosen queue of the group's retry topic.
-        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
-        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
-
-
-        int topicSysFlag = 0;
-        if (requestHeader.isUnitMode()) {
-            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
-        }
-
-
-        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(//
-                newTopic, //
-                subscriptionGroupConfig.getRetryQueueNums(), //
-                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
-        if (null == topicConfig) {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("topic[" + newTopic + "] not exist");
-            return response;
-        }
-
-
-        if (!PermName.isWriteable(topicConfig.getPerm())) {
-            response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
-            return response;
-        }
-
-        // Load the message that is being sent back.
-        MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
-        if (null == msgExt) {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("look message by offset failed, " + requestHeader.getOffset());
-            return response;
-        }
-
-
-        // Record the message's current topic as PROPERTY_RETRY_TOPIC unless the property is already set.
-        final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
-        if (null == retryTopic) {
-            MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
-        }
-        msgExt.setWaitStoreMsgOK(false);
-
-
-        int delayLevel = requestHeader.getDelayLevel();
-
-
-        // The group's configured limit applies unless the request version is at least V3_4_9,
-        // in which case the limit carried in the request header is used.
-        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
-        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
-            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
-        }
-
-
-        // Limit reached or negative delay level: redirect to the group's DLQ topic instead.
-        if (msgExt.getReconsumeTimes() >= maxReconsumeTimes//
-                || delayLevel < 0) {
-            newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
-            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
-
-            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
-                    DLQ_NUMS_PER_GROUP, //
-                    PermName.PERM_WRITE, 0
-            );
-            if (null == topicConfig) {
-                response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setRemark("topic[" + newTopic + "] not exist");
-                return response;
-            }
-        } else {
-            // A delay level of 0 is replaced by 3 plus the number of previous reconsume attempts.
-            if (0 == delayLevel) {
-                delayLevel = 3 + msgExt.getReconsumeTimes();
-            }
-
-            msgExt.setDelayTimeLevel(delayLevel);
-        }
-
-        // Build the message to store: same body, flag and properties, new topic and queue,
-        // reconsume counter incremented by one.
-        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-        msgInner.setTopic(newTopic);
-        msgInner.setBody(msgExt.getBody());
-        msgInner.setFlag(msgExt.getFlag());
-        MessageAccessor.setProperties(msgInner, msgExt.getProperties());
-        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
-        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
-
-        msgInner.setQueueId(queueIdInt);
-        msgInner.setSysFlag(msgExt.getSysFlag());
-        msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-        msgInner.setBornHost(msgExt.getBornHost());
-        msgInner.setStoreHost(this.getStoreHost());
-        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
-
-        // Keep an existing origin message id; otherwise use the id of the message just looked up.
-        String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
-        MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
-
-        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
-        if (putMessageResult != null) {
-            switch (putMessageResult.getPutMessageStatus()) {
-                case PUT_OK:
-                    // Count the send-back against PROPERTY_RETRY_TOPIC when set, else the message's own topic.
-                    String backTopic = msgExt.getTopic();
-                    String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
-                    if (correctTopic != null) {
-                        backTopic = correctTopic;
-                    }
-
-                    this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
-
-                    response.setCode(ResponseCode.SUCCESS);
-                    response.setRemark(null);
-
-                    return response;
-                default:
-                    break;
-            }
-
-            // Any status other than PUT_OK is reported as SYSTEM_ERROR with the status name as remark.
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(putMessageResult.getPutMessageStatus().name());
-            return response;
-        }
-
-        response.setCode(ResponseCode.SYSTEM_ERROR);
-        response.setRemark("putMessageResult is null");
-        return response;
-    }
-
-    private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
-                                        final RemotingCommand request, //
-                                        final SendMessageContext sendMessageContext, //
-                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
-        // Handles one SendMessage request: validates it, builds the broker-side message, stores it, and maps the store status onto the response.
-        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
-        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
-        // Returns null after a successful send (the response has been passed to doResponse by then); otherwise returns the response with its code and remark set.
-
-        response.setOpaque(request.getOpaque());
-        // Attach this broker's region id and trace switch to the response as ext fields.
-        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
-        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
-
-        if (log.isDebugEnabled()) {
-            log.debug("receive SendMessage request command, " + request);
-        }
-        // Refuse to serve until the configured start-accept timestamp has been reached.
-        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
-        if (this.brokerController.getMessageStore().now() < startTimstamp) {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
-            return response;
-        }
-        // -1 is a sentinel: if msgCheck changed the code, return the response as msgCheck left it.
-        response.setCode(-1);
-        super.msgCheck(ctx, requestHeader, response);
-        if (response.getCode() != -1) {
-            return response;
-        }
-
-        final byte[] body = request.getBody();
-
-        int queueIdInt = requestHeader.getQueueId();
-        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
-        // Negative queue id: pick one at random among the topic's write queues.
-        if (queueIdInt < 0) {
-            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
-        }
-
-        int sysFlag = requestHeader.getSysFlag();
-        // Flag the message as multi-tag when the topic is configured with MULTI_TAG filtering.
-        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
-            sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
-        }
-        // Retry topic handling: once reconsumeTimes reaches the limit, reroute the message to the group's DLQ topic.
-        String newTopic = requestHeader.getTopic();
-        if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-            String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
-            SubscriptionGroupConfig subscriptionGroupConfig =
-                    this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
-            if (null == subscriptionGroupConfig) {
-                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
-                response.setRemark(
-                        "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
-                return response;
-            }
-            // The limit is the group's retryMaxTimes, overridden by the request header for versions >= V3_4_9.
-
-            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
-            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
-                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
-            }
-            int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
-            if (reconsumeTimes >= maxReconsumeTimes) {
-                newTopic = MixAll.getDLQTopic(groupName);
-                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
-                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
-                        DLQ_NUMS_PER_GROUP, //
-                        PermName.PERM_WRITE, 0
-                );
-                if (null == topicConfig) {
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("topic[" + newTopic + "] not exist");
-                    return response;
-                }
-            }
-        }
-        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-        msgInner.setTopic(newTopic);
-        msgInner.setBody(body);
-        msgInner.setFlag(requestHeader.getFlag());
-        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
-        msgInner.setPropertiesString(requestHeader.getProperties());
-        msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
-
-        msgInner.setQueueId(queueIdInt);
-        msgInner.setSysFlag(sysFlag);
-        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
-        msgInner.setBornHost(ctx.channel().remoteAddress());
-        msgInner.setStoreHost(this.getStoreHost());
-        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
-        // When the broker is configured to reject transaction messages, refuse any message carrying the prepared property.
-        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
-            String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-            if (traFlag != null) {
-                response.setCode(ResponseCode.NO_PERMISSION);
-                response.setRemark(
-                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
-                return response;
-            }
-        }
-
-        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
-        if (putMessageResult != null) {
-            boolean sendOK = false;
-            // sendOK is set for PUT_OK and also for the flush/slave statuses handled in the first four cases.
-            switch (putMessageResult.getPutMessageStatus()) {
-                // Success
-                case PUT_OK:
-                    sendOK = true;
-                    response.setCode(ResponseCode.SUCCESS);
-                    break;
-                case FLUSH_DISK_TIMEOUT:
-                    response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
-                    sendOK = true;
-                    break;
-                case FLUSH_SLAVE_TIMEOUT:
-                    response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
-                    sendOK = true;
-                    break;
-                case SLAVE_NOT_AVAILABLE:
-                    response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
-                    sendOK = true;
-                    break;
-
-                // Failed
-                case CREATE_MAPEDFILE_FAILED:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("create mapped file failed, server is busy or broken.");
-                    break;
-                case MESSAGE_ILLEGAL:
-                case PROPERTIES_SIZE_EXCEEDED:
-                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-                    response.setRemark(
-                            "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
-                    break;
-                case SERVICE_NOT_AVAILABLE:
-                    response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
-                    response.setRemark(
-                            "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
-                    break;
-                case OS_PAGECACHE_BUSY:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
-                    break;
-                case UNKNOWN_ERROR:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("UNKNOWN_ERROR");
-                    break;
-                default:
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("UNKNOWN_ERROR DEFAULT");
-                    break;
-            }
-            // NOTE(review): getExtFields() is dereferenced without a null check - confirm it cannot be null here.
-            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
-            if (sendOK) {
-                // Record the per-topic put count and size, and the broker-wide put count.
-                this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
-                this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                        putMessageResult.getAppendMessageResult().getWroteBytes());
-                this.brokerController.getBrokerStatsManager().incBrokerPutNums();
-
-                response.setRemark(null);
-
-                responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
-                responseHeader.setQueueId(queueIdInt);
-                responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
-                // doResponse is called before the hook context below is populated.
-
-                doResponse(ctx, request, response);
-
-
-                if (hasSendMessageHook()) {
-                    sendMessageContext.setMsgId(responseHeader.getMsgId());
-                    sendMessageContext.setQueueId(responseHeader.getQueueId());
-                    sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
-
-                    int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
-                    int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
-
-                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
-                    sendMessageContext.setCommercialSendTimes(incValue);
-                    sendMessageContext.setCommercialSendSize(wroteSize);
-                    sendMessageContext.setCommercialOwner(owner);
-                }
-                return null; // presumably tells the caller not to send the response again - TODO confirm
-            } else {
-                if (hasSendMessageHook()) {
-                    int wroteSize = request.getBody().length;
-                    int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
-
-                    sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
-                    sendMessageContext.setCommercialSendTimes(incValue);
-                    sendMessageContext.setCommercialSendSize(wroteSize);
-                    sendMessageContext.setCommercialOwner(owner);
-                }
-            }
-        } else {
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("store putMessage return null");
-        }
-
-        return response;
-    }
-
-    /**
-     * Tells whether at least one consume-message hook is registered.
-     *
-     * @return {@code true} when the hook list is non-null and non-empty
-     */
-    public boolean hasConsumeMessageHook() {
-        if (this.consumeMessageHookList == null) {
-            return false;
-        }
-        return !this.consumeMessageHookList.isEmpty();
-    }
-
-    /**
-     * Invokes {@code consumeMessageAfter} on every registered consume-message hook.
-     * <p>
-     * Hooks are best-effort: a hook that throws neither aborts the loop nor propagates to the
-     * caller, but the failure is logged so that it is not lost silently.
-     *
-     * @param context the consume context handed to each hook
-     */
-    public void executeConsumeMessageHookAfter(final ConsumeMessageContext context) {
-        if (!hasConsumeMessageHook()) {
-            return;
-        }
-        for (ConsumeMessageHook hook : this.consumeMessageHookList) {
-            try {
-                hook.consumeMessageAfter(context);
-            } catch (Throwable e) {
-                // Keep going with the remaining hooks, but leave a trace of what went wrong.
-                log.warn("consumeMessageAfter hook failed", e);
-            }
-        }
-    }
-
-    /**
-     * @return the current value of the {@code storeHost} field
-     */
-    public SocketAddress getStoreHost() {
-        return this.storeHost;
-    }
-
-    private String diskUtil() {
-        String storePathPhysic = this.brokerController.getMessageStoreConfig().getStorePathCommitLog();
-        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
-
-        String storePathLogis =
-                StorePathConfigHelper.getStorePathConsumeQueue(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
-        double logisRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogis);
-
-        String storePathIndex =
-                StorePathConfigHelper.getStorePathIndex(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
-        double indexRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathIndex);
-
-        return String.format("CL: %5.2f CQ: %5.2f INDEX: %5.2f", physicRatio, logisRatio, indexRatio);
-    }
-
-    /**
-     * Replaces the list of consume-message hooks with the given list. The list is stored by
-     * reference, not copied.
-     *
-     * @param consumeMessageHookList the hooks to use from now on
-     */
-    public void registerConsumeMessageHook(final List<ConsumeMessageHook> consumeMessageHookList) {
-        this.consumeMessageHookList = consumeMessageHookList;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
deleted file mode 100644
index 8b3aefe..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/slave/SlaveSynchronize.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.slave;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.subscription.SubscriptionGroupManager;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper;
-import com.alibaba.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
-import com.alibaba.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
-import com.alibaba.rocketmq.store.config.StorePathConfigHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-
-/**
- * Pulls state from the master broker and applies it to this broker: topic config,
- * consumer offsets, delay offsets and subscription group config.
- * <p>
- * Every step is a no-op while no master address is set, and logs exceptions instead of
- * propagating them.
- *
- * @author shijia.wxr
- * @author manhong.yqd
- */
-public class SlaveSynchronize {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final BrokerController brokerController;
-    /** Master address to sync from; {@code null} turns every sync step into a no-op. */
-    private volatile String masterAddr = null;
-
-    public SlaveSynchronize(BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    public String getMasterAddr() {
-        return masterAddr;
-    }
-
-    public void setMasterAddr(String masterAddr) {
-        this.masterAddr = masterAddr;
-    }
-
-    /**
-     * Runs the four sync steps in a fixed order. Each step catches its own exceptions, so one
-     * step throwing an exception does not prevent the following ones from running.
-     */
-    public void syncAll() {
-        this.syncTopicConfig();
-        this.syncConsumerOffset();
-        this.syncDelayOffset();
-        this.syncSubscriptionGroupConfig();
-    }
-
-    /**
-     * Replaces the local topic config table with the master's and persists it, but only when
-     * the local and master data versions differ.
-     */
-    private void syncTopicConfig() {
-        // Read the volatile field once so the whole step uses a single address.
-        final String masterAddrBak = this.masterAddr;
-        if (masterAddrBak == null) {
-            return;
-        }
-        try {
-            TopicConfigSerializeWrapper topicWrapper =
-                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
-            if (!this.brokerController.getTopicConfigManager().getDataVersion()
-                    .equals(topicWrapper.getDataVersion())) {
-                this.brokerController.getTopicConfigManager().getDataVersion()
-                        .assignNewOne(topicWrapper.getDataVersion());
-                this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
-                this.brokerController.getTopicConfigManager().getTopicConfigTable()
-                        .putAll(topicWrapper.getTopicConfigTable());
-                this.brokerController.getTopicConfigManager().persist();
-                log.info("update slave topic config from master, {}", masterAddrBak);
-            }
-        } catch (Exception e) {
-            log.error("syncTopicConfig Exception, " + masterAddrBak, e);
-        }
-    }
-
-    /**
-     * Merges the master's consumer offset table into the local one and persists it. Unlike the
-     * topic and subscription group steps, the local table is not cleared first.
-     */
-    private void syncConsumerOffset() {
-        final String masterAddrBak = this.masterAddr;
-        if (masterAddrBak == null) {
-            return;
-        }
-        try {
-            ConsumerOffsetSerializeWrapper offsetWrapper =
-                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
-            this.brokerController.getConsumerOffsetManager().getOffsetTable()
-                    .putAll(offsetWrapper.getOffsetTable());
-            this.brokerController.getConsumerOffsetManager().persist();
-            log.info("update slave consumer offset from master, {}", masterAddrBak);
-        } catch (Exception e) {
-            log.error("syncConsumerOffset Exception, " + masterAddrBak, e);
-        }
-    }
-
-    /**
-     * Fetches the master's delay offsets and writes them to the local delay offset store file.
-     * Nothing is written or reported when the master returns {@code null}.
-     */
-    private void syncDelayOffset() {
-        final String masterAddrBak = this.masterAddr;
-        if (masterAddrBak == null) {
-            return;
-        }
-        try {
-            String delayOffset =
-                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
-            if (delayOffset == null) {
-                return;
-            }
-            String fileName =
-                    StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
-                            .getMessageStoreConfig().getStorePathRootDir());
-            try {
-                MixAll.string2File(delayOffset, fileName);
-                // Report success only once the offsets have actually been written.
-                log.info("update slave delay offset from master, {}", masterAddrBak);
-            } catch (IOException e) {
-                log.error("persist file Exception, " + fileName, e);
-            }
-        } catch (Exception e) {
-            log.error("syncDelayOffset Exception, " + masterAddrBak, e);
-        }
-    }
-
-    /**
-     * Replaces the local subscription group table with the master's and persists it, but only
-     * when the local and master data versions differ.
-     */
-    private void syncSubscriptionGroupConfig() {
-        final String masterAddrBak = this.masterAddr;
-        if (masterAddrBak == null) {
-            return;
-        }
-        try {
-            SubscriptionGroupWrapper subscriptionWrapper =
-                    this.brokerController.getBrokerOuterAPI()
-                            .getAllSubscriptionGroupConfig(masterAddrBak);
-            SubscriptionGroupManager subscriptionGroupManager =
-                    this.brokerController.getSubscriptionGroupManager();
-            if (!subscriptionGroupManager.getDataVersion()
-                    .equals(subscriptionWrapper.getDataVersion())) {
-                subscriptionGroupManager.getDataVersion().assignNewOne(
-                        subscriptionWrapper.getDataVersion());
-                subscriptionGroupManager.getSubscriptionGroupTable().clear();
-                subscriptionGroupManager.getSubscriptionGroupTable().putAll(
-                        subscriptionWrapper.getSubscriptionGroupTable());
-                subscriptionGroupManager.persist();
-                log.info("update slave Subscription Group from master, {}", masterAddrBak);
-            }
-        } catch (Exception e) {
-            log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e);
-        }
-    }
-}