You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 09:14:40 UTC

[49/99] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18 Rename package name from com.alibaba to org.apache

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
deleted file mode 100644
index 74e7ea7..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * @author shijia.wxr
- */
-public class ProducerManager {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final long LOCK_TIMEOUT_MILLIS = 3000;
-    private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
-    private final Lock groupChannelLock = new ReentrantLock();
-    private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
-            new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-
-
-    public ProducerManager() {
-    }
-
-
-    public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
-        HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
-                new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    newGroupChannelTable.putAll(groupChannelTable);
-                } finally {
-                    groupChannelLock.unlock();
-                }
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
-        }
-        return newGroupChannelTable;
-    }
-
-
-    public void scanNotActiveChannel() {
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
-                            .entrySet()) {
-                        final String group = entry.getKey();
-                        final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
-
-                        Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
-                        while (it.hasNext()) {
-                            Entry<Channel, ClientChannelInfo> item = it.next();
-                            // final Integer id = item.getKey();
-                            final ClientChannelInfo info = item.getValue();
-
-                            long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
-                            if (diff > CHANNEL_EXPIRED_TIMEOUT) {
-                                it.remove();
-                                log.warn(
-                                        "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
-                                        RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
-                                RemotingUtil.closeChannel(info.getChannel());
-                            }
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
-            } else {
-                log.warn("ProducerManager scanNotActiveChannel lock timeout");
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
-        }
-    }
-
-
-    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
-        if (channel != null) {
-            try {
-                if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                    try {
-                        for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
-                                .entrySet()) {
-                            final String group = entry.getKey();
-                            final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
-                                    entry.getValue();
-                            final ClientChannelInfo clientChannelInfo =
-                                    clientChannelInfoTable.remove(channel);
-                            if (clientChannelInfo != null) {
-                                log.info(
-                                        "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
-                                        clientChannelInfo.toString(), remoteAddr, group);
-                            }
-
-                        }
-                    } finally {
-                        this.groupChannelLock.unlock();
-                    }
-                } else {
-                    log.warn("ProducerManager doChannelCloseEvent lock timeout");
-                }
-            } catch (InterruptedException e) {
-                log.error("", e);
-            }
-        }
-    }
-
-
-    public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
-        try {
-            ClientChannelInfo clientChannelInfoFound = null;
-
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
-                    if (null == channelTable) {
-                        channelTable = new HashMap<Channel, ClientChannelInfo>();
-                        this.groupChannelTable.put(group, channelTable);
-                    }
-
-                    clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
-                    if (null == clientChannelInfoFound) {
-                        channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
-                        log.info("new producer connected, group: {} channel: {}", group,
-                                clientChannelInfo.toString());
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
-
-                if (clientChannelInfoFound != null) {
-                    clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
-                }
-            } else {
-                log.warn("ProducerManager registerProducer lock timeout");
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
-        }
-    }
-
-
-    public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
-        try {
-            if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
-                try {
-                    HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
-                    if (null != channelTable && !channelTable.isEmpty()) {
-                        ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
-                        if (old != null) {
-                            log.info("unregister a producer[{}] from groupChannelTable {}", group,
-                                    clientChannelInfo.toString());
-                        }
-
-                        if (channelTable.isEmpty()) {
-                            this.groupChannelTable.remove(group);
-                            log.info("unregister a producer group[{}] from groupChannelTable", group);
-                        }
-                    }
-                } finally {
-                    this.groupChannelLock.unlock();
-                }
-            } else {
-                log.warn("ProducerManager unregisterProducer lock timeout");
-            }
-        } catch (InterruptedException e) {
-            log.error("", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
deleted file mode 100644
index a38c9cb..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client.net;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
-import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
-import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
-import com.alibaba.rocketmq.common.MQVersion;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.message.MessageQueueForC;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
-import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
-import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBodyForC;
-import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
-import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.FileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class Broker2Client {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final BrokerController brokerController;
-
-    public Broker2Client(BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    public void checkProducerTransactionState(
-            final Channel channel,
-            final CheckTransactionStateRequestHeader requestHeader,
-            final SelectMappedBufferResult selectMappedBufferResult) {
-        RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
-        request.markOnewayRPC();
-
-        try {
-            FileRegion fileRegion =
-                    new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
-                            selectMappedBufferResult);
-            channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    selectMappedBufferResult.release();
-                    if (!future.isSuccess()) {
-                        log.error("invokeProducer failed,", future.cause());
-                    }
-                }
-            });
-        } catch (Throwable e) {
-            log.error("invokeProducer exception", e);
-            selectMappedBufferResult.release();
-        }
-    }
-
-    public RemotingCommand callClient(final Channel channel,
-                                      final RemotingCommand request
-    ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
-        return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
-    }
-
-    public void notifyConsumerIdsChanged(
-            final Channel channel,
-            final String consumerGroup) {
-        if (null == consumerGroup) {
-            log.error("notifyConsumerIdsChanged consumerGroup is null");
-            return;
-        }
-
-        NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
-        requestHeader.setConsumerGroup(consumerGroup);
-        RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
-
-        try {
-            this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
-        } catch (Exception e) {
-            log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
-        }
-    }
-
-    public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
-        return resetOffset(topic, group, timeStamp, isForce, false);
-    }
-
-    public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
-                                       boolean isC) {
-        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-
-        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
-        if (null == topicConfig) {
-            log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
-            return response;
-        }
-
-        Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
-
-        for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
-            MessageQueue mq = new MessageQueue();
-            mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
-            mq.setTopic(topic);
-            mq.setQueueId(i);
-
-            long consumerOffset =
-                    this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
-            if (-1 == consumerOffset) {
-                response.setCode(ResponseCode.SYSTEM_ERROR);
-                response.setRemark(String.format("THe consumer group <%s> not exist", group));
-                return response;
-            }
-
-            long timeStampOffset;
-            if (timeStamp == -1) {
-
-                timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
-            } else {
-                timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
-            }
-
-            if (timeStampOffset < 0) {
-                log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
-                timeStampOffset = 0;
-            }
-
-            if (isForce || timeStampOffset < consumerOffset) {
-                offsetTable.put(mq, timeStampOffset);
-            } else {
-                offsetTable.put(mq, consumerOffset);
-            }
-        }
-
-        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setGroup(group);
-        requestHeader.setTimestamp(timeStamp);
-        RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
-        if (isC) {
-            // c++ language
-            ResetOffsetBodyForC body = new ResetOffsetBodyForC();
-            List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
-            body.setOffsetTable(offsetList);
-            request.setBody(body.encode());
-        } else {
-            // other language
-            ResetOffsetBody body = new ResetOffsetBody();
-            body.setOffsetTable(offsetTable);
-            request.setBody(body.encode());
-        }
-
-        ConsumerGroupInfo consumerGroupInfo =
-                this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
-
-        if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
-            ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
-                    consumerGroupInfo.getChannelInfoTable();
-            for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
-                int version = entry.getValue().getVersion();
-                if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
-                    try {
-                        this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
-                        log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
-                                new Object[]{topic, group, entry.getValue().getClientId()});
-                    } catch (Exception e) {
-                        log.error("[reset-offset] reset offset exception. topic={}, group={}",
-                                new Object[]{topic, group}, e);
-                    }
-                } else {
-                    response.setCode(ResponseCode.SYSTEM_ERROR);
-                    response.setRemark("the client does not support this feature. version="
-                            + MQVersion.getVersionDesc(version));
-                    log.warn("[reset-offset] the client does not support this feature. version={}",
-                            RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
-                    return response;
-                }
-            }
-        } else {
-            String errorInfo =
-                    String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
-                            requestHeader.getGroup(),
-                            requestHeader.getTopic(),
-                            requestHeader.getTimestamp());
-            log.error(errorInfo);
-            response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
-            response.setRemark(errorInfo);
-            return response;
-        }
-        response.setCode(ResponseCode.SUCCESS);
-        ResetOffsetBody resBody = new ResetOffsetBody();
-        resBody.setOffsetTable(offsetTable);
-        response.setBody(resBody.encode());
-        return response;
-    }
-
-    private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
-        List<MessageQueueForC> list = new ArrayList<MessageQueueForC>();
-        for (Entry<MessageQueue, Long> entry : table.entrySet()) {
-            MessageQueue mq = entry.getKey();
-            MessageQueueForC tmp =
-                    new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
-            list.add(tmp);
-        }
-        return list;
-    }
-
-    public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
-        final RemotingCommand result = RemotingCommand.createResponseCommand(null);
-
-        GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setGroup(group);
-        RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
-                        requestHeader);
-
-        Map<String, Map<MessageQueue, Long>> consumerStatusTable =
-                new HashMap<String, Map<MessageQueue, Long>>();
-        ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
-                this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
-        if (null == channelInfoTable || channelInfoTable.isEmpty()) {
-            result.setCode(ResponseCode.SYSTEM_ERROR);
-            result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
-            return result;
-        }
-
-        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
-            int version = entry.getValue().getVersion();
-            String clientId = entry.getValue().getClientId();
-            if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
-                result.setCode(ResponseCode.SYSTEM_ERROR);
-                result.setRemark("the client does not support this feature. version="
-                        + MQVersion.getVersionDesc(version));
-                log.warn("[get-consumer-status] the client does not support this feature. version={}",
-                        RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
-                return result;
-            } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
-                try {
-                    RemotingCommand response =
-                            this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
-                    assert response != null;
-                    switch (response.getCode()) {
-                        case ResponseCode.SUCCESS: {
-                            if (response.getBody() != null) {
-                                GetConsumerStatusBody body =
-                                        GetConsumerStatusBody.decode(response.getBody(),
-                                                GetConsumerStatusBody.class);
-
-                                consumerStatusTable.put(clientId, body.getMessageQueueTable());
-                                log.info(
-                                        "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
-                                        new Object[]{topic, group, clientId});
-                            }
-                        }
-                        default:
-                            break;
-                    }
-                } catch (Exception e) {
-                    log.error(
-                            "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
-                            new Object[]{topic, group}, e);
-                }
-
-                if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
-                    break;
-                }
-            }
-        }
-
-        result.setCode(ResponseCode.SUCCESS);
-        GetConsumerStatusBody resBody = new GetConsumerStatusBody();
-        resBody.setConsumerTable(consumerStatusTable);
-        result.setBody(resBody.encode());
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
deleted file mode 100644
index 84be628..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client.rebalance;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * @author shijia.wxr
- */
-public class RebalanceLockManager {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
-    private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
-            "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
-    private final Lock lock = new ReentrantLock();
-    private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
-            new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
-
-    public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
-
-        if (!this.isLocked(group, mq, clientId)) {
-            try {
-                this.lock.lockInterruptibly();
-                try {
-                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
-                    if (null == groupValue) {
-                        groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
-                        this.mqLockTable.put(group, groupValue);
-                    }
-
-                    LockEntry lockEntry = groupValue.get(mq);
-                    if (null == lockEntry) {
-                        lockEntry = new LockEntry();
-                        lockEntry.setClientId(clientId);
-                        groupValue.put(mq, lockEntry);
-                        log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
-                                group, //
-                                clientId, //
-                                mq);
-                    }
-
-                    if (lockEntry.isLocked(clientId)) {
-                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
-                        return true;
-                    }
-
-                    String oldClientId = lockEntry.getClientId();
-
-
-                    if (lockEntry.isExpired()) {
-                        lockEntry.setClientId(clientId);
-                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
-                        log.warn(
-                                "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
-                                group, //
-                                oldClientId, //
-                                clientId, //
-                                mq);
-                        return true;
-                    }
-
-
-                    log.warn(
-                            "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
-                            group, //
-                            oldClientId, //
-                            clientId, //
-                            mq);
-                    return false;
-                } finally {
-                    this.lock.unlock();
-                }
-            } catch (InterruptedException e) {
-                log.error("putMessage exception", e);
-            }
-        } else {
-
-        }
-
-        return true;
-    }
-
-    private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
-        ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
-        if (groupValue != null) {
-            LockEntry lockEntry = groupValue.get(mq);
-            if (lockEntry != null) {
-                boolean locked = lockEntry.isLocked(clientId);
-                if (locked) {
-                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
-                }
-
-                return locked;
-            }
-        }
-
-        return false;
-    }
-
-    public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
-                                          final String clientId) {
-        Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
-        Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
-
-
-        for (MessageQueue mq : mqs) {
-            if (this.isLocked(group, mq, clientId)) {
-                lockedMqs.add(mq);
-            } else {
-                notLockedMqs.add(mq);
-            }
-        }
-
-        if (!notLockedMqs.isEmpty()) {
-            try {
-                this.lock.lockInterruptibly();
-                try {
-                    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
-                    if (null == groupValue) {
-                        groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
-                        this.mqLockTable.put(group, groupValue);
-                    }
-
-
-                    for (MessageQueue mq : notLockedMqs) {
-                        LockEntry lockEntry = groupValue.get(mq);
-                        if (null == lockEntry) {
-                            lockEntry = new LockEntry();
-                            lockEntry.setClientId(clientId);
-                            groupValue.put(mq, lockEntry);
-                            log.info(
-                                    "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
-                                    group, //
-                                    clientId, //
-                                    mq);
-                        }
-
-
-                        if (lockEntry.isLocked(clientId)) {
-                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
-                            lockedMqs.add(mq);
-                            continue;
-                        }
-
-                        String oldClientId = lockEntry.getClientId();
-
-
-                        if (lockEntry.isExpired()) {
-                            lockEntry.setClientId(clientId);
-                            lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
-                            log.warn(
-                                    "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
-                                    group, //
-                                    oldClientId, //
-                                    clientId, //
-                                    mq);
-                            lockedMqs.add(mq);
-                            continue;
-                        }
-
-
-                        log.warn(
-                                "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
-                                group, //
-                                oldClientId, //
-                                clientId, //
-                                mq);
-                    }
-                } finally {
-                    this.lock.unlock();
-                }
-            } catch (InterruptedException e) {
-                log.error("putMessage exception", e);
-            }
-        }
-
-        return lockedMqs;
-    }
-
-    public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
-        try {
-            this.lock.lockInterruptibly();
-            try {
-                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
-                if (null != groupValue) {
-                    for (MessageQueue mq : mqs) {
-                        LockEntry lockEntry = groupValue.get(mq);
-                        if (null != lockEntry) {
-                            if (lockEntry.getClientId().equals(clientId)) {
-                                groupValue.remove(mq);
-                                log.info("unlockBatch, Group: {} {} {}",
-                                        group,
-                                        mq,
-                                        clientId);
-                            } else {
-                                log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
-                                        lockEntry.getClientId(),
-                                        group,
-                                        mq,
-                                        clientId);
-                            }
-                        } else {
-                            log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
-                                    group,
-                                    mq,
-                                    clientId);
-                        }
-                    }
-                } else {
-                    log.warn("unlockBatch, group not exist, Group: {} {}",
-                            group,
-                            clientId);
-                }
-            } finally {
-                this.lock.unlock();
-            }
-        } catch (InterruptedException e) {
-            log.error("putMessage exception", e);
-        }
-    }
-
-    static class LockEntry {
-        private String clientId;
-        private volatile long lastUpdateTimestamp = System.currentTimeMillis();
-
-
-        public String getClientId() {
-            return clientId;
-        }
-
-
-        public void setClientId(String clientId) {
-            this.clientId = clientId;
-        }
-
-
-        public long getLastUpdateTimestamp() {
-            return lastUpdateTimestamp;
-        }
-
-
-        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
-            this.lastUpdateTimestamp = lastUpdateTimestamp;
-        }
-
-        public boolean isLocked(final String clientId) {
-            boolean eq = this.clientId.equals(clientId);
-            return eq && !this.isExpired();
-        }
-
-        public boolean isExpired() {
-            boolean expired =
-                    (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
-
-            return expired;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
deleted file mode 100644
index b2e7e82..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.filtersrv;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.BrokerStartup;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-public class FilterServerManager {
-
-    public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
-            new ConcurrentHashMap<Channel, FilterServerInfo>(16);
-    private final BrokerController brokerController;
-
-    private ScheduledExecutorService scheduledExecutorService = Executors
-            .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
-
-    public FilterServerManager(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    public void start() {
-
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    FilterServerManager.this.createFilterServer();
-                } catch (Exception e) {
-                    log.error("", e);
-                }
-            }
-        }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
-    }
-
-    public void createFilterServer() {
-        int more =
-                this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
-        String cmd = this.buildStartCommand();
-        for (int i = 0; i < more; i++) {
-            FilterServerUtil.callShell(cmd, log);
-        }
-    }
-
-    private String buildStartCommand() {
-        String config = "";
-        if (BrokerStartup.configFile != null) {
-            config = String.format("-c %s", BrokerStartup.configFile);
-        }
-
-        if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
-            config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
-        }
-
-        if (RemotingUtil.isWindowsPlatform()) {
-            return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
-                    this.brokerController.getBrokerConfig().getRocketmqHome(),
-                    config);
-        } else {
-            return String.format("sh %s/bin/startfsrv.sh %s",
-                    this.brokerController.getBrokerConfig().getRocketmqHome(),
-                    config);
-        }
-    }
-
-    public void shutdown() {
-        this.scheduledExecutorService.shutdown();
-    }
-
-    public void registerFilterServer(final Channel channel, final String filterServerAddr) {
-        FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
-        if (filterServerInfo != null) {
-            filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
-        } else {
-            filterServerInfo = new FilterServerInfo();
-            filterServerInfo.setFilterServerAddr(filterServerAddr);
-            filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
-            this.filterServerTable.put(channel, filterServerInfo);
-            log.info("Receive a New Filter Server<{}>", filterServerAddr);
-        }
-    }
-
-    /**
-
-     */
-    public void scanNotActiveChannel() {
-
-        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<Channel, FilterServerInfo> next = it.next();
-            long timestamp = next.getValue().getLastUpdateTimestamp();
-            Channel channel = next.getKey();
-            if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
-                log.info("The Filter Server<{}> expired, remove it", next.getKey());
-                it.remove();
-                RemotingUtil.closeChannel(channel);
-            }
-        }
-    }
-
-    public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
-        FilterServerInfo old = this.filterServerTable.remove(channel);
-        if (old != null) {
-            log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
-                    remoteAddr);
-        }
-    }
-
-    public List<String> buildNewFilterServerList() {
-        List<String> addr = new ArrayList<String>();
-        Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
-        while (it.hasNext()) {
-            Entry<Channel, FilterServerInfo> next = it.next();
-            addr.add(next.getValue().getFilterServerAddr());
-        }
-        return addr;
-    }
-
-    static class FilterServerInfo {
-        private String filterServerAddr;
-        private long lastUpdateTimestamp;
-
-
-        public String getFilterServerAddr() {
-            return filterServerAddr;
-        }
-
-
-        public void setFilterServerAddr(String filterServerAddr) {
-            this.filterServerAddr = filterServerAddr;
-        }
-
-
-        public long getLastUpdateTimestamp() {
-            return lastUpdateTimestamp;
-        }
-
-
-        public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
-            this.lastUpdateTimestamp = lastUpdateTimestamp;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
deleted file mode 100644
index c5ace19..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.filtersrv;
-
-import org.slf4j.Logger;
-
-
-public class FilterServerUtil {
-    public static void callShell(final String shellString, final Logger log) {
-        Process process = null;
-        try {
-            String[] cmdArray = splitShellString(shellString);
-            process = Runtime.getRuntime().exec(cmdArray);
-            process.waitFor();
-            log.info("callShell: <{}> OK", shellString);
-        } catch (Throwable e) {
-            log.error("callShell: readLine IOException, " + shellString, e);
-        } finally {
-            if (null != process)
-                process.destroy();
-        }
-    }
-
-    private static String[] splitShellString(final String shellString) {
-        String[] split = shellString.split(" ");
-        return split;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
deleted file mode 100644
index 586bed0..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.latency;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.netty.RequestTask;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @author shijia.wxr
- */
-public class BrokerFastFailure {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
-            "BrokerFastFailureScheduledThread"));
-    private final BrokerController brokerController;
-
-    public BrokerFastFailure(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    public void start() {
-        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-            @Override
-            public void run() {
-                cleanExpiredRequest();
-            }
-        }, 1000, 10, TimeUnit.MILLISECONDS);
-    }
-
-    private void cleanExpiredRequest() {
-        while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
-            try {
-                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
-                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
-                    if (null == runnable) {
-                        break;
-                    }
-
-                    final RequestTask rt = castRunnable(runnable);
-                    rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
-                } else {
-                    break;
-                }
-            } catch (Throwable e) {
-            }
-        }
-
-        while (true) {
-            try {
-                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
-                    final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
-                    if (null == runnable) {
-                        break;
-                    }
-                    final RequestTask rt = castRunnable(runnable);
-                    if (rt.isStopRun()) {
-                        break;
-                    }
-
-                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
-                    if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
-                        if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
-                            rt.setStopRun(true);
-                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
-                        }
-                    } else {
-                        break;
-                    }
-                } else {
-                    break;
-                }
-            } catch (Throwable e) {
-            }
-        }
-    }
-
-    public static RequestTask castRunnable(final Runnable runnable) {
-        try {
-            FutureTaskExt object = (FutureTaskExt) runnable;
-            return (RequestTask) object.getRunnable();
-        } catch (Throwable e) {
-            log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
-        }
-
-        return null;
-    }
-
-    public void shutdown() {
-        this.scheduledExecutorService.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
deleted file mode 100644
index f81d48a..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.latency;
-
-import java.util.concurrent.*;
-
-/**
- * @author shijia.wxr
- */
-public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
-    }
-
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
-    }
-
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
-    }
-
-    public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
-        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
-    }
-
-    @Override
-    protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
-        return new FutureTaskExt<T>(runnable, value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
deleted file mode 100644
index 6ec7bb5..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.latency;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-/**
- * @author shijia.wxr
- */
-public class FutureTaskExt<V> extends FutureTask<V> {
-    private final Runnable runnable;
-
-    public FutureTaskExt(final Callable<V> callable) {
-        super(callable);
-        this.runnable = null;
-    }
-
-    public FutureTaskExt(final Runnable runnable, final V result) {
-        super(runnable, result);
-        this.runnable = runnable;
-    }
-
-    public Runnable getRunnable() {
-        return runnable;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
deleted file mode 100644
index bc9c58d..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.longpolling;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class ManyPullRequest {
-    private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>();
-
-
-    public synchronized void addPullRequest(final PullRequest pullRequest) {
-        this.pullRequestList.add(pullRequest);
-    }
-
-
-    public synchronized void addPullRequest(final List<PullRequest> many) {
-        this.pullRequestList.addAll(many);
-    }
-
-
-    public synchronized List<PullRequest> cloneListAndClear() {
-        if (!this.pullRequestList.isEmpty()) {
-            List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone();
-            this.pullRequestList.clear();
-            return result;
-        }
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
deleted file mode 100644
index 15ee050..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.longpolling;
-
-import com.alibaba.rocketmq.store.MessageArrivingListener;
-
-
-public class NotifyMessageArrivingListener implements MessageArrivingListener {
-    private final PullRequestHoldService pullRequestHoldService;
-
-
-    public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
-        this.pullRequestHoldService = pullRequestHoldService;
-    }
-
-
-    @Override
-    public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
-        this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
deleted file mode 100644
index b4f1e11..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.longpolling;
-
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.Channel;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullRequest {
-    private final RemotingCommand requestCommand;
-    private final Channel clientChannel;
-    private final long timeoutMillis;
-    private final long suspendTimestamp;
-    private final long pullFromThisOffset;
-    private final SubscriptionData subscriptionData;
-
-
-    public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
-                       long pullFromThisOffset, SubscriptionData subscriptionData) {
-        this.requestCommand = requestCommand;
-        this.clientChannel = clientChannel;
-        this.timeoutMillis = timeoutMillis;
-        this.suspendTimestamp = suspendTimestamp;
-        this.pullFromThisOffset = pullFromThisOffset;
-        this.subscriptionData = subscriptionData;
-    }
-
-
-    public RemotingCommand getRequestCommand() {
-        return requestCommand;
-    }
-
-
-    public Channel getClientChannel() {
-        return clientChannel;
-    }
-
-
-    public long getTimeoutMillis() {
-        return timeoutMillis;
-    }
-
-
-    public long getSuspendTimestamp() {
-        return suspendTimestamp;
-    }
-
-
-    public long getPullFromThisOffset() {
-        return pullFromThisOffset;
-    }
-
-    public SubscriptionData getSubscriptionData() {
-        return subscriptionData;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
deleted file mode 100644
index 888c5f2..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.longpolling;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.ServiceThread;
-import com.alibaba.rocketmq.common.SystemClock;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.store.DefaultMessageFilter;
-import com.alibaba.rocketmq.store.MessageFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullRequestHoldService extends ServiceThread {
-    private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
-    private static final String TOPIC_QUEUEID_SEPARATOR = "@";
-    private final BrokerController brokerController;
-    private final SystemClock systemClock = new SystemClock();
-    private final MessageFilter messageFilter = new DefaultMessageFilter();
-    private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
-            new ConcurrentHashMap<String, ManyPullRequest>(1024);
-
-
-    public PullRequestHoldService(final BrokerController brokerController) {
-        this.brokerController = brokerController;
-    }
-
-    public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
-        String key = this.buildKey(topic, queueId);
-        ManyPullRequest mpr = this.pullRequestTable.get(key);
-        if (null == mpr) {
-            mpr = new ManyPullRequest();
-            ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
-            if (prev != null) {
-                mpr = prev;
-            }
-        }
-
-        mpr.addPullRequest(pullRequest);
-    }
-
-    private String buildKey(final String topic, final int queueId) {
-        StringBuilder sb = new StringBuilder();
-        sb.append(topic);
-        sb.append(TOPIC_QUEUEID_SEPARATOR);
-        sb.append(queueId);
-        return sb.toString();
-    }
-
-    @Override
-    public void run() {
-        log.info(this.getServiceName() + " service started");
-        while (!this.isStopped()) {
-            try {
-                if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
-                    this.waitForRunning(5 * 1000);
-                } else {
-                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
-                }
-
-                long beginLockTimestamp = this.systemClock.now();
-                this.checkHoldRequest();
-                long costTime = this.systemClock.now() - beginLockTimestamp;
-                if (costTime > 5 * 1000) {
-                    log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
-                }
-            } catch (Throwable e) {
-                log.warn(this.getServiceName() + " service has exception. ", e);
-            }
-        }
-
-        log.info(this.getServiceName() + " service end");
-    }
-
-    @Override
-    public String getServiceName() {
-        return PullRequestHoldService.class.getSimpleName();
-    }
-
-    private void checkHoldRequest() {
-        for (String key : this.pullRequestTable.keySet()) {
-            String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
-            if (kArray != null && 2 == kArray.length) {
-                String topic = kArray[0];
-                int queueId = Integer.parseInt(kArray[1]);
-                final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
-                try {
-                    this.notifyMessageArriving(topic, queueId, offset);
-                } catch (Throwable e) {
-                    log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
-                }
-            }
-        }
-    }
-
-    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
-        notifyMessageArriving(topic, queueId, maxOffset, null);
-    }
-
-    public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
-        String key = this.buildKey(topic, queueId);
-        ManyPullRequest mpr = this.pullRequestTable.get(key);
-        if (mpr != null) {
-            List<PullRequest> requestList = mpr.cloneListAndClear();
-            if (requestList != null) {
-                List<PullRequest> replayList = new ArrayList<PullRequest>();
-
-                for (PullRequest request : requestList) {
-                    long newestOffset = maxOffset;
-                    if (newestOffset <= request.getPullFromThisOffset()) {
-                        newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
-                    }
-
-                    Long tmp = tagsCode;
-                    if (newestOffset > request.getPullFromThisOffset()) {
-                        if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
-                            try {
-                                this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
-                                        request.getRequestCommand());
-                            } catch (Throwable e) {
-                                log.error("execute request when wakeup failed.", e);
-                            }
-                            continue;
-                        }
-                    }
-
-                    if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
-                        try {
-                            this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
-                                    request.getRequestCommand());
-                        } catch (Throwable e) {
-                            log.error("execute request when wakeup failed.", e);
-                        }
-                        continue;
-                    }
-
-
-                    replayList.add(request);
-                }
-
-                if (!replayList.isEmpty()) {
-                    mpr.addPullRequest(replayList);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
deleted file mode 100644
index b7f9c6e..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-
-import java.util.Map;
-
-
-public class ConsumeMessageContext {
-    private String consumerGroup;
-    private String topic;
-    private Integer queueId;
-    private String clientHost;
-    private String storeHost;
-    private Map<String, Long> messageIds;
-    private int bodyLength;
-    private boolean success;
-    private String status;
-    private Object mqTraceContext;
-
-    private String commercialOwner;
-    private BrokerStatsManager.StatsType commercialRcvStats;
-    private int commercialRcvTimes;
-    private int commercialRcvSize;
-
-
-    public String getConsumerGroup() {
-        return consumerGroup;
-    }
-
-
-    public void setConsumerGroup(String consumerGroup) {
-        this.consumerGroup = consumerGroup;
-    }
-
-
-    public String getTopic() {
-        return topic;
-    }
-
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-
-    public Integer getQueueId() {
-        return queueId;
-    }
-
-
-    public void setQueueId(Integer queueId) {
-        this.queueId = queueId;
-    }
-
-
-    public String getClientHost() {
-        return clientHost;
-    }
-
-
-    public void setClientHost(String clientHost) {
-        this.clientHost = clientHost;
-    }
-
-
-    public String getStoreHost() {
-        return storeHost;
-    }
-
-
-    public void setStoreHost(String storeHost) {
-        this.storeHost = storeHost;
-    }
-
-
-    public Map<String, Long> getMessageIds() {
-        return messageIds;
-    }
-
-
-    public void setMessageIds(Map<String, Long> messageIds) {
-        this.messageIds = messageIds;
-    }
-
-
-    public boolean isSuccess() {
-        return success;
-    }
-
-
-    public void setSuccess(boolean success) {
-        this.success = success;
-    }
-
-
-    public String getStatus() {
-        return status;
-    }
-
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-
-    public Object getMqTraceContext() {
-        return mqTraceContext;
-    }
-
-
-    public void setMqTraceContext(Object mqTraceContext) {
-        this.mqTraceContext = mqTraceContext;
-    }
-
-
-    public int getBodyLength() {
-        return bodyLength;
-    }
-
-
-    public void setBodyLength(int bodyLength) {
-        this.bodyLength = bodyLength;
-    }
-
-    public String getCommercialOwner() {
-        return commercialOwner;
-    }
-
-    public void setCommercialOwner(final String commercialOwner) {
-        this.commercialOwner = commercialOwner;
-    }
-
-    public BrokerStatsManager.StatsType getCommercialRcvStats() {
-        return commercialRcvStats;
-    }
-
-    public void setCommercialRcvStats(final BrokerStatsManager.StatsType commercialRcvStats) {
-        this.commercialRcvStats = commercialRcvStats;
-    }
-
-    public int getCommercialRcvTimes() {
-        return commercialRcvTimes;
-    }
-
-    public void setCommercialRcvTimes(final int commercialRcvTimes) {
-        this.commercialRcvTimes = commercialRcvTimes;
-    }
-
-    public int getCommercialRcvSize() {
-        return commercialRcvSize;
-    }
-
-    public void setCommercialRcvSize(final int commercialRcvSize) {
-        this.commercialRcvSize = commercialRcvSize;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
deleted file mode 100644
index 4a74db3..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-public interface ConsumeMessageHook {
-    String hookName();
-
-
-    void consumeMessageBefore(final ConsumeMessageContext context);
-
-
-    void consumeMessageAfter(final ConsumeMessageContext context);
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
deleted file mode 100644
index 5bd29cf..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-import com.alibaba.rocketmq.common.message.MessageType;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-
-import java.util.Properties;
-
-
-public class SendMessageContext {
-    private String producerGroup;
-    private String topic;
-    private String msgId;
-    private String originMsgId;
-    private Integer queueId;
-    private Long queueOffset;
-    private String brokerAddr;
-    private String bornHost;
-    private int bodyLength;
-    private int code;
-    private String errorMsg;
-    private String msgProps;
-    private Object mqTraceContext;
-    private Properties extProps;
-    private String brokerRegionId;
-    private String msgUniqueKey;
-    private long bornTimeStamp;
-    private MessageType msgType = MessageType.Trans_msg_Commit;
-    private boolean isSuccess = false;
-    //For Commercial
-    private String commercialOwner;
-    private BrokerStatsManager.StatsType commercialSendStats;
-    private int commercialSendSize;
-    private int commercialSendTimes;
-
-    public boolean isSuccess() {
-        return isSuccess;
-    }
-
-    public void setSuccess(final boolean success) {
-        isSuccess = success;
-    }
-
-    public MessageType getMsgType() {
-        return msgType;
-    }
-
-    public void setMsgType(final MessageType msgType) {
-        this.msgType = msgType;
-    }
-
-    public String getMsgUniqueKey() {
-        return msgUniqueKey;
-    }
-
-    public void setMsgUniqueKey(final String msgUniqueKey) {
-        this.msgUniqueKey = msgUniqueKey;
-    }
-
-    public long getBornTimeStamp() {
-        return bornTimeStamp;
-    }
-
-    public void setBornTimeStamp(final long bornTimeStamp) {
-        this.bornTimeStamp = bornTimeStamp;
-    }
-
-    public String getBrokerRegionId() {
-        return brokerRegionId;
-    }
-
-    public void setBrokerRegionId(final String brokerRegionId) {
-        this.brokerRegionId = brokerRegionId;
-    }
-
-    public String getProducerGroup() {
-        return producerGroup;
-    }
-
-
-    public void setProducerGroup(String producerGroup) {
-        this.producerGroup = producerGroup;
-    }
-
-
-    public String getTopic() {
-        return topic;
-    }
-
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-
-    public String getMsgId() {
-        return msgId;
-    }
-
-
-    public void setMsgId(String msgId) {
-        this.msgId = msgId;
-    }
-
-
-    public String getOriginMsgId() {
-        return originMsgId;
-    }
-
-
-    public void setOriginMsgId(String originMsgId) {
-        this.originMsgId = originMsgId;
-    }
-
-
-    public Integer getQueueId() {
-        return queueId;
-    }
-
-
-    public void setQueueId(Integer queueId) {
-        this.queueId = queueId;
-    }
-
-
-    public Long getQueueOffset() {
-        return queueOffset;
-    }
-
-
-    public void setQueueOffset(Long queueOffset) {
-        this.queueOffset = queueOffset;
-    }
-
-
-    public String getBrokerAddr() {
-        return brokerAddr;
-    }
-
-
-    public void setBrokerAddr(String brokerAddr) {
-        this.brokerAddr = brokerAddr;
-    }
-
-
-    public String getBornHost() {
-        return bornHost;
-    }
-
-
-    public void setBornHost(String bornHost) {
-        this.bornHost = bornHost;
-    }
-
-
-    public int getBodyLength() {
-        return bodyLength;
-    }
-
-
-    public void setBodyLength(int bodyLength) {
-        this.bodyLength = bodyLength;
-    }
-
-
-    public int getCode() {
-        return code;
-    }
-
-
-    public void setCode(int code) {
-        this.code = code;
-    }
-
-
-    public String getErrorMsg() {
-        return errorMsg;
-    }
-
-
-    public void setErrorMsg(String errorMsg) {
-        this.errorMsg = errorMsg;
-    }
-
-
-    public String getMsgProps() {
-        return msgProps;
-    }
-
-
-    public void setMsgProps(String msgProps) {
-        this.msgProps = msgProps;
-    }
-
-
-    public Object getMqTraceContext() {
-        return mqTraceContext;
-    }
-
-
-    public void setMqTraceContext(Object mqTraceContext) {
-        this.mqTraceContext = mqTraceContext;
-    }
-
-
-    public Properties getExtProps() {
-        return extProps;
-    }
-
-
-    public void setExtProps(Properties extProps) {
-        this.extProps = extProps;
-    }
-
-    public String getCommercialOwner() {
-        return commercialOwner;
-    }
-
-    public void setCommercialOwner(final String commercialOwner) {
-        this.commercialOwner = commercialOwner;
-    }
-
-    public BrokerStatsManager.StatsType getCommercialSendStats() {
-        return commercialSendStats;
-    }
-
-    public void setCommercialSendStats(final BrokerStatsManager.StatsType commercialSendStats) {
-        this.commercialSendStats = commercialSendStats;
-    }
-
-    public int getCommercialSendSize() {
-        return commercialSendSize;
-    }
-
-    public void setCommercialSendSize(final int commercialSendSize) {
-        this.commercialSendSize = commercialSendSize;
-    }
-
-    public int getCommercialSendTimes() {
-        return commercialSendTimes;
-    }
-
-    public void setCommercialSendTimes(final int commercialSendTimes) {
-        this.commercialSendTimes = commercialSendTimes;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
deleted file mode 100644
index e079b9f..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-public interface SendMessageHook {
-    public String hookName();
-
-
-    public void sendMessageBefore(final SendMessageContext context);
-
-
-    public void sendMessageAfter(final SendMessageContext context);
-}