You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:44 UTC

[48/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
deleted file mode 100644
index 269918a..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.offset;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.BrokerPathConfigHelper;
-import com.alibaba.rocketmq.common.ConfigManager;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class ConsumerOffsetManager extends ConfigManager {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final String TOPIC_GROUP_SEPARATOR = "@";
-
-    private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable =
-            new ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>>(512);
-
-    private transient BrokerController brokerController;
-
-
-    public ConsumerOffsetManager() {
-    }
-
-
-    public ConsumerOffsetManager(BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-
-    public void scanUnsubscribedTopic() {
-        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
-            String topicAtGroup = next.getKey();
-            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
-            if (arrays != null && arrays.length == 2) {
-                String topic = arrays[0];
-                String group = arrays[1];
-
-                if (null == brokerController.getConsumerManager().findSubscriptionData(group, topic)
-                        && this.offsetBehindMuchThanData(topic, next.getValue())) {
-                    it.remove();
-                    log.warn("remove topic offset, {}", topicAtGroup);
-                }
-            }
-        }
-    }
-
-
-    private boolean offsetBehindMuchThanData(final String topic, ConcurrentHashMap<Integer, Long> table) {
-        Iterator<Entry<Integer, Long>> it = table.entrySet().iterator();
-        boolean result = !table.isEmpty();
-
-        while (it.hasNext() && result) {
-            Entry<Integer, Long> next = it.next();
-            long minOffsetInStore = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, next.getKey());
-            long offsetInPersist = next.getValue();
-            if (offsetInPersist > minOffsetInStore) {
-                result = false;
-            } else {
-                result = true;
-            }
-        }
-
-        return result;
-    }
-
-
-    public Set<String> whichTopicByConsumer(final String group) {
-        Set<String> topics = new HashSet<String>();
-
-        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
-            String topicAtGroup = next.getKey();
-            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
-            if (arrays != null && arrays.length == 2) {
-                if (group.equals(arrays[1])) {
-                    topics.add(arrays[0]);
-                }
-            }
-        }
-
-        return topics;
-    }
-
-
-    public Set<String> whichGroupByTopic(final String topic) {
-        Set<String> groups = new HashSet<String>();
-
-        Iterator<Entry<String, ConcurrentHashMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<String, ConcurrentHashMap<Integer, Long>> next = it.next();
-            String topicAtGroup = next.getKey();
-            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
-            if (arrays != null && arrays.length == 2) {
-                if (topic.equals(arrays[0])) {
-                    groups.add(arrays[1]);
-                }
-            }
-        }
-
-        return groups;
-    }
-
-
-    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
-        // topic@group
-        String key = topic + TOPIC_GROUP_SEPARATOR + group;
-        this.commitOffset(clientHost, key, queueId, offset);
-    }
-
-    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
-        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
-        if (null == map) {
-            map = new ConcurrentHashMap<Integer, Long>(32);
-            map.put(queueId, offset);
-            this.offsetTable.put(key, map);
-        } else {
-            Long storeOffset = map.put(queueId, offset);
-            if (storeOffset != null && offset < storeOffset) {
-                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
-            }
-        }
-    }
-
-    public long queryOffset(final String group, final String topic, final int queueId) {
-        // topic@group
-        String key = topic + TOPIC_GROUP_SEPARATOR + group;
-        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
-        if (null != map) {
-            Long offset = map.get(queueId);
-            if (offset != null)
-                return offset;
-        }
-
-        return -1;
-    }
-
-    public String encode() {
-        return this.encode(false);
-    }
-
-    @Override
-    public String configFilePath() {
-        return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
-    }
-
-    @Override
-    public void decode(String jsonString) {
-        if (jsonString != null) {
-            ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
-            if (obj != null) {
-                this.offsetTable = obj.offsetTable;
-            }
-        }
-    }
-
-    public String encode(final boolean prettyFormat) {
-        return RemotingSerializable.toJson(this, prettyFormat);
-    }
-
-    public ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> getOffsetTable() {
-        return offsetTable;
-    }
-
-
-    public void setOffsetTable(ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable) {
-        this.offsetTable = offsetTable;
-    }
-
-
-    public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {
-
-        Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
-        Set<String> topicGroups = this.offsetTable.keySet();
-        if (!UtilAll.isBlank(filterGroups)) {
-            for (String group : filterGroups.split(",")) {
-                Iterator<String> it = topicGroups.iterator();
-                while (it.hasNext()) {
-                    if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
-                        it.remove();
-                    }
-                }
-            }
-        }
-
-        for (Map.Entry<String, ConcurrentHashMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
-            String topicGroup = offSetEntry.getKey();
-            String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
-            if (topic.equals(topicGroupArr[0])) {
-                for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
-                    long minOffset = this.brokerController.getMessageStore().getMinOffsetInQuque(topic, entry.getKey());
-                    if (entry.getValue() >= minOffset) {
-                        Long offset = queueMinOffset.get(entry.getKey());
-                        if (offset == null) {
-                            queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));
-                        } else {
-                            queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));
-                        }
-                    }
-                }
-            }
-
-        }
-        return queueMinOffset;
-    }
-
-
-    public Map<Integer, Long> queryOffset(final String group, final String topic) {
-        // topic@group
-        String key = topic + TOPIC_GROUP_SEPARATOR + group;
-        return this.offsetTable.get(key);
-    }
-
-
-    public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {
-        ConcurrentHashMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);
-        if (offsets != null) {
-            this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
deleted file mode 100644
index f051d29..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/out/BrokerOuterAPI.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.out;
-
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.namesrv.RegisterBrokerResult;
-import com.alibaba.rocketmq.common.namesrv.TopAddressing;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.*;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.RemotingClient;
-import com.alibaba.rocketmq.remoting.exception.*;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- * @author manhong.yqd
- */
-public class BrokerOuterAPI {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final RemotingClient remotingClient;
-    private final TopAddressing topAddressing = new TopAddressing(MixAll.WS_ADDR);
-    private String nameSrvAddr = null;
-
-    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig) {
-        this(nettyClientConfig, null);
-    }
-
-    public BrokerOuterAPI(final NettyClientConfig nettyClientConfig, RPCHook rpcHook) {
-        this.remotingClient = new NettyRemotingClient(nettyClientConfig);
-        this.remotingClient.registerRPCHook(rpcHook);
-    }
-
-    public void start() {
-        this.remotingClient.start();
-    }
-
-    public void shutdown() {
-        this.remotingClient.shutdown();
-    }
-
-    public String fetchNameServerAddr() {
-        try {
-            String addrs = this.topAddressing.fetchNSAddr();
-            if (addrs != null) {
-                if (!addrs.equals(this.nameSrvAddr)) {
-                    log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs);
-                    this.updateNameServerAddressList(addrs);
-                    this.nameSrvAddr = addrs;
-                    return nameSrvAddr;
-                }
-            }
-        } catch (Exception e) {
-            log.error("fetchNameServerAddr Exception", e);
-        }
-        return nameSrvAddr;
-    }
-
-    public void updateNameServerAddressList(final String addrs) {
-        List<String> lst = new ArrayList<String>();
-        String[] addrArray = addrs.split(";");
-        if (addrArray != null) {
-            for (String addr : addrArray) {
-                lst.add(addr);
-            }
-
-            this.remotingClient.updateNameServerAddressList(lst);
-        }
-    }
-
-    public RegisterBrokerResult registerBrokerAll(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final String haServerAddr,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final List<String> filterServerList,
-            final boolean oneway,
-            final int timeoutMills) {
-        RegisterBrokerResult registerBrokerResult = null;
-
-        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
-        if (nameServerAddressList != null) {
-            for (String namesrvAddr : nameServerAddressList) {
-                try {
-                    RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
-                            haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
-                    if (result != null) {
-                        registerBrokerResult = result;
-                    }
-
-                    log.info("register broker to name server {} OK", namesrvAddr);
-                } catch (Exception e) {
-                    log.warn("registerBroker Exception, " + namesrvAddr, e);
-                }
-            }
-        }
-
-        return registerBrokerResult;
-    }
-
-    private RegisterBrokerResult registerBroker(
-            final String namesrvAddr,
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId,
-            final String haServerAddr,
-            final TopicConfigSerializeWrapper topicConfigWrapper,
-            final List<String> filterServerList,
-            final boolean oneway,
-            final int timeoutMills
-    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException {
-        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
-        requestHeader.setBrokerAddr(brokerAddr);
-        requestHeader.setBrokerId(brokerId);
-        requestHeader.setBrokerName(brokerName);
-        requestHeader.setClusterName(clusterName);
-        requestHeader.setHaServerAddr(haServerAddr);
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
-
-        RegisterBrokerBody requestBody = new RegisterBrokerBody();
-        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
-        requestBody.setFilterServerList(filterServerList);
-        request.setBody(requestBody.encode());
-
-        if (oneway) {
-            try {
-                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
-            } catch (RemotingTooMuchRequestException e) {
-            }
-            return null;
-        }
-
-        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                RegisterBrokerResponseHeader responseHeader =
-                        (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
-                RegisterBrokerResult result = new RegisterBrokerResult();
-                result.setMasterAddr(responseHeader.getMasterAddr());
-                result.setHaServerAddr(responseHeader.getHaServerAddr());
-                result.setHaServerAddr(responseHeader.getHaServerAddr());
-                if (response.getBody() != null) {
-                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
-                }
-                return result;
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    public void unregisterBrokerAll(
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId
-    ) {
-        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
-        if (nameServerAddressList != null) {
-            for (String namesrvAddr : nameServerAddressList) {
-                try {
-                    this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
-                    log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
-                } catch (Exception e) {
-                    log.warn("unregisterBroker Exception, " + namesrvAddr, e);
-                }
-            }
-        }
-    }
-
-    public void unregisterBroker(
-            final String namesrvAddr,
-            final String clusterName,
-            final String brokerAddr,
-            final String brokerName,
-            final long brokerId
-    ) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
-        UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
-        requestHeader.setBrokerAddr(brokerAddr);
-        requestHeader.setBrokerId(brokerId);
-        requestHeader.setBrokerName(brokerName);
-        requestHeader.setClusterName(clusterName);
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);
-
-        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return;
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException, MQBrokerException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
-
-        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    public ConsumerOffsetSerializeWrapper getAllConsumerOffset(final String addr) throws InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
-        RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class);
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    public String getAllDelayOffset(final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
-            RemotingConnectException, MQBrokerException, UnsupportedEncodingException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
-        RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return new String(response.getBody(), MixAll.DEFAULT_CHARSET);
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    public SubscriptionGroupWrapper getAllSubscriptionGroupConfig(final String addr) throws InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, MQBrokerException {
-        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
-        RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000);
-        assert response != null;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class);
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    public void registerRPCHook(RPCHook rpcHook) {
-        remotingClient.registerRPCHook(rpcHook);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
deleted file mode 100644
index 8050bc1..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.pagecache;
-
-import com.alibaba.rocketmq.store.GetMessageResult;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class ManyMessageTransfer extends AbstractReferenceCounted implements FileRegion {
-    private final ByteBuffer byteBufferHeader;
-    private final GetMessageResult getMessageResult;
-    private long transfered; // the bytes which was transfered already
-
-
-    public ManyMessageTransfer(ByteBuffer byteBufferHeader, GetMessageResult getMessageResult) {
-        this.byteBufferHeader = byteBufferHeader;
-        this.getMessageResult = getMessageResult;
-    }
-
-
-    @Override
-    public long position() {
-        int pos = byteBufferHeader.position();
-        List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
-        for (ByteBuffer bb : messageBufferList) {
-            pos += bb.position();
-        }
-        return pos;
-    }
-
-    @Override
-    public long transfered() {
-        return transfered;
-    }
-
-    @Override
-    public long count() {
-        return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
-    }
-
-    @Override
-    public long transferTo(WritableByteChannel target, long position) throws IOException {
-        if (this.byteBufferHeader.hasRemaining()) {
-            transfered += target.write(this.byteBufferHeader);
-            return transfered;
-        } else {
-            List<ByteBuffer> messageBufferList = this.getMessageResult.getMessageBufferList();
-            for (ByteBuffer bb : messageBufferList) {
-                if (bb.hasRemaining()) {
-                    transfered += target.write(bb);
-                    return transfered;
-                }
-            }
-        }
-
-        return 0;
-    }
-
-    public void close() {
-        this.deallocate();
-    }
-
-    @Override
-    protected void deallocate() {
-        this.getMessageResult.release();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
deleted file mode 100644
index df742c5..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.pagecache;
-
-import com.alibaba.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-
-/**
- * @author shijia.wxr
- */
-public class OneMessageTransfer extends AbstractReferenceCounted implements FileRegion {
-    private final ByteBuffer byteBufferHeader;
-    private final SelectMappedBufferResult selectMappedBufferResult;
-    private long transfered; // the bytes which was transfered already
-
-
-    public OneMessageTransfer(ByteBuffer byteBufferHeader, SelectMappedBufferResult selectMappedBufferResult) {
-        this.byteBufferHeader = byteBufferHeader;
-        this.selectMappedBufferResult = selectMappedBufferResult;
-    }
-
-
-    @Override
-    public long position() {
-        return this.byteBufferHeader.position() + this.selectMappedBufferResult.getByteBuffer().position();
-    }
-
-    @Override
-    public long transfered() {
-        return transfered;
-    }
-
-    @Override
-    public long count() {
-        return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize();
-    }
-
-    @Override
-    public long transferTo(WritableByteChannel target, long position) throws IOException {
-        if (this.byteBufferHeader.hasRemaining()) {
-            transfered += target.write(this.byteBufferHeader);
-            return transfered;
-        } else if (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
-            transfered += target.write(this.selectMappedBufferResult.getByteBuffer());
-            return transfered;
-        }
-
-        return 0;
-    }
-
-    public void close() {
-        this.deallocate();
-    }
-
-    @Override
-    protected void deallocate() {
-        this.selectMappedBufferResult.release();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
deleted file mode 100644
index cbcbc74..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.pagecache;
-
-import com.alibaba.rocketmq.store.QueryMessageResult;
-import io.netty.channel.FileRegion;
-import io.netty.util.AbstractReferenceCounted;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-import java.util.List;
-
-
-/**
- * {@link FileRegion} that writes a header buffer followed by every message buffer held by a
- * {@link QueryMessageResult}.
- *
- * @author shijia.wxr
- */
-public class QueryMessageTransfer extends AbstractReferenceCounted implements FileRegion {
-    private final ByteBuffer byteBufferHeader;
-    private final QueryMessageResult queryMessageResult;
-    private long transfered; // the bytes which was transfered already
-
-
-    /**
-     * @param byteBufferHeader   buffer holding the response header bytes
-     * @param queryMessageResult query result whose message buffers form the body
-     */
-    public QueryMessageTransfer(ByteBuffer byteBufferHeader, QueryMessageResult queryMessageResult) {
-        this.byteBufferHeader = byteBufferHeader;
-        this.queryMessageResult = queryMessageResult;
-    }
-
-
-    /**
-     * @return sum of the current positions of the header buffer and all message buffers
-     */
-    @Override
-    public long position() {
-        // Accumulate in a long so that many large buffers cannot overflow the sum.
-        long pos = byteBufferHeader.position();
-        List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
-        for (ByteBuffer bb : messageBufferList) {
-            pos += bb.position();
-        }
-        return pos;
-    }
-
-    /**
-     * @return the running total of bytes accumulated by {@link #transferTo}
-     */
-    @Override
-    public long transfered() {
-        return transfered;
-    }
-
-    /**
-     * @return the header buffer's limit plus the total size of the message buffers
-     */
-    @Override
-    public long count() {
-        return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize();
-    }
-
-    /**
-     * Writes the next pending chunk to {@code target}: the header buffer while it still has
-     * bytes remaining, otherwise the first message buffer that still has bytes remaining.
-     *
-     * <p>The return value is the number of bytes written by <em>this</em> call, as the
-     * {@code FileRegion} contract requires. Returning the cumulative total instead would make a
-     * zero-byte write look like progress once anything had been sent.
-     *
-     * @param target   channel to write to
-     * @param position ignored; progress is tracked by the buffers' own positions
-     * @return bytes written by this call, or 0 when nothing is left or nothing could be written
-     * @throws IOException if the channel write fails
-     */
-    @Override
-    public long transferTo(WritableByteChannel target, long position) throws IOException {
-        long written = 0;
-        if (this.byteBufferHeader.hasRemaining()) {
-            written = target.write(this.byteBufferHeader);
-        } else {
-            List<ByteBuffer> messageBufferList = this.queryMessageResult.getMessageBufferList();
-            for (ByteBuffer bb : messageBufferList) {
-                if (bb.hasRemaining()) {
-                    // Only one buffer is written per call, exactly as before.
-                    written = target.write(bb);
-                    break;
-                }
-            }
-        }
-
-        transfered += written;
-        return written;
-    }
-
-    /**
-     * Releases the query result by invoking {@link #deallocate()} directly.
-     */
-    public void close() {
-        this.deallocate();
-    }
-
-    /**
-     * Releases the wrapped {@link QueryMessageResult}.
-     */
-    @Override
-    protected void deallocate() {
-        this.queryMessageResult.release();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
deleted file mode 100644
index 141ba69..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/**
- *
- */
-package com.alibaba.rocketmq.broker.plugin;
-
-import com.alibaba.rocketmq.common.message.MessageExt;
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.store.*;
-
-import java.util.HashMap;
-import java.util.Set;
-
-/**
- * Base class for message-store plugins. It wraps another {@link MessageStore} and forwards
- * every call to it unchanged, so a subclass only has to override the operations it wants to
- * intercept.
- */
-public abstract class AbstractPluginMessageStore implements MessageStore {
-    /** The store that every call is forwarded to. */
-    protected MessageStore next;
-    /** Context handed to the plugin at construction time. */
-    protected MessageStorePluginContext context;
-
-    /**
-     * @param context plugin context
-     * @param next    store that receives every forwarded call
-     */
-    public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
-        this.context = context;
-        this.next = next;
-    }
-
-    @Override
-    public long getEarliestMessageTime() {
-        return this.next.getEarliestMessageTime();
-    }
-
-    @Override
-    public long lockTimeMills() {
-        return this.next.lockTimeMills();
-    }
-
-    @Override
-    public boolean isOSPageCacheBusy() {
-        return this.next.isOSPageCacheBusy();
-    }
-
-    @Override
-    public boolean isTransientStorePoolDeficient() {
-        return this.next.isTransientStorePoolDeficient();
-    }
-
-    @Override
-    public boolean load() {
-        return this.next.load();
-    }
-
-    @Override
-    public void start() throws Exception {
-        this.next.start();
-    }
-
-    @Override
-    public void shutdown() {
-        this.next.shutdown();
-    }
-
-    @Override
-    public void destroy() {
-        this.next.destroy();
-    }
-
-    @Override
-    public PutMessageResult putMessage(MessageExtBrokerInner msg) {
-        return this.next.putMessage(msg);
-    }
-
-    @Override
-    public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
-                                       int maxMsgNums, SubscriptionData subscriptionData) {
-        return this.next.getMessage(group, topic, queueId, offset, maxMsgNums, subscriptionData);
-    }
-
-    @Override
-    public long getMaxOffsetInQuque(String topic, int queueId) {
-        return this.next.getMaxOffsetInQuque(topic, queueId);
-    }
-
-    @Override
-    public long getMinOffsetInQuque(String topic, int queueId) {
-        return this.next.getMinOffsetInQuque(topic, queueId);
-    }
-
-    @Override
-    public long getCommitLogOffsetInQueue(String topic, int queueId, long cqOffset) {
-        return this.next.getCommitLogOffsetInQueue(topic, queueId, cqOffset);
-    }
-
-    @Override
-    public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
-        return this.next.getOffsetInQueueByTime(topic, queueId, timestamp);
-    }
-
-    @Override
-    public MessageExt lookMessageByOffset(long commitLogOffset) {
-        return this.next.lookMessageByOffset(commitLogOffset);
-    }
-
-    @Override
-    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
-        return this.next.selectOneMessageByOffset(commitLogOffset);
-    }
-
-    @Override
-    public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
-        return this.next.selectOneMessageByOffset(commitLogOffset, msgSize);
-    }
-
-    @Override
-    public String getRunningDataInfo() {
-        return this.next.getRunningDataInfo();
-    }
-
-    @Override
-    public HashMap<String, String> getRuntimeInfo() {
-        return this.next.getRuntimeInfo();
-    }
-
-    @Override
-    public long getMaxPhyOffset() {
-        return this.next.getMaxPhyOffset();
-    }
-
-    @Override
-    public long getMinPhyOffset() {
-        return this.next.getMinPhyOffset();
-    }
-
-    @Override
-    public long getEarliestMessageTime(String topic, int queueId) {
-        return this.next.getEarliestMessageTime(topic, queueId);
-    }
-
-    @Override
-    public long getMessageStoreTimeStamp(String topic, int queueId, long offset) {
-        return this.next.getMessageStoreTimeStamp(topic, queueId, offset);
-    }
-
-    @Override
-    public long getMessageTotalInQueue(String topic, int queueId) {
-        return this.next.getMessageTotalInQueue(topic, queueId);
-    }
-
-    @Override
-    public SelectMappedBufferResult getCommitLogData(long offset) {
-        return this.next.getCommitLogData(offset);
-    }
-
-    @Override
-    public boolean appendToCommitLog(long startOffset, byte[] data) {
-        return this.next.appendToCommitLog(startOffset, data);
-    }
-
-    @Override
-    public void excuteDeleteFilesManualy() {
-        this.next.excuteDeleteFilesManualy();
-    }
-
-    @Override
-    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
-                                           long end) {
-        return this.next.queryMessage(topic, key, maxNum, begin, end);
-    }
-
-    @Override
-    public void updateHaMasterAddress(String newAddr) {
-        this.next.updateHaMasterAddress(newAddr);
-    }
-
-    @Override
-    public long slaveFallBehindMuch() {
-        return this.next.slaveFallBehindMuch();
-    }
-
-    @Override
-    public long now() {
-        return this.next.now();
-    }
-
-    @Override
-    public int cleanUnusedTopic(Set<String> topics) {
-        return this.next.cleanUnusedTopic(topics);
-    }
-
-    @Override
-    public void cleanExpiredConsumerQueue() {
-        this.next.cleanExpiredConsumerQueue();
-    }
-
-    @Override
-    public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
-        return this.next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
-    }
-
-    @Override
-    public long dispatchBehindBytes() {
-        return this.next.dispatchBehindBytes();
-    }
-
-    @Override
-    public long flush() {
-        return this.next.flush();
-    }
-
-    @Override
-    public boolean resetWriteOffset(long phyOffset) {
-        return this.next.resetWriteOffset(phyOffset);
-    }
-
-    @Override
-    public long getConfirmOffset() {
-        return this.next.getConfirmOffset();
-    }
-
-    @Override
-    public void setConfirmOffset(long phyOffset) {
-        this.next.setConfirmOffset(phyOffset);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
deleted file mode 100644
index 84f5be7..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStoreFactory.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/**
- *
- */
-package com.alibaba.rocketmq.broker.plugin;
-
-import com.alibaba.rocketmq.store.MessageStore;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-
-/**
- * Builds the chain of message-store plugins named in the broker configuration.
- */
-public final class MessageStoreFactory {
-    /**
-     * Wraps {@code messageStore} in the plugins listed, comma separated, by
-     * {@code getMessageStorePlugIn()} on the broker config. The list is walked from last to
-     * first, so the first listed plugin ends up outermost.
-     *
-     * <p>Whitespace around each class name is ignored and blank entries are skipped, so values
-     * such as {@code "a.B, c.D"} or {@code "a.B,,c.D"} load correctly.
-     *
-     * @param context      context passed to every plugin constructor
-     * @param messageStore innermost store to wrap
-     * @return the outermost plugin, or {@code messageStore} itself when no plugin is configured
-     * @throws IOException      declared for callers; not thrown by this implementation
-     * @throws RuntimeException if a plugin class cannot be loaded or instantiated
-     */
-    public final static MessageStore build(MessageStorePluginContext context, MessageStore messageStore)
-            throws IOException {
-        String plugin = context.getBrokerConfig().getMessageStorePlugIn();
-        if (plugin == null || plugin.trim().length() == 0) {
-            return messageStore;
-        }
-
-        String[] pluginClasses = plugin.split(",");
-        for (int i = pluginClasses.length - 1; i >= 0; --i) {
-            String pluginClass = pluginClasses[i].trim();
-            if (pluginClass.length() == 0) {
-                // Tolerate stray separators such as "a,,b".
-                continue;
-            }
-            try {
-                // asSubclass rejects a wrong type up front instead of relying on an unchecked cast.
-                Class<? extends AbstractPluginMessageStore> clazz =
-                        Class.forName(pluginClass).asSubclass(AbstractPluginMessageStore.class);
-                Constructor<? extends AbstractPluginMessageStore> construct =
-                        clazz.getConstructor(MessageStorePluginContext.class, MessageStore.class);
-                messageStore = construct.newInstance(context, messageStore);
-            } catch (Throwable e) {
-                throw new RuntimeException(String.format(
-                        "Initialize plugin's class %s not found!", pluginClass), e);
-            }
-        }
-        return messageStore;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
deleted file mode 100644
index 15e8b07..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/plugin/MessageStorePluginContext.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-/**
- *
- */
-package com.alibaba.rocketmq.broker.plugin;
-
-import com.alibaba.rocketmq.common.BrokerConfig;
-import com.alibaba.rocketmq.store.MessageArrivingListener;
-import com.alibaba.rocketmq.store.config.MessageStoreConfig;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-
-/**
- * Immutable bundle of the broker objects handed to a message-store plugin when it is built.
- */
-public class MessageStorePluginContext {
-    // None of these is reassigned after construction, so they are declared final.
-    private final MessageStoreConfig messageStoreConfig;
-    private final BrokerStatsManager brokerStatsManager;
-    private final MessageArrivingListener messageArrivingListener;
-    private final BrokerConfig brokerConfig;
-
-    /**
-     * @param messageStoreConfig      message store configuration
-     * @param brokerStatsManager      broker statistics manager
-     * @param messageArrivingListener listener for arriving messages
-     * @param brokerConfig            broker configuration
-     */
-    public MessageStorePluginContext(MessageStoreConfig messageStoreConfig,
-                                     BrokerStatsManager brokerStatsManager, MessageArrivingListener messageArrivingListener,
-                                     BrokerConfig brokerConfig) {
-        this.messageStoreConfig = messageStoreConfig;
-        this.brokerStatsManager = brokerStatsManager;
-        this.messageArrivingListener = messageArrivingListener;
-        this.brokerConfig = brokerConfig;
-    }
-
-    /** @return the message store configuration given at construction */
-    public MessageStoreConfig getMessageStoreConfig() {
-        return messageStoreConfig;
-    }
-
-    /** @return the broker statistics manager given at construction */
-    public BrokerStatsManager getBrokerStatsManager() {
-        return brokerStatsManager;
-    }
-
-    /** @return the message-arriving listener given at construction */
-    public MessageArrivingListener getMessageArrivingListener() {
-        return messageArrivingListener;
-    }
-
-    /** @return the broker configuration given at construction */
-    public BrokerConfig getBrokerConfig() {
-        return brokerConfig;
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
deleted file mode 100644
index 95db52d..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.processor;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.mqtrace.SendMessageContext;
-import com.alibaba.rocketmq.broker.mqtrace.SendMessageHook;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.TopicFilterType;
-import com.alibaba.rocketmq.common.constant.DBMsgConstants;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.constant.PermName;
-import com.alibaba.rocketmq.common.help.FAQUrl;
-import com.alibaba.rocketmq.common.message.MessageAccessor;
-import com.alibaba.rocketmq.common.message.MessageConst;
-import com.alibaba.rocketmq.common.message.MessageDecoder;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
-import com.alibaba.rocketmq.common.protocol.header.SendMessageResponseHeader;
-import com.alibaba.rocketmq.common.sysflag.MessageSysFlag;
-import com.alibaba.rocketmq.common.sysflag.TopicSysFlag;
-import com.alibaba.rocketmq.common.utils.ChannelUtil;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingCommandException;
-import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.MessageExtBrokerInner;
-import io.netty.channel.ChannelHandlerContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-
-/**
- * @author shijia.wxr
- */
-public abstract class AbstractSendMessageProcessor implements NettyRequestProcessor {
-    protected static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-
-    protected final static int DLQ_NUMS_PER_GROUP = 1;
-    protected final BrokerController brokerController;
-    protected final Random random = new Random(System.currentTimeMillis());
-    protected final SocketAddress storeHost;
-    private List<SendMessageHook> sendMessageHookList;
-
-
-    /**
-     * Stores the controller and derives the store host address from the broker's first IP
-     * and the Netty server listen port.
-     *
-     * @param brokerController controller of the owning broker
-     */
-    public AbstractSendMessageProcessor(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-
-        final String brokerIp = brokerController.getBrokerConfig().getBrokerIP1();
-        final int listenPort = brokerController.getNettyServerConfig().getListenPort();
-        this.storeHost = new InetSocketAddress(brokerIp, listenPort);
-    }
-
-    /**
-     * Builds the trace context passed to the send-message hooks and stamps the region id and
-     * trace switch into the request's message properties.
-     *
-     * @param ctx           channel context of the request
-     * @param requestHeader decoded send-message header; its properties string is rewritten
-     * @return the populated context, or {@code null} when no hook is registered
-     */
-    protected SendMessageContext buildMsgContext(ChannelHandlerContext ctx,
-                                                 SendMessageRequestHeader requestHeader) {
-        if (!this.hasSendMessageHook()) {
-            return null;
-        }
-
-        final String regionId = this.brokerController.getBrokerConfig().getRegionId();
-
-        final SendMessageContext traceContext = new SendMessageContext();
-        traceContext.setProducerGroup(requestHeader.getProducerGroup());
-        traceContext.setTopic(requestHeader.getTopic());
-        // The context keeps the properties as they were before the region/trace stamps below.
-        traceContext.setMsgProps(requestHeader.getProperties());
-        traceContext.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-        traceContext.setBrokerAddr(this.brokerController.getBrokerAddr());
-        traceContext.setBrokerRegionId(regionId);
-        traceContext.setBornTimeStamp(requestHeader.getBornTimestamp());
-
-        final Map<String, String> props = MessageDecoder.string2messageProperties(requestHeader.getProperties());
-        final String clientKey = props.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
-        props.put(MessageConst.PROPERTY_MSG_REGION, regionId);
-        props.put(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
-        requestHeader.setProperties(MessageDecoder.messageProperties2String(props));
-
-        traceContext.setMsgUniqueKey(clientKey == null ? "" : clientKey);
-        return traceContext;
-    }
-
-    /**
-     * @return {@code true} when at least one send-message hook is registered
-     */
-    public boolean hasSendMessageHook() {
-        return !(this.sendMessageHookList == null || this.sendMessageHookList.isEmpty());
-    }
-
-    /**
-     * Converts a send request into the broker-internal message representation.
-     *
-     * @param ctx           channel context; its remote address becomes the born host
-     * @param requestHeader decoded send-message header
-     * @param body          message body
-     * @param topicConfig   configuration of the target topic
-     * @return the populated internal message
-     */
-    protected MessageExtBrokerInner buildInnerMsg(final ChannelHandlerContext ctx,
-                                                  final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
-        // A negative queue id means the producer left the choice to the broker.
-        int targetQueueId = requestHeader.getQueueId();
-        if (targetQueueId < 0) {
-            targetQueueId = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
-        }
-
-        int flags = requestHeader.getSysFlag();
-        if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
-            flags |= MessageSysFlag.MULTI_TAGS_FLAG;
-        }
-
-        final MessageExtBrokerInner inner = new MessageExtBrokerInner();
-        inner.setTopic(requestHeader.getTopic());
-        inner.setBody(body);
-        inner.setFlag(requestHeader.getFlag());
-
-        final Map<String, String> props = MessageDecoder.string2messageProperties(requestHeader.getProperties());
-        MessageAccessor.setProperties(inner, props);
-        inner.setPropertiesString(requestHeader.getProperties());
-        inner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(),
-                inner.getTags()));
-
-        inner.setQueueId(targetQueueId);
-        inner.setSysFlag(flags);
-        inner.setBornTimestamp(requestHeader.getBornTimestamp());
-        inner.setBornHost(ctx.channel().remoteAddress());
-        inner.setStoreHost(this.getStoreHost());
-
-        final Integer reconsumeTimes = requestHeader.getReconsumeTimes();
-        inner.setReconsumeTimes(reconsumeTimes == null ? 0 : reconsumeTimes);
-        return inner;
-    }
-
-    /**
-     * @return the store host address built in the constructor
-     */
-    public SocketAddress getStoreHost() {
-        return this.storeHost;
-    }
-
-    /**
-     * Validates the size limits of topic, properties and body. On the first violation the
-     * response code is set to {@code MESSAGE_ILLEGAL}; otherwise the response is left untouched.
-     *
-     * @param ctx           channel context, used for logging the remote IP
-     * @param requestHeader decoded send-message header
-     * @param request       the raw request, whose body length is checked
-     * @param response      response to mark on failure
-     * @return the same {@code response} instance
-     */
-    protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
-                                              final SendMessageRequestHeader requestHeader, RemotingCommand request,
-                                              final RemotingCommand response) {
-        // The checks run in order and stop at the first failure.
-        if (requestHeader.getTopic().length() > Byte.MAX_VALUE) {
-            log.warn("putMessage message topic length too long " + requestHeader.getTopic().length());
-            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-        } else if (requestHeader.getProperties() != null
-                && requestHeader.getProperties().length() > Short.MAX_VALUE) {
-            log.warn("putMessage message properties length too long "
-                    + requestHeader.getProperties().length());
-            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-        } else if (request.getBody().length > DBMsgConstants.MAX_BODY_SIZE) {
-            log.warn(" topic {}  msg body size {}  from {}", requestHeader.getTopic(),
-                    request.getBody().length, ChannelUtil.getRemoteIp(ctx.channel()));
-            response.setRemark("msg body must be less 64KB");
-            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-        }
-        return response;
-    }
-
-    /**
-     * Checks that the request may be accepted: broker permission, reserved topic names, topic
-     * existence (creating the topic when it can be) and queue id range. On failure the response
-     * code and remark are set; on success the response is left untouched.
-     *
-     * @param ctx           channel context of the producer
-     * @param requestHeader decoded send-message header
-     * @param response      response to mark on failure
-     * @return the same {@code response} instance
-     */
-    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
-                                       final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
-        final String topic = requestHeader.getTopic();
-
-        if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
-                && this.brokerController.getTopicConfigManager().isOrderTopic(topic)) {
-            response.setCode(ResponseCode.NO_PERMISSION);
-            response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
-                    + "] sending message is forbidden");
-            return response;
-        }
-
-        if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(topic)) {
-            final String reservedMsg =
-                    "the topic[" + topic + "] is conflict with system reserved words.";
-            log.warn(reservedMsg);
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(reservedMsg);
-            return response;
-        }
-
-        TopicConfig topicConfig =
-                this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
-        if (null == topicConfig) {
-            int topicSysFlag = 0;
-            if (requestHeader.isUnitMode()) {
-                final boolean retryTopic = topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX);
-                topicSysFlag = TopicSysFlag.buildSysFlag(!retryTopic, retryTopic);
-            }
-
-            log.warn("the topic " + topic + " not exist, producer: "
-                    + ctx.channel().remoteAddress());
-            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(//
-                    topic, //
-                    requestHeader.getDefaultTopic(), //
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
-                    requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
-
-            // Second chance for retry topics, which use the send-back creation path.
-            if (null == topicConfig && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                topicConfig =
-                        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
-                                topic, 1, PermName.PERM_WRITE | PermName.PERM_READ,
-                                topicSysFlag);
-            }
-
-            if (null == topicConfig) {
-                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
-                response.setRemark("topic[" + topic + "] not exist, apply first please!"
-                        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
-                return response;
-            }
-        }
-
-        final int requestedQueueId = requestHeader.getQueueId();
-        final int queueIdLimit = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
-        if (requestedQueueId >= queueIdLimit) {
-            final String errorInfo = String.format("request queueId[%d] is illagal, %s Producer: %s",
-                    requestedQueueId,
-                    topicConfig.toString(),
-                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-
-            log.warn(errorInfo);
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(errorInfo);
-        }
-        return response;
-    }
-
-    /**
-     * Replaces the list of send-message hooks with the given list.
-     *
-     * @param sendMessageHookList hooks to run around each send; stored by reference
-     */
-    public void registerSendMessageHook(List<SendMessageHook> sendMessageHookList) {
-        this.sendMessageHookList = sendMessageHookList;
-    }
-
-    /**
-     * Writes the response back on the channel unless the request is a one-way RPC. A failure
-     * to write is logged together with the request and response, and is not rethrown.
-     *
-     * @param ctx      channel context to write to
-     * @param request  the request being answered
-     * @param response the response to send
-     */
-    protected void doResponse(ChannelHandlerContext ctx, RemotingCommand request,
-                              final RemotingCommand response) {
-        if (request.isOnewayRPC()) {
-            return;
-        }
-
-        try {
-            ctx.writeAndFlush(response);
-        } catch (Throwable e) {
-            log.error("SendMessageProcessor process request over, but response failed", e);
-            log.error(request.toString());
-            log.error(response.toString());
-        }
-    }
-
-    /**
-     * Runs every registered hook's {@code sendMessageBefore} callback. The request header is
-     * parsed for each hook and, when present, used to populate the context first.
-     *
-     * <p>Hooks are best-effort: a failure in one hook is logged and the remaining hooks still
-     * run. When the header cannot be parsed (unsupported request code) the hook is still
-     * invoked, and the properties write-back is skipped rather than failing on a null header.
-     *
-     * @param ctx     channel context of the request
-     * @param request the raw send request
-     * @param context trace context shared with the hooks
-     */
-    public void executeSendMessageHookBefore(final ChannelHandlerContext ctx, final RemotingCommand request,
-                                             SendMessageContext context) {
-        if (!hasSendMessageHook()) {
-            return;
-        }
-
-        for (SendMessageHook hook : this.sendMessageHookList) {
-            try {
-                final SendMessageRequestHeader requestHeader = parseRequestHeader(request);
-
-                if (null != requestHeader) {
-                    context.setProducerGroup(requestHeader.getProducerGroup());
-                    context.setTopic(requestHeader.getTopic());
-                    context.setBodyLength(request.getBody().length);
-                    context.setMsgProps(requestHeader.getProperties());
-                    context.setBornHost(RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
-                    context.setBrokerAddr(this.brokerController.getBrokerAddr());
-                    context.setQueueId(requestHeader.getQueueId());
-                }
-
-                hook.sendMessageBefore(context);
-
-                // Guarded: previously a null header caused an NPE that was silently swallowed.
-                if (null != requestHeader) {
-                    requestHeader.setProperties(context.getMsgProps());
-                }
-            } catch (Throwable e) {
-                // Stay best-effort, but leave a trace instead of discarding the failure.
-                log.warn("executeSendMessageHookBefore failed, hook: " + hook, e);
-            }
-        }
-    }
-
-    /**
-     * Decodes the send-message header of a request, accepting both the V1 and V2 encodings.
-     * A V2 header is converted to the V1 form.
-     *
-     * @param request the raw request
-     * @return the V1-form header, or {@code null} when the request code is neither
-     *         {@code SEND_MESSAGE} nor {@code SEND_MESSAGE_V2}
-     * @throws RemotingCommandException if the custom header cannot be decoded
-     */
-    protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request)
-            throws RemotingCommandException {
-        final int code = request.getCode();
-        final boolean isV2 = code == RequestCode.SEND_MESSAGE_V2;
-        if (!isV2 && code != RequestCode.SEND_MESSAGE) {
-            return null;
-        }
-
-        SendMessageRequestHeaderV2 headerV2 = null;
-        if (isV2) {
-            headerV2 =
-                    (SendMessageRequestHeaderV2) request
-                            .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
-        }
-
-        if (null != headerV2) {
-            return SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(headerV2);
-        }
-        // Plain V1 request, or a V2 decode that yielded nothing: decode as V1.
-        return (SendMessageRequestHeader) request
-                .decodeCommandCustomHeader(SendMessageRequestHeader.class);
-    }
-
-    /**
-     * Runs every registered hook's {@code sendMessageAfter} callback, first copying the send
-     * result from the response into the context.
-     *
-     * <p>Hooks are best-effort: a failure in one hook is logged and the remaining hooks still
-     * run. A response without a custom header still has its code and remark copied and still
-     * reaches the hook; previously this case raised a swallowed NPE and skipped the hook.
-     *
-     * @param response response of the send, may be {@code null}
-     * @param context  trace context shared with the hooks
-     */
-    public void executeSendMessageHookAfter(final RemotingCommand response, final SendMessageContext context) {
-        if (!hasSendMessageHook()) {
-            return;
-        }
-
-        for (SendMessageHook hook : this.sendMessageHookList) {
-            try {
-                if (response != null) {
-                    final SendMessageResponseHeader responseHeader =
-                            (SendMessageResponseHeader) response.readCustomHeader();
-                    if (responseHeader != null) {
-                        context.setMsgId(responseHeader.getMsgId());
-                        context.setQueueId(responseHeader.getQueueId());
-                        context.setQueueOffset(responseHeader.getQueueOffset());
-                    }
-                    context.setCode(response.getCode());
-                    context.setErrorMsg(response.getRemark());
-                }
-                hook.sendMessageAfter(context);
-            } catch (Throwable e) {
-                // Stay best-effort, but leave a trace instead of discarding the failure.
-                log.warn("executeSendMessageHookAfter failed, hook: " + hook, e);
-            }
-        }
-    }
-
-    /**
-     * @return always {@code false}
-     */
-    @Override
-    public boolean rejectRequest() {
-        final boolean reject = false;
-        return reject;
-    }
-}