You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/19 09:40:57 UTC
[40/43] incubator-rocketmq git commit: Finish code dump. Reviewed by:
@yukon @vongosling @stevenschew @vintagewang @lollipop @zander
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
new file mode 100644
index 0000000..74e7ea7
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ProducerManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final long LOCK_TIMEOUT_MILLIS = 3000;
+ private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
+ private final Lock groupChannelLock = new ReentrantLock();
+ private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+
+
+ public ProducerManager() {
+ }
+
+
+ public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
+ HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
+ new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ newGroupChannelTable.putAll(groupChannelTable);
+ } finally {
+ groupChannelLock.unlock();
+ }
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ return newGroupChannelTable;
+ }
+
+
+ public void scanNotActiveChannel() {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
+
+ Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, ClientChannelInfo> item = it.next();
+ // final Integer id = item.getKey();
+ final ClientChannelInfo info = item.getValue();
+
+ long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
+ if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ it.remove();
+ log.warn(
+ "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
+ RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
+ RemotingUtil.closeChannel(info.getChannel());
+ }
+ }
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager scanNotActiveChannel lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+
+
+ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ if (channel != null) {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
+ .entrySet()) {
+ final String group = entry.getKey();
+ final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
+ entry.getValue();
+ final ClientChannelInfo clientChannelInfo =
+ clientChannelInfoTable.remove(channel);
+ if (clientChannelInfo != null) {
+ log.info(
+ "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
+ clientChannelInfo.toString(), remoteAddr, group);
+ }
+
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager doChannelCloseEvent lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+ }
+
+
+ public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+ try {
+ ClientChannelInfo clientChannelInfoFound = null;
+
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null == channelTable) {
+ channelTable = new HashMap<Channel, ClientChannelInfo>();
+ this.groupChannelTable.put(group, channelTable);
+ }
+
+ clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
+ if (null == clientChannelInfoFound) {
+ channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+ log.info("new producer connected, group: {} channel: {}", group,
+ clientChannelInfo.toString());
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+
+ if (clientChannelInfoFound != null) {
+ clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
+ }
+ } else {
+ log.warn("ProducerManager registerProducer lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+
+
+ public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
+ try {
+ if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ try {
+ HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
+ if (null != channelTable && !channelTable.isEmpty()) {
+ ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+ if (old != null) {
+ log.info("unregister a producer[{}] from groupChannelTable {}", group,
+ clientChannelInfo.toString());
+ }
+
+ if (channelTable.isEmpty()) {
+ this.groupChannelTable.remove(group);
+ log.info("unregister a producer group[{}] from groupChannelTable", group);
+ }
+ }
+ } finally {
+ this.groupChannelLock.unlock();
+ }
+ } else {
+ log.warn("ProducerManager unregisterProducer lock timeout");
+ }
+ } catch (InterruptedException e) {
+ log.error("", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
new file mode 100644
index 0000000..a38c9cb
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client.net;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
+import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
+import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
+import com.alibaba.rocketmq.common.MQVersion;
+import com.alibaba.rocketmq.common.TopicConfig;
+import com.alibaba.rocketmq.common.UtilAll;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import com.alibaba.rocketmq.common.message.MessageQueueForC;
+import com.alibaba.rocketmq.common.protocol.RequestCode;
+import com.alibaba.rocketmq.common.protocol.ResponseCode;
+import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
+import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
+import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBodyForC;
+import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import com.alibaba.rocketmq.remoting.common.RemotingHelper;
+import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
+import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import com.alibaba.rocketmq.store.SelectMappedBufferResult;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.FileRegion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class Broker2Client {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final BrokerController brokerController;
+
+ public Broker2Client(BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void checkProducerTransactionState(
+ final Channel channel,
+ final CheckTransactionStateRequestHeader requestHeader,
+ final SelectMappedBufferResult selectMappedBufferResult) {
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
+ request.markOnewayRPC();
+
+ try {
+ FileRegion fileRegion =
+ new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
+ selectMappedBufferResult);
+ channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ selectMappedBufferResult.release();
+ if (!future.isSuccess()) {
+ log.error("invokeProducer failed,", future.cause());
+ }
+ }
+ });
+ } catch (Throwable e) {
+ log.error("invokeProducer exception", e);
+ selectMappedBufferResult.release();
+ }
+ }
+
+ public RemotingCommand callClient(final Channel channel,
+ final RemotingCommand request
+ ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
+ return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
+ }
+
+ public void notifyConsumerIdsChanged(
+ final Channel channel,
+ final String consumerGroup) {
+ if (null == consumerGroup) {
+ log.error("notifyConsumerIdsChanged consumerGroup is null");
+ return;
+ }
+
+ NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
+
+ try {
+ this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
+ } catch (Exception e) {
+ log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
+ }
+ }
+
+ public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
+ return resetOffset(topic, group, timeStamp, isForce, false);
+ }
+
+ public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
+ boolean isC) {
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
+ if (null == topicConfig) {
+ log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
+ return response;
+ }
+
+ Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
+
+ for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue();
+ mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
+ mq.setTopic(topic);
+ mq.setQueueId(i);
+
+ long consumerOffset =
+ this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
+ if (-1 == consumerOffset) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(String.format("THe consumer group <%s> not exist", group));
+ return response;
+ }
+
+ long timeStampOffset;
+ if (timeStamp == -1) {
+
+ timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
+ } else {
+ timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
+ }
+
+ if (timeStampOffset < 0) {
+ log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
+ timeStampOffset = 0;
+ }
+
+ if (isForce || timeStampOffset < consumerOffset) {
+ offsetTable.put(mq, timeStampOffset);
+ } else {
+ offsetTable.put(mq, consumerOffset);
+ }
+ }
+
+ ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setGroup(group);
+ requestHeader.setTimestamp(timeStamp);
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
+ if (isC) {
+ // c++ language
+ ResetOffsetBodyForC body = new ResetOffsetBodyForC();
+ List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
+ body.setOffsetTable(offsetList);
+ request.setBody(body.encode());
+ } else {
+ // other language
+ ResetOffsetBody body = new ResetOffsetBody();
+ body.setOffsetTable(offsetTable);
+ request.setBody(body.encode());
+ }
+
+ ConsumerGroupInfo consumerGroupInfo =
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
+
+ if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+ consumerGroupInfo.getChannelInfoTable();
+ for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
+ int version = entry.getValue().getVersion();
+ if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
+ try {
+ this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
+ log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
+ new Object[]{topic, group, entry.getValue().getClientId()});
+ } catch (Exception e) {
+ log.error("[reset-offset] reset offset exception. topic={}, group={}",
+ new Object[]{topic, group}, e);
+ }
+ } else {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("the client does not support this feature. version="
+ + MQVersion.getVersionDesc(version));
+ log.warn("[reset-offset] the client does not support this feature. version={}",
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ return response;
+ }
+ }
+ } else {
+ String errorInfo =
+ String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
+ requestHeader.getGroup(),
+ requestHeader.getTopic(),
+ requestHeader.getTimestamp());
+ log.error(errorInfo);
+ response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
+ response.setRemark(errorInfo);
+ return response;
+ }
+ response.setCode(ResponseCode.SUCCESS);
+ ResetOffsetBody resBody = new ResetOffsetBody();
+ resBody.setOffsetTable(offsetTable);
+ response.setBody(resBody.encode());
+ return response;
+ }
+
+ private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
+ List<MessageQueueForC> list = new ArrayList<MessageQueueForC>();
+ for (Entry<MessageQueue, Long> entry : table.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ MessageQueueForC tmp =
+ new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
+ list.add(tmp);
+ }
+ return list;
+ }
+
+ public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
+ final RemotingCommand result = RemotingCommand.createResponseCommand(null);
+
+ GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setGroup(group);
+ RemotingCommand request =
+ RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
+ requestHeader);
+
+ Map<String, Map<MessageQueue, Long>> consumerStatusTable =
+ new HashMap<String, Map<MessageQueue, Long>>();
+ ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
+ this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
+ if (null == channelInfoTable || channelInfoTable.isEmpty()) {
+ result.setCode(ResponseCode.SYSTEM_ERROR);
+ result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
+ return result;
+ }
+
+ for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
+ int version = entry.getValue().getVersion();
+ String clientId = entry.getValue().getClientId();
+ if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
+ result.setCode(ResponseCode.SYSTEM_ERROR);
+ result.setRemark("the client does not support this feature. version="
+ + MQVersion.getVersionDesc(version));
+ log.warn("[get-consumer-status] the client does not support this feature. version={}",
+ RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
+ return result;
+ } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
+ try {
+ RemotingCommand response =
+ this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ if (response.getBody() != null) {
+ GetConsumerStatusBody body =
+ GetConsumerStatusBody.decode(response.getBody(),
+ GetConsumerStatusBody.class);
+
+ consumerStatusTable.put(clientId, body.getMessageQueueTable());
+ log.info(
+ "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
+ new Object[]{topic, group, clientId});
+ }
+ }
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ log.error(
+ "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
+ new Object[]{topic, group}, e);
+ }
+
+ if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
+ break;
+ }
+ }
+ }
+
+ result.setCode(ResponseCode.SUCCESS);
+ GetConsumerStatusBody resBody = new GetConsumerStatusBody();
+ resBody.setConsumerTable(consumerStatusTable);
+ result.setBody(resBody.encode());
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
new file mode 100644
index 0000000..84be628
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.client.rebalance;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class RebalanceLockManager {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
+ private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
+ "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
+ private final Lock lock = new ReentrantLock();
+ private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
+ new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
+
+ public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
+
+ if (!this.isLocked(group, mq, clientId)) {
+ try {
+ this.lock.lockInterruptibly();
+ try {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (null == groupValue) {
+ groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
+ this.mqLockTable.put(group, groupValue);
+ }
+
+ LockEntry lockEntry = groupValue.get(mq);
+ if (null == lockEntry) {
+ lockEntry = new LockEntry();
+ lockEntry.setClientId(clientId);
+ groupValue.put(mq, lockEntry);
+ log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
+ group, //
+ clientId, //
+ mq);
+ }
+
+ if (lockEntry.isLocked(clientId)) {
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ return true;
+ }
+
+ String oldClientId = lockEntry.getClientId();
+
+
+ if (lockEntry.isExpired()) {
+ lockEntry.setClientId(clientId);
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ log.warn(
+ "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ return true;
+ }
+
+
+ log.warn(
+ "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ return false;
+ } finally {
+ this.lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("putMessage exception", e);
+ }
+ } else {
+
+ }
+
+ return true;
+ }
+
+ private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (groupValue != null) {
+ LockEntry lockEntry = groupValue.get(mq);
+ if (lockEntry != null) {
+ boolean locked = lockEntry.isLocked(clientId);
+ if (locked) {
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ }
+
+ return locked;
+ }
+ }
+
+ return false;
+ }
+
+ public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
+ final String clientId) {
+ Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
+ Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
+
+
+ for (MessageQueue mq : mqs) {
+ if (this.isLocked(group, mq, clientId)) {
+ lockedMqs.add(mq);
+ } else {
+ notLockedMqs.add(mq);
+ }
+ }
+
+ if (!notLockedMqs.isEmpty()) {
+ try {
+ this.lock.lockInterruptibly();
+ try {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (null == groupValue) {
+ groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
+ this.mqLockTable.put(group, groupValue);
+ }
+
+
+ for (MessageQueue mq : notLockedMqs) {
+ LockEntry lockEntry = groupValue.get(mq);
+ if (null == lockEntry) {
+ lockEntry = new LockEntry();
+ lockEntry.setClientId(clientId);
+ groupValue.put(mq, lockEntry);
+ log.info(
+ "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
+ group, //
+ clientId, //
+ mq);
+ }
+
+
+ if (lockEntry.isLocked(clientId)) {
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ lockedMqs.add(mq);
+ continue;
+ }
+
+ String oldClientId = lockEntry.getClientId();
+
+
+ if (lockEntry.isExpired()) {
+ lockEntry.setClientId(clientId);
+ lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
+ log.warn(
+ "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ lockedMqs.add(mq);
+ continue;
+ }
+
+
+ log.warn(
+ "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
+ group, //
+ oldClientId, //
+ clientId, //
+ mq);
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("putMessage exception", e);
+ }
+ }
+
+ return lockedMqs;
+ }
+
+ public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
+ try {
+ this.lock.lockInterruptibly();
+ try {
+ ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
+ if (null != groupValue) {
+ for (MessageQueue mq : mqs) {
+ LockEntry lockEntry = groupValue.get(mq);
+ if (null != lockEntry) {
+ if (lockEntry.getClientId().equals(clientId)) {
+ groupValue.remove(mq);
+ log.info("unlockBatch, Group: {} {} {}",
+ group,
+ mq,
+ clientId);
+ } else {
+ log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
+ lockEntry.getClientId(),
+ group,
+ mq,
+ clientId);
+ }
+ } else {
+ log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
+ group,
+ mq,
+ clientId);
+ }
+ }
+ } else {
+ log.warn("unlockBatch, group not exist, Group: {} {}",
+ group,
+ clientId);
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("putMessage exception", e);
+ }
+ }
+
+ static class LockEntry {
+ private String clientId;
+ private volatile long lastUpdateTimestamp = System.currentTimeMillis();
+
+
+ public String getClientId() {
+ return clientId;
+ }
+
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+
+ public boolean isLocked(final String clientId) {
+ boolean eq = this.clientId.equals(clientId);
+ return eq && !this.isExpired();
+ }
+
+ public boolean isExpired() {
+ boolean expired =
+ (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
+
+ return expired;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
new file mode 100644
index 0000000..b2e7e82
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.filtersrv;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.broker.BrokerStartup;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.common.RemotingUtil;
+import io.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+public class FilterServerManager {
+
+ public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
+ new ConcurrentHashMap<Channel, FilterServerInfo>(16);
+ private final BrokerController brokerController;
+
+ private ScheduledExecutorService scheduledExecutorService = Executors
+ .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
+
+ public FilterServerManager(final BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void start() {
+
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ FilterServerManager.this.createFilterServer();
+ } catch (Exception e) {
+ log.error("", e);
+ }
+ }
+ }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
+ }
+
+ public void createFilterServer() {
+ int more =
+ this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
+ String cmd = this.buildStartCommand();
+ for (int i = 0; i < more; i++) {
+ FilterServerUtil.callShell(cmd, log);
+ }
+ }
+
+ private String buildStartCommand() {
+ String config = "";
+ if (BrokerStartup.configFile != null) {
+ config = String.format("-c %s", BrokerStartup.configFile);
+ }
+
+ if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
+ config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
+ }
+
+ if (RemotingUtil.isWindowsPlatform()) {
+ return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
+ this.brokerController.getBrokerConfig().getRocketmqHome(),
+ config);
+ } else {
+ return String.format("sh %s/bin/startfsrv.sh %s",
+ this.brokerController.getBrokerConfig().getRocketmqHome(),
+ config);
+ }
+ }
+
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ }
+
+ public void registerFilterServer(final Channel channel, final String filterServerAddr) {
+ FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
+ if (filterServerInfo != null) {
+ filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
+ } else {
+ filterServerInfo = new FilterServerInfo();
+ filterServerInfo.setFilterServerAddr(filterServerAddr);
+ filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
+ this.filterServerTable.put(channel, filterServerInfo);
+ log.info("Receive a New Filter Server<{}>", filterServerAddr);
+ }
+ }
+
+ /**
+
+ */
+ public void scanNotActiveChannel() {
+
+ Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, FilterServerInfo> next = it.next();
+ long timestamp = next.getValue().getLastUpdateTimestamp();
+ Channel channel = next.getKey();
+ if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
+ log.info("The Filter Server<{}> expired, remove it", next.getKey());
+ it.remove();
+ RemotingUtil.closeChannel(channel);
+ }
+ }
+ }
+
+ public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
+ FilterServerInfo old = this.filterServerTable.remove(channel);
+ if (old != null) {
+ log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
+ remoteAddr);
+ }
+ }
+
+ public List<String> buildNewFilterServerList() {
+ List<String> addr = new ArrayList<String>();
+ Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Channel, FilterServerInfo> next = it.next();
+ addr.add(next.getValue().getFilterServerAddr());
+ }
+ return addr;
+ }
+
+ static class FilterServerInfo {
+ private String filterServerAddr;
+ private long lastUpdateTimestamp;
+
+
+ public String getFilterServerAddr() {
+ return filterServerAddr;
+ }
+
+
+ public void setFilterServerAddr(String filterServerAddr) {
+ this.filterServerAddr = filterServerAddr;
+ }
+
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
new file mode 100644
index 0000000..c5ace19
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.filtersrv;
+
+import org.slf4j.Logger;
+
+
+public class FilterServerUtil {
+ public static void callShell(final String shellString, final Logger log) {
+ Process process = null;
+ try {
+ String[] cmdArray = splitShellString(shellString);
+ process = Runtime.getRuntime().exec(cmdArray);
+ process.waitFor();
+ log.info("callShell: <{}> OK", shellString);
+ } catch (Throwable e) {
+ log.error("callShell: readLine IOException, " + shellString, e);
+ } finally {
+ if (null != process)
+ process.destroy();
+ }
+ }
+
+ private static String[] splitShellString(final String shellString) {
+ String[] split = shellString.split(" ");
+ return split;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
new file mode 100644
index 0000000..586bed0
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.latency;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.ThreadFactoryImpl;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.remoting.netty.RequestTask;
+import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerFastFailure {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "BrokerFastFailureScheduledThread"));
+ private final BrokerController brokerController;
+
+ public BrokerFastFailure(final BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void start() {
+ this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ cleanExpiredRequest();
+ }
+ }, 1000, 10, TimeUnit.MILLISECONDS);
+ }
+
+ private void cleanExpiredRequest() {
+ while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
+ try {
+ if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
+ final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
+ if (null == runnable) {
+ break;
+ }
+
+ final RequestTask rt = castRunnable(runnable);
+ rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
+ } else {
+ break;
+ }
+ } catch (Throwable e) {
+ }
+ }
+
+ while (true) {
+ try {
+ if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
+ final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
+ if (null == runnable) {
+ break;
+ }
+ final RequestTask rt = castRunnable(runnable);
+ if (rt.isStopRun()) {
+ break;
+ }
+
+ final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
+ if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
+ if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
+ rt.setStopRun(true);
+ rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
+ }
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ } catch (Throwable e) {
+ }
+ }
+ }
+
+ public static RequestTask castRunnable(final Runnable runnable) {
+ try {
+ FutureTaskExt object = (FutureTaskExt) runnable;
+ return (RequestTask) object.getRunnable();
+ } catch (Throwable e) {
+ log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
+ }
+
+ return null;
+ }
+
+ public void shutdown() {
+ this.scheduledExecutorService.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
new file mode 100644
index 0000000..f81d48a
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.latency;
+
+import java.util.concurrent.*;
+
+/**
+ * @author shijia.wxr
+ */
+public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
+ }
+
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
+ }
+
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
+ }
+
+ public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+ }
+
+ @Override
+ protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
+ return new FutureTaskExt<T>(runnable, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
new file mode 100644
index 0000000..6ec7bb5
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.latency;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.FutureTask;
+
+/**
+ * @author shijia.wxr
+ */
+public class FutureTaskExt<V> extends FutureTask<V> {
+ private final Runnable runnable;
+
+ public FutureTaskExt(final Callable<V> callable) {
+ super(callable);
+ this.runnable = null;
+ }
+
+ public FutureTaskExt(final Runnable runnable, final V result) {
+ super(runnable, result);
+ this.runnable = runnable;
+ }
+
+ public Runnable getRunnable() {
+ return runnable;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
new file mode 100644
index 0000000..bc9c58d
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.longpolling;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class ManyPullRequest {
+ private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>();
+
+
+ public synchronized void addPullRequest(final PullRequest pullRequest) {
+ this.pullRequestList.add(pullRequest);
+ }
+
+
+ public synchronized void addPullRequest(final List<PullRequest> many) {
+ this.pullRequestList.addAll(many);
+ }
+
+
+ public synchronized List<PullRequest> cloneListAndClear() {
+ if (!this.pullRequestList.isEmpty()) {
+ List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone();
+ this.pullRequestList.clear();
+ return result;
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
new file mode 100644
index 0000000..15ee050
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.broker.longpolling;
+
+import com.alibaba.rocketmq.store.MessageArrivingListener;
+
+
+public class NotifyMessageArrivingListener implements MessageArrivingListener {
+ private final PullRequestHoldService pullRequestHoldService;
+
+
+ public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
+ this.pullRequestHoldService = pullRequestHoldService;
+ }
+
+
+ @Override
+ public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
+ this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
new file mode 100644
index 0000000..b4f1e11
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.longpolling;
+
+import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import io.netty.channel.Channel;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullRequest {
+ private final RemotingCommand requestCommand;
+ private final Channel clientChannel;
+ private final long timeoutMillis;
+ private final long suspendTimestamp;
+ private final long pullFromThisOffset;
+ private final SubscriptionData subscriptionData;
+
+
+ public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
+ long pullFromThisOffset, SubscriptionData subscriptionData) {
+ this.requestCommand = requestCommand;
+ this.clientChannel = clientChannel;
+ this.timeoutMillis = timeoutMillis;
+ this.suspendTimestamp = suspendTimestamp;
+ this.pullFromThisOffset = pullFromThisOffset;
+ this.subscriptionData = subscriptionData;
+ }
+
+
+ public RemotingCommand getRequestCommand() {
+ return requestCommand;
+ }
+
+
+ public Channel getClientChannel() {
+ return clientChannel;
+ }
+
+
+ public long getTimeoutMillis() {
+ return timeoutMillis;
+ }
+
+
+ public long getSuspendTimestamp() {
+ return suspendTimestamp;
+ }
+
+
+ public long getPullFromThisOffset() {
+ return pullFromThisOffset;
+ }
+
+ public SubscriptionData getSubscriptionData() {
+ return subscriptionData;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
new file mode 100644
index 0000000..888c5f2
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.longpolling;
+
+import com.alibaba.rocketmq.broker.BrokerController;
+import com.alibaba.rocketmq.common.ServiceThread;
+import com.alibaba.rocketmq.common.SystemClock;
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import com.alibaba.rocketmq.store.DefaultMessageFilter;
+import com.alibaba.rocketmq.store.MessageFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class PullRequestHoldService extends ServiceThread {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+ private static final String TOPIC_QUEUEID_SEPARATOR = "@";
+ private final BrokerController brokerController;
+ private final SystemClock systemClock = new SystemClock();
+ private final MessageFilter messageFilter = new DefaultMessageFilter();
+ private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
+ new ConcurrentHashMap<String, ManyPullRequest>(1024);
+
+
+ public PullRequestHoldService(final BrokerController brokerController) {
+ this.brokerController = brokerController;
+ }
+
+ public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
+ String key = this.buildKey(topic, queueId);
+ ManyPullRequest mpr = this.pullRequestTable.get(key);
+ if (null == mpr) {
+ mpr = new ManyPullRequest();
+ ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
+ if (prev != null) {
+ mpr = prev;
+ }
+ }
+
+ mpr.addPullRequest(pullRequest);
+ }
+
+ private String buildKey(final String topic, final int queueId) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(topic);
+ sb.append(TOPIC_QUEUEID_SEPARATOR);
+ sb.append(queueId);
+ return sb.toString();
+ }
+
+ @Override
+ public void run() {
+ log.info(this.getServiceName() + " service started");
+ while (!this.isStopped()) {
+ try {
+ if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
+ this.waitForRunning(5 * 1000);
+ } else {
+ this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
+ }
+
+ long beginLockTimestamp = this.systemClock.now();
+ this.checkHoldRequest();
+ long costTime = this.systemClock.now() - beginLockTimestamp;
+ if (costTime > 5 * 1000) {
+ log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
+ }
+ } catch (Throwable e) {
+ log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ log.info(this.getServiceName() + " service end");
+ }
+
+ @Override
+ public String getServiceName() {
+ return PullRequestHoldService.class.getSimpleName();
+ }
+
+ private void checkHoldRequest() {
+ for (String key : this.pullRequestTable.keySet()) {
+ String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
+ if (kArray != null && 2 == kArray.length) {
+ String topic = kArray[0];
+ int queueId = Integer.parseInt(kArray[1]);
+ final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
+ try {
+ this.notifyMessageArriving(topic, queueId, offset);
+ } catch (Throwable e) {
+ log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
+ }
+ }
+ }
+ }
+
+ public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
+ notifyMessageArriving(topic, queueId, maxOffset, null);
+ }
+
+ public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
+ String key = this.buildKey(topic, queueId);
+ ManyPullRequest mpr = this.pullRequestTable.get(key);
+ if (mpr != null) {
+ List<PullRequest> requestList = mpr.cloneListAndClear();
+ if (requestList != null) {
+ List<PullRequest> replayList = new ArrayList<PullRequest>();
+
+ for (PullRequest request : requestList) {
+ long newestOffset = maxOffset;
+ if (newestOffset <= request.getPullFromThisOffset()) {
+ newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
+ }
+
+ Long tmp = tagsCode;
+ if (newestOffset > request.getPullFromThisOffset()) {
+ if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
+ try {
+ this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ } catch (Throwable e) {
+ log.error("execute request when wakeup failed.", e);
+ }
+ continue;
+ }
+ }
+
+ if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
+ try {
+ this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
+ request.getRequestCommand());
+ } catch (Throwable e) {
+ log.error("execute request when wakeup failed.", e);
+ }
+ continue;
+ }
+
+
+ replayList.add(request);
+ }
+
+ if (!replayList.isEmpty()) {
+ mpr.addPullRequest(replayList);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
new file mode 100644
index 0000000..b7f9c6e
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.mqtrace;
+
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+
+import java.util.Map;
+
+
+public class ConsumeMessageContext {
+ private String consumerGroup;
+ private String topic;
+ private Integer queueId;
+ private String clientHost;
+ private String storeHost;
+ private Map<String, Long> messageIds;
+ private int bodyLength;
+ private boolean success;
+ private String status;
+ private Object mqTraceContext;
+
+ private String commercialOwner;
+ private BrokerStatsManager.StatsType commercialRcvStats;
+ private int commercialRcvTimes;
+ private int commercialRcvSize;
+
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public Integer getQueueId() {
+ return queueId;
+ }
+
+
+ public void setQueueId(Integer queueId) {
+ this.queueId = queueId;
+ }
+
+
+ public String getClientHost() {
+ return clientHost;
+ }
+
+
+ public void setClientHost(String clientHost) {
+ this.clientHost = clientHost;
+ }
+
+
+ public String getStoreHost() {
+ return storeHost;
+ }
+
+
+ public void setStoreHost(String storeHost) {
+ this.storeHost = storeHost;
+ }
+
+
+ public Map<String, Long> getMessageIds() {
+ return messageIds;
+ }
+
+
+ public void setMessageIds(Map<String, Long> messageIds) {
+ this.messageIds = messageIds;
+ }
+
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+
+ public void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+
+ public String getStatus() {
+ return status;
+ }
+
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+
+ public Object getMqTraceContext() {
+ return mqTraceContext;
+ }
+
+
+ public void setMqTraceContext(Object mqTraceContext) {
+ this.mqTraceContext = mqTraceContext;
+ }
+
+
+ public int getBodyLength() {
+ return bodyLength;
+ }
+
+
+ public void setBodyLength(int bodyLength) {
+ this.bodyLength = bodyLength;
+ }
+
+ public String getCommercialOwner() {
+ return commercialOwner;
+ }
+
+ public void setCommercialOwner(final String commercialOwner) {
+ this.commercialOwner = commercialOwner;
+ }
+
+ public BrokerStatsManager.StatsType getCommercialRcvStats() {
+ return commercialRcvStats;
+ }
+
+ public void setCommercialRcvStats(final BrokerStatsManager.StatsType commercialRcvStats) {
+ this.commercialRcvStats = commercialRcvStats;
+ }
+
+ public int getCommercialRcvTimes() {
+ return commercialRcvTimes;
+ }
+
+ public void setCommercialRcvTimes(final int commercialRcvTimes) {
+ this.commercialRcvTimes = commercialRcvTimes;
+ }
+
+ public int getCommercialRcvSize() {
+ return commercialRcvSize;
+ }
+
+ public void setCommercialRcvSize(final int commercialRcvSize) {
+ this.commercialRcvSize = commercialRcvSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
new file mode 100644
index 0000000..4a74db3
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.mqtrace;
+
+public interface ConsumeMessageHook {
+ String hookName();
+
+
+ void consumeMessageBefore(final ConsumeMessageContext context);
+
+
+ void consumeMessageAfter(final ConsumeMessageContext context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
new file mode 100644
index 0000000..5bd29cf
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.mqtrace;
+
+import com.alibaba.rocketmq.common.message.MessageType;
+import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
+
+import java.util.Properties;
+
+
+public class SendMessageContext {
+ private String producerGroup;
+ private String topic;
+ private String msgId;
+ private String originMsgId;
+ private Integer queueId;
+ private Long queueOffset;
+ private String brokerAddr;
+ private String bornHost;
+ private int bodyLength;
+ private int code;
+ private String errorMsg;
+ private String msgProps;
+ private Object mqTraceContext;
+ private Properties extProps;
+ private String brokerRegionId;
+ private String msgUniqueKey;
+ private long bornTimeStamp;
+ private MessageType msgType = MessageType.Trans_msg_Commit;
+ private boolean isSuccess = false;
+ //For Commercial
+ private String commercialOwner;
+ private BrokerStatsManager.StatsType commercialSendStats;
+ private int commercialSendSize;
+ private int commercialSendTimes;
+
+ public boolean isSuccess() {
+ return isSuccess;
+ }
+
+ public void setSuccess(final boolean success) {
+ isSuccess = success;
+ }
+
+ public MessageType getMsgType() {
+ return msgType;
+ }
+
+ public void setMsgType(final MessageType msgType) {
+ this.msgType = msgType;
+ }
+
+ public String getMsgUniqueKey() {
+ return msgUniqueKey;
+ }
+
+ public void setMsgUniqueKey(final String msgUniqueKey) {
+ this.msgUniqueKey = msgUniqueKey;
+ }
+
+ public long getBornTimeStamp() {
+ return bornTimeStamp;
+ }
+
+ public void setBornTimeStamp(final long bornTimeStamp) {
+ this.bornTimeStamp = bornTimeStamp;
+ }
+
+ public String getBrokerRegionId() {
+ return brokerRegionId;
+ }
+
+ public void setBrokerRegionId(final String brokerRegionId) {
+ this.brokerRegionId = brokerRegionId;
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+
+ public String getTopic() {
+ return topic;
+ }
+
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+
+ public void setMsgId(String msgId) {
+ this.msgId = msgId;
+ }
+
+
+ public String getOriginMsgId() {
+ return originMsgId;
+ }
+
+
+ public void setOriginMsgId(String originMsgId) {
+ this.originMsgId = originMsgId;
+ }
+
+
+ public Integer getQueueId() {
+ return queueId;
+ }
+
+
+ public void setQueueId(Integer queueId) {
+ this.queueId = queueId;
+ }
+
+
+ public Long getQueueOffset() {
+ return queueOffset;
+ }
+
+
+ public void setQueueOffset(Long queueOffset) {
+ this.queueOffset = queueOffset;
+ }
+
+
+ public String getBrokerAddr() {
+ return brokerAddr;
+ }
+
+
+ public void setBrokerAddr(String brokerAddr) {
+ this.brokerAddr = brokerAddr;
+ }
+
+
+ public String getBornHost() {
+ return bornHost;
+ }
+
+
+ public void setBornHost(String bornHost) {
+ this.bornHost = bornHost;
+ }
+
+
+ public int getBodyLength() {
+ return bodyLength;
+ }
+
+
+ public void setBodyLength(int bodyLength) {
+ this.bodyLength = bodyLength;
+ }
+
+
+ public int getCode() {
+ return code;
+ }
+
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+
+ public String getErrorMsg() {
+ return errorMsg;
+ }
+
+
+ public void setErrorMsg(String errorMsg) {
+ this.errorMsg = errorMsg;
+ }
+
+
+ public String getMsgProps() {
+ return msgProps;
+ }
+
+
+ public void setMsgProps(String msgProps) {
+ this.msgProps = msgProps;
+ }
+
+
+ public Object getMqTraceContext() {
+ return mqTraceContext;
+ }
+
+
+ public void setMqTraceContext(Object mqTraceContext) {
+ this.mqTraceContext = mqTraceContext;
+ }
+
+
+ public Properties getExtProps() {
+ return extProps;
+ }
+
+
+ public void setExtProps(Properties extProps) {
+ this.extProps = extProps;
+ }
+
+ public String getCommercialOwner() {
+ return commercialOwner;
+ }
+
+ public void setCommercialOwner(final String commercialOwner) {
+ this.commercialOwner = commercialOwner;
+ }
+
+ public BrokerStatsManager.StatsType getCommercialSendStats() {
+ return commercialSendStats;
+ }
+
+ public void setCommercialSendStats(final BrokerStatsManager.StatsType commercialSendStats) {
+ this.commercialSendStats = commercialSendStats;
+ }
+
+ public int getCommercialSendSize() {
+ return commercialSendSize;
+ }
+
+ public void setCommercialSendSize(final int commercialSendSize) {
+ this.commercialSendSize = commercialSendSize;
+ }
+
+ public int getCommercialSendTimes() {
+ return commercialSendTimes;
+ }
+
+ public void setCommercialSendTimes(final int commercialSendTimes) {
+ this.commercialSendTimes = commercialSendTimes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
new file mode 100644
index 0000000..e079b9f
--- /dev/null
+++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.broker.mqtrace;
+
+public interface SendMessageHook {
+ public String hookName();
+
+
+ public void sendMessageBefore(final SendMessageContext context);
+
+
+ public void sendMessageAfter(final SendMessageContext context);
+}