You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:45 UTC
[49/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
deleted file mode 100644
index 74e7ea7..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/ProducerManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * @author shijia.wxr
- */
-public class ProducerManager {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private static final long LOCK_TIMEOUT_MILLIS = 3000;
- private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
- private final Lock groupChannelLock = new ReentrantLock();
- private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
-
-
- public ProducerManager() {
- }
-
-
- public HashMap<String, HashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
- HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> newGroupChannelTable =
- new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- newGroupChannelTable.putAll(groupChannelTable);
- } finally {
- groupChannelLock.unlock();
- }
- }
- } catch (InterruptedException e) {
- log.error("", e);
- }
- return newGroupChannelTable;
- }
-
-
- public void scanNotActiveChannel() {
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
- .entrySet()) {
- final String group = entry.getKey();
- final HashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
-
- Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, ClientChannelInfo> item = it.next();
- // final Integer id = item.getKey();
- final ClientChannelInfo info = item.getValue();
-
- long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
- if (diff > CHANNEL_EXPIRED_TIMEOUT) {
- it.remove();
- log.warn(
- "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
- RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
- RemotingUtil.closeChannel(info.getChannel());
- }
- }
- }
- } finally {
- this.groupChannelLock.unlock();
- }
- } else {
- log.warn("ProducerManager scanNotActiveChannel lock timeout");
- }
- } catch (InterruptedException e) {
- log.error("", e);
- }
- }
-
-
- public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
- if (channel != null) {
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- for (final Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
- .entrySet()) {
- final String group = entry.getKey();
- final HashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
- entry.getValue();
- final ClientChannelInfo clientChannelInfo =
- clientChannelInfoTable.remove(channel);
- if (clientChannelInfo != null) {
- log.info(
- "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
- clientChannelInfo.toString(), remoteAddr, group);
- }
-
- }
- } finally {
- this.groupChannelLock.unlock();
- }
- } else {
- log.warn("ProducerManager doChannelCloseEvent lock timeout");
- }
- } catch (InterruptedException e) {
- log.error("", e);
- }
- }
- }
-
-
- public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
- try {
- ClientChannelInfo clientChannelInfoFound = null;
-
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
- if (null == channelTable) {
- channelTable = new HashMap<Channel, ClientChannelInfo>();
- this.groupChannelTable.put(group, channelTable);
- }
-
- clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
- if (null == clientChannelInfoFound) {
- channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
- log.info("new producer connected, group: {} channel: {}", group,
- clientChannelInfo.toString());
- }
- } finally {
- this.groupChannelLock.unlock();
- }
-
- if (clientChannelInfoFound != null) {
- clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
- }
- } else {
- log.warn("ProducerManager registerProducer lock timeout");
- }
- } catch (InterruptedException e) {
- log.error("", e);
- }
- }
-
-
- public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
- try {
- if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
- try {
- HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
- if (null != channelTable && !channelTable.isEmpty()) {
- ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
- if (old != null) {
- log.info("unregister a producer[{}] from groupChannelTable {}", group,
- clientChannelInfo.toString());
- }
-
- if (channelTable.isEmpty()) {
- this.groupChannelTable.remove(group);
- log.info("unregister a producer group[{}] from groupChannelTable", group);
- }
- }
- } finally {
- this.groupChannelLock.unlock();
- }
- } else {
- log.warn("ProducerManager unregisterProducer lock timeout");
- }
- } catch (InterruptedException e) {
- log.error("", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
deleted file mode 100644
index a38c9cb..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/net/Broker2Client.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client.net;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.client.ClientChannelInfo;
-import com.alibaba.rocketmq.broker.client.ConsumerGroupInfo;
-import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer;
-import com.alibaba.rocketmq.common.MQVersion;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import com.alibaba.rocketmq.common.message.MessageQueueForC;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.GetConsumerStatusBody;
-import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBody;
-import com.alibaba.rocketmq.common.protocol.body.ResetOffsetBodyForC;
-import com.alibaba.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
-import com.alibaba.rocketmq.remoting.common.RemotingHelper;
-import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
-import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.store.SelectMappedBufferResult;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.FileRegion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class Broker2Client {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final BrokerController brokerController;
-
- public Broker2Client(BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- public void checkProducerTransactionState(
- final Channel channel,
- final CheckTransactionStateRequestHeader requestHeader,
- final SelectMappedBufferResult selectMappedBufferResult) {
- RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
- request.markOnewayRPC();
-
- try {
- FileRegion fileRegion =
- new OneMessageTransfer(request.encodeHeader(selectMappedBufferResult.getSize()),
- selectMappedBufferResult);
- channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- selectMappedBufferResult.release();
- if (!future.isSuccess()) {
- log.error("invokeProducer failed,", future.cause());
- }
- }
- });
- } catch (Throwable e) {
- log.error("invokeProducer exception", e);
- selectMappedBufferResult.release();
- }
- }
-
- public RemotingCommand callClient(final Channel channel,
- final RemotingCommand request
- ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
- return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);
- }
-
- public void notifyConsumerIdsChanged(
- final Channel channel,
- final String consumerGroup) {
- if (null == consumerGroup) {
- log.error("notifyConsumerIdsChanged consumerGroup is null");
- return;
- }
-
- NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
- requestHeader.setConsumerGroup(consumerGroup);
- RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
-
- try {
- this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
- } catch (Exception e) {
- log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage());
- }
- }
-
- public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
- return resetOffset(topic, group, timeStamp, isForce, false);
- }
-
- public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
- boolean isC) {
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
-
- TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
- if (null == topicConfig) {
- log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic);
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic);
- return response;
- }
-
- Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();
-
- for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
- MessageQueue mq = new MessageQueue();
- mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
- mq.setTopic(topic);
- mq.setQueueId(i);
-
- long consumerOffset =
- this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
- if (-1 == consumerOffset) {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(String.format("THe consumer group <%s> not exist", group));
- return response;
- }
-
- long timeStampOffset;
- if (timeStamp == -1) {
-
- timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, i);
- } else {
- timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
- }
-
- if (timeStampOffset < 0) {
- log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
- timeStampOffset = 0;
- }
-
- if (isForce || timeStampOffset < consumerOffset) {
- offsetTable.put(mq, timeStampOffset);
- } else {
- offsetTable.put(mq, consumerOffset);
- }
- }
-
- ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setGroup(group);
- requestHeader.setTimestamp(timeStamp);
- RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
- if (isC) {
- // c++ language
- ResetOffsetBodyForC body = new ResetOffsetBodyForC();
- List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
- body.setOffsetTable(offsetList);
- request.setBody(body.encode());
- } else {
- // other language
- ResetOffsetBody body = new ResetOffsetBody();
- body.setOffsetTable(offsetTable);
- request.setBody(body.encode());
- }
-
- ConsumerGroupInfo consumerGroupInfo =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
-
- if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
- ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- consumerGroupInfo.getChannelInfoTable();
- for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
- int version = entry.getValue().getVersion();
- if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
- try {
- this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
- log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
- new Object[]{topic, group, entry.getValue().getClientId()});
- } catch (Exception e) {
- log.error("[reset-offset] reset offset exception. topic={}, group={}",
- new Object[]{topic, group}, e);
- }
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark("the client does not support this feature. version="
- + MQVersion.getVersionDesc(version));
- log.warn("[reset-offset] the client does not support this feature. version={}",
- RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
- return response;
- }
- }
- } else {
- String errorInfo =
- String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
- requestHeader.getGroup(),
- requestHeader.getTopic(),
- requestHeader.getTimestamp());
- log.error(errorInfo);
- response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
- response.setRemark(errorInfo);
- return response;
- }
- response.setCode(ResponseCode.SUCCESS);
- ResetOffsetBody resBody = new ResetOffsetBody();
- resBody.setOffsetTable(offsetTable);
- response.setBody(resBody.encode());
- return response;
- }
-
- private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) {
- List<MessageQueueForC> list = new ArrayList<MessageQueueForC>();
- for (Entry<MessageQueue, Long> entry : table.entrySet()) {
- MessageQueue mq = entry.getKey();
- MessageQueueForC tmp =
- new MessageQueueForC(mq.getTopic(), mq.getBrokerName(), mq.getQueueId(), entry.getValue());
- list.add(tmp);
- }
- return list;
- }
-
- public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) {
- final RemotingCommand result = RemotingCommand.createResponseCommand(null);
-
- GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader();
- requestHeader.setTopic(topic);
- requestHeader.setGroup(group);
- RemotingCommand request =
- RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
- requestHeader);
-
- Map<String, Map<MessageQueue, Long>> consumerStatusTable =
- new HashMap<String, Map<MessageQueue, Long>>();
- ConcurrentHashMap<Channel, ClientChannelInfo> channelInfoTable =
- this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable();
- if (null == channelInfoTable || channelInfoTable.isEmpty()) {
- result.setCode(ResponseCode.SYSTEM_ERROR);
- result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group));
- return result;
- }
-
- for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
- int version = entry.getValue().getVersion();
- String clientId = entry.getValue().getClientId();
- if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
- result.setCode(ResponseCode.SYSTEM_ERROR);
- result.setRemark("the client does not support this feature. version="
- + MQVersion.getVersionDesc(version));
- log.warn("[get-consumer-status] the client does not support this feature. version={}",
- RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
- return result;
- } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) {
- try {
- RemotingCommand response =
- this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000);
- assert response != null;
- switch (response.getCode()) {
- case ResponseCode.SUCCESS: {
- if (response.getBody() != null) {
- GetConsumerStatusBody body =
- GetConsumerStatusBody.decode(response.getBody(),
- GetConsumerStatusBody.class);
-
- consumerStatusTable.put(clientId, body.getMessageQueueTable());
- log.info(
- "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}",
- new Object[]{topic, group, clientId});
- }
- }
- default:
- break;
- }
- } catch (Exception e) {
- log.error(
- "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}",
- new Object[]{topic, group}, e);
- }
-
- if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) {
- break;
- }
- }
- }
-
- result.setCode(ResponseCode.SUCCESS);
- GetConsumerStatusBody resBody = new GetConsumerStatusBody();
- resBody.setConsumerTable(consumerStatusTable);
- result.setBody(resBody.encode());
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
deleted file mode 100644
index 84be628..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/client/rebalance/RebalanceLockManager.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.client.rebalance;
-
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.common.message.MessageQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-/**
- * @author shijia.wxr
- */
-public class RebalanceLockManager {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.REBALANCE_LOCK_LOGGER_NAME);
- private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
- "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));
- private final Lock lock = new ReentrantLock();
- private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
- new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
-
- public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
-
- if (!this.isLocked(group, mq, clientId)) {
- try {
- this.lock.lockInterruptibly();
- try {
- ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
- if (null == groupValue) {
- groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
- this.mqLockTable.put(group, groupValue);
- }
-
- LockEntry lockEntry = groupValue.get(mq);
- if (null == lockEntry) {
- lockEntry = new LockEntry();
- lockEntry.setClientId(clientId);
- groupValue.put(mq, lockEntry);
- log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
- group, //
- clientId, //
- mq);
- }
-
- if (lockEntry.isLocked(clientId)) {
- lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
- return true;
- }
-
- String oldClientId = lockEntry.getClientId();
-
-
- if (lockEntry.isExpired()) {
- lockEntry.setClientId(clientId);
- lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
- log.warn(
- "tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
- mq);
- return true;
- }
-
-
- log.warn(
- "tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
- mq);
- return false;
- } finally {
- this.lock.unlock();
- }
- } catch (InterruptedException e) {
- log.error("putMessage exception", e);
- }
- } else {
-
- }
-
- return true;
- }
-
- private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
- ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
- if (groupValue != null) {
- LockEntry lockEntry = groupValue.get(mq);
- if (lockEntry != null) {
- boolean locked = lockEntry.isLocked(clientId);
- if (locked) {
- lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
- }
-
- return locked;
- }
- }
-
- return false;
- }
-
- public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
- final String clientId) {
- Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
- Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
-
-
- for (MessageQueue mq : mqs) {
- if (this.isLocked(group, mq, clientId)) {
- lockedMqs.add(mq);
- } else {
- notLockedMqs.add(mq);
- }
- }
-
- if (!notLockedMqs.isEmpty()) {
- try {
- this.lock.lockInterruptibly();
- try {
- ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
- if (null == groupValue) {
- groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32);
- this.mqLockTable.put(group, groupValue);
- }
-
-
- for (MessageQueue mq : notLockedMqs) {
- LockEntry lockEntry = groupValue.get(mq);
- if (null == lockEntry) {
- lockEntry = new LockEntry();
- lockEntry.setClientId(clientId);
- groupValue.put(mq, lockEntry);
- log.info(
- "tryLockBatch, message queue not locked, I got it. Group: {} NewClientId: {} {}", //
- group, //
- clientId, //
- mq);
- }
-
-
- if (lockEntry.isLocked(clientId)) {
- lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
- lockedMqs.add(mq);
- continue;
- }
-
- String oldClientId = lockEntry.getClientId();
-
-
- if (lockEntry.isExpired()) {
- lockEntry.setClientId(clientId);
- lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
- log.warn(
- "tryLockBatch, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
- mq);
- lockedMqs.add(mq);
- continue;
- }
-
-
- log.warn(
- "tryLockBatch, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}", //
- group, //
- oldClientId, //
- clientId, //
- mq);
- }
- } finally {
- this.lock.unlock();
- }
- } catch (InterruptedException e) {
- log.error("putMessage exception", e);
- }
- }
-
- return lockedMqs;
- }
-
- public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
- try {
- this.lock.lockInterruptibly();
- try {
- ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
- if (null != groupValue) {
- for (MessageQueue mq : mqs) {
- LockEntry lockEntry = groupValue.get(mq);
- if (null != lockEntry) {
- if (lockEntry.getClientId().equals(clientId)) {
- groupValue.remove(mq);
- log.info("unlockBatch, Group: {} {} {}",
- group,
- mq,
- clientId);
- } else {
- log.warn("unlockBatch, but mq locked by other client: {}, Group: {} {} {}",
- lockEntry.getClientId(),
- group,
- mq,
- clientId);
- }
- } else {
- log.warn("unlockBatch, but mq not locked, Group: {} {} {}",
- group,
- mq,
- clientId);
- }
- }
- } else {
- log.warn("unlockBatch, group not exist, Group: {} {}",
- group,
- clientId);
- }
- } finally {
- this.lock.unlock();
- }
- } catch (InterruptedException e) {
- log.error("putMessage exception", e);
- }
- }
-
- static class LockEntry {
- private String clientId;
- private volatile long lastUpdateTimestamp = System.currentTimeMillis();
-
-
- public String getClientId() {
- return clientId;
- }
-
-
- public void setClientId(String clientId) {
- this.clientId = clientId;
- }
-
-
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
-
-
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
-
- public boolean isLocked(final String clientId) {
- boolean eq = this.clientId.equals(clientId);
- return eq && !this.isExpired();
- }
-
- public boolean isExpired() {
- boolean expired =
- (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
-
- return expired;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
deleted file mode 100644
index b2e7e82..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerManager.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.filtersrv;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.broker.BrokerStartup;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.common.RemotingUtil;
-import io.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-public class FilterServerManager {
-
- public static final long FILTER_SERVER_MAX_IDLE_TIME_MILLS = 30000;
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ConcurrentHashMap<Channel, FilterServerInfo> filterServerTable =
- new ConcurrentHashMap<Channel, FilterServerInfo>(16);
- private final BrokerController brokerController;
-
- private ScheduledExecutorService scheduledExecutorService = Executors
- .newSingleThreadScheduledExecutor(new ThreadFactoryImpl("FilterServerManagerScheduledThread"));
-
- public FilterServerManager(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- public void start() {
-
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- try {
- FilterServerManager.this.createFilterServer();
- } catch (Exception e) {
- log.error("", e);
- }
- }
- }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
- }
-
- public void createFilterServer() {
- int more =
- this.brokerController.getBrokerConfig().getFilterServerNums() - this.filterServerTable.size();
- String cmd = this.buildStartCommand();
- for (int i = 0; i < more; i++) {
- FilterServerUtil.callShell(cmd, log);
- }
- }
-
- private String buildStartCommand() {
- String config = "";
- if (BrokerStartup.configFile != null) {
- config = String.format("-c %s", BrokerStartup.configFile);
- }
-
- if (this.brokerController.getBrokerConfig().getNamesrvAddr() != null) {
- config += String.format(" -n %s", this.brokerController.getBrokerConfig().getNamesrvAddr());
- }
-
- if (RemotingUtil.isWindowsPlatform()) {
- return String.format("start /b %s\\bin\\mqfiltersrv.exe %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
- } else {
- return String.format("sh %s/bin/startfsrv.sh %s",
- this.brokerController.getBrokerConfig().getRocketmqHome(),
- config);
- }
- }
-
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- }
-
- public void registerFilterServer(final Channel channel, final String filterServerAddr) {
- FilterServerInfo filterServerInfo = this.filterServerTable.get(channel);
- if (filterServerInfo != null) {
- filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
- } else {
- filterServerInfo = new FilterServerInfo();
- filterServerInfo.setFilterServerAddr(filterServerAddr);
- filterServerInfo.setLastUpdateTimestamp(System.currentTimeMillis());
- this.filterServerTable.put(channel, filterServerInfo);
- log.info("Receive a New Filter Server<{}>", filterServerAddr);
- }
- }
-
- /**
-
- */
- public void scanNotActiveChannel() {
-
- Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, FilterServerInfo> next = it.next();
- long timestamp = next.getValue().getLastUpdateTimestamp();
- Channel channel = next.getKey();
- if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {
- log.info("The Filter Server<{}> expired, remove it", next.getKey());
- it.remove();
- RemotingUtil.closeChannel(channel);
- }
- }
- }
-
- public void doChannelCloseEvent(final String remoteAddr, final Channel channel) {
- FilterServerInfo old = this.filterServerTable.remove(channel);
- if (old != null) {
- log.warn("The Filter Server<{}> connection<{}> closed, remove it", old.getFilterServerAddr(),
- remoteAddr);
- }
- }
-
- public List<String> buildNewFilterServerList() {
- List<String> addr = new ArrayList<String>();
- Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();
- while (it.hasNext()) {
- Entry<Channel, FilterServerInfo> next = it.next();
- addr.add(next.getValue().getFilterServerAddr());
- }
- return addr;
- }
-
- static class FilterServerInfo {
- private String filterServerAddr;
- private long lastUpdateTimestamp;
-
-
- public String getFilterServerAddr() {
- return filterServerAddr;
- }
-
-
- public void setFilterServerAddr(String filterServerAddr) {
- this.filterServerAddr = filterServerAddr;
- }
-
-
- public long getLastUpdateTimestamp() {
- return lastUpdateTimestamp;
- }
-
-
- public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
- this.lastUpdateTimestamp = lastUpdateTimestamp;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
deleted file mode 100644
index c5ace19..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/filtersrv/FilterServerUtil.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.filtersrv;
-
-import org.slf4j.Logger;
-
-
-public class FilterServerUtil {
- public static void callShell(final String shellString, final Logger log) {
- Process process = null;
- try {
- String[] cmdArray = splitShellString(shellString);
- process = Runtime.getRuntime().exec(cmdArray);
- process.waitFor();
- log.info("callShell: <{}> OK", shellString);
- } catch (Throwable e) {
- log.error("callShell: readLine IOException, " + shellString, e);
- } finally {
- if (null != process)
- process.destroy();
- }
- }
-
- private static String[] splitShellString(final String shellString) {
- String[] split = shellString.split(" ");
- return split;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
deleted file mode 100644
index 586bed0..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFastFailure.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.latency;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.ThreadFactoryImpl;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.remoting.netty.RequestTask;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSysResponseCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * @author shijia.wxr
- */
-public class BrokerFastFailure {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
- "BrokerFastFailureScheduledThread"));
- private final BrokerController brokerController;
-
- public BrokerFastFailure(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- public void start() {
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- cleanExpiredRequest();
- }
- }, 1000, 10, TimeUnit.MILLISECONDS);
- }
-
- private void cleanExpiredRequest() {
- while (this.brokerController.getMessageStore().isOSPageCacheBusy()) {
- try {
- if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
- final Runnable runnable = this.brokerController.getSendThreadPoolQueue().poll(0, TimeUnit.SECONDS);
- if (null == runnable) {
- break;
- }
-
- final RequestTask rt = castRunnable(runnable);
- rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[PCBUSY_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", System.currentTimeMillis() - rt.getCreateTimestamp(), this.brokerController.getSendThreadPoolQueue().size()));
- } else {
- break;
- }
- } catch (Throwable e) {
- }
- }
-
- while (true) {
- try {
- if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
- final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
- if (null == runnable) {
- break;
- }
- final RequestTask rt = castRunnable(runnable);
- if (rt.isStopRun()) {
- break;
- }
-
- final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
- if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
- if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
- rt.setStopRun(true);
- rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
- }
- } else {
- break;
- }
- } else {
- break;
- }
- } catch (Throwable e) {
- }
- }
- }
-
- public static RequestTask castRunnable(final Runnable runnable) {
- try {
- FutureTaskExt object = (FutureTaskExt) runnable;
- return (RequestTask) object.getRunnable();
- } catch (Throwable e) {
- log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
- }
-
- return null;
- }
-
- public void shutdown() {
- this.scheduledExecutorService.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
deleted file mode 100644
index f81d48a..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/BrokerFixedThreadPoolExecutor.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.latency;
-
-import java.util.concurrent.*;
-
-/**
- * @author shijia.wxr
- */
-public class BrokerFixedThreadPoolExecutor extends ThreadPoolExecutor {
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
- }
-
- public BrokerFixedThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit, final BlockingQueue<Runnable> workQueue, final ThreadFactory threadFactory, final RejectedExecutionHandler handler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
- }
-
- @Override
- protected <T> RunnableFuture<T> newTaskFor(final Runnable runnable, final T value) {
- return new FutureTaskExt<T>(runnable, value);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java b/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
deleted file mode 100644
index 6ec7bb5..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/latency/FutureTaskExt.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.latency;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-/**
- * @author shijia.wxr
- */
-public class FutureTaskExt<V> extends FutureTask<V> {
- private final Runnable runnable;
-
- public FutureTaskExt(final Callable<V> callable) {
- super(callable);
- this.runnable = null;
- }
-
- public FutureTaskExt(final Runnable runnable, final V result) {
- super(runnable, result);
- this.runnable = runnable;
- }
-
- public Runnable getRunnable() {
- return runnable;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
deleted file mode 100644
index bc9c58d..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/ManyPullRequest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.longpolling;
-
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * @author shijia.wxr
- */
-public class ManyPullRequest {
- private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>();
-
-
- public synchronized void addPullRequest(final PullRequest pullRequest) {
- this.pullRequestList.add(pullRequest);
- }
-
-
- public synchronized void addPullRequest(final List<PullRequest> many) {
- this.pullRequestList.addAll(many);
- }
-
-
- public synchronized List<PullRequest> cloneListAndClear() {
- if (!this.pullRequestList.isEmpty()) {
- List<PullRequest> result = (ArrayList<PullRequest>) this.pullRequestList.clone();
- this.pullRequestList.clear();
- return result;
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
deleted file mode 100644
index 15ee050..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.rocketmq.broker.longpolling;
-
-import com.alibaba.rocketmq.store.MessageArrivingListener;
-
-
-public class NotifyMessageArrivingListener implements MessageArrivingListener {
- private final PullRequestHoldService pullRequestHoldService;
-
-
- public NotifyMessageArrivingListener(final PullRequestHoldService pullRequestHoldService) {
- this.pullRequestHoldService = pullRequestHoldService;
- }
-
-
- @Override
- public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
- this.pullRequestHoldService.notifyMessageArriving(topic, queueId, logicOffset, tagsCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
deleted file mode 100644
index b4f1e11..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.longpolling;
-
-import com.alibaba.rocketmq.common.protocol.heartbeat.SubscriptionData;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import io.netty.channel.Channel;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullRequest {
- private final RemotingCommand requestCommand;
- private final Channel clientChannel;
- private final long timeoutMillis;
- private final long suspendTimestamp;
- private final long pullFromThisOffset;
- private final SubscriptionData subscriptionData;
-
-
- public PullRequest(RemotingCommand requestCommand, Channel clientChannel, long timeoutMillis, long suspendTimestamp,
- long pullFromThisOffset, SubscriptionData subscriptionData) {
- this.requestCommand = requestCommand;
- this.clientChannel = clientChannel;
- this.timeoutMillis = timeoutMillis;
- this.suspendTimestamp = suspendTimestamp;
- this.pullFromThisOffset = pullFromThisOffset;
- this.subscriptionData = subscriptionData;
- }
-
-
- public RemotingCommand getRequestCommand() {
- return requestCommand;
- }
-
-
- public Channel getClientChannel() {
- return clientChannel;
- }
-
-
- public long getTimeoutMillis() {
- return timeoutMillis;
- }
-
-
- public long getSuspendTimestamp() {
- return suspendTimestamp;
- }
-
-
- public long getPullFromThisOffset() {
- return pullFromThisOffset;
- }
-
- public SubscriptionData getSubscriptionData() {
- return subscriptionData;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
deleted file mode 100644
index 888c5f2..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.longpolling;
-
-import com.alibaba.rocketmq.broker.BrokerController;
-import com.alibaba.rocketmq.common.ServiceThread;
-import com.alibaba.rocketmq.common.SystemClock;
-import com.alibaba.rocketmq.common.constant.LoggerName;
-import com.alibaba.rocketmq.store.DefaultMessageFilter;
-import com.alibaba.rocketmq.store.MessageFilter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * @author shijia.wxr
- */
-public class PullRequestHoldService extends ServiceThread {
- private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private static final String TOPIC_QUEUEID_SEPARATOR = "@";
- private final BrokerController brokerController;
- private final SystemClock systemClock = new SystemClock();
- private final MessageFilter messageFilter = new DefaultMessageFilter();
- private ConcurrentHashMap<String/* topic@queueId */, ManyPullRequest> pullRequestTable =
- new ConcurrentHashMap<String, ManyPullRequest>(1024);
-
-
- public PullRequestHoldService(final BrokerController brokerController) {
- this.brokerController = brokerController;
- }
-
- public void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
- String key = this.buildKey(topic, queueId);
- ManyPullRequest mpr = this.pullRequestTable.get(key);
- if (null == mpr) {
- mpr = new ManyPullRequest();
- ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
- if (prev != null) {
- mpr = prev;
- }
- }
-
- mpr.addPullRequest(pullRequest);
- }
-
- private String buildKey(final String topic, final int queueId) {
- StringBuilder sb = new StringBuilder();
- sb.append(topic);
- sb.append(TOPIC_QUEUEID_SEPARATOR);
- sb.append(queueId);
- return sb.toString();
- }
-
- @Override
- public void run() {
- log.info(this.getServiceName() + " service started");
- while (!this.isStopped()) {
- try {
- if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
- this.waitForRunning(5 * 1000);
- } else {
- this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
- }
-
- long beginLockTimestamp = this.systemClock.now();
- this.checkHoldRequest();
- long costTime = this.systemClock.now() - beginLockTimestamp;
- if (costTime > 5 * 1000) {
- log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
- }
- } catch (Throwable e) {
- log.warn(this.getServiceName() + " service has exception. ", e);
- }
- }
-
- log.info(this.getServiceName() + " service end");
- }
-
- @Override
- public String getServiceName() {
- return PullRequestHoldService.class.getSimpleName();
- }
-
- private void checkHoldRequest() {
- for (String key : this.pullRequestTable.keySet()) {
- String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR);
- if (kArray != null && 2 == kArray.length) {
- String topic = kArray[0];
- int queueId = Integer.parseInt(kArray[1]);
- final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
- try {
- this.notifyMessageArriving(topic, queueId, offset);
- } catch (Throwable e) {
- log.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);
- }
- }
- }
- }
-
- public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset) {
- notifyMessageArriving(topic, queueId, maxOffset, null);
- }
-
- public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
- String key = this.buildKey(topic, queueId);
- ManyPullRequest mpr = this.pullRequestTable.get(key);
- if (mpr != null) {
- List<PullRequest> requestList = mpr.cloneListAndClear();
- if (requestList != null) {
- List<PullRequest> replayList = new ArrayList<PullRequest>();
-
- for (PullRequest request : requestList) {
- long newestOffset = maxOffset;
- if (newestOffset <= request.getPullFromThisOffset()) {
- newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
- }
-
- Long tmp = tagsCode;
- if (newestOffset > request.getPullFromThisOffset()) {
- if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
- try {
- this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
- } catch (Throwable e) {
- log.error("execute request when wakeup failed.", e);
- }
- continue;
- }
- }
-
- if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
- try {
- this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
- request.getRequestCommand());
- } catch (Throwable e) {
- log.error("execute request when wakeup failed.", e);
- }
- continue;
- }
-
-
- replayList.add(request);
- }
-
- if (!replayList.isEmpty()) {
- mpr.addPullRequest(replayList);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
deleted file mode 100644
index b7f9c6e..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageContext.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-
-import java.util.Map;
-
-
-public class ConsumeMessageContext {
- private String consumerGroup;
- private String topic;
- private Integer queueId;
- private String clientHost;
- private String storeHost;
- private Map<String, Long> messageIds;
- private int bodyLength;
- private boolean success;
- private String status;
- private Object mqTraceContext;
-
- private String commercialOwner;
- private BrokerStatsManager.StatsType commercialRcvStats;
- private int commercialRcvTimes;
- private int commercialRcvSize;
-
-
- public String getConsumerGroup() {
- return consumerGroup;
- }
-
-
- public void setConsumerGroup(String consumerGroup) {
- this.consumerGroup = consumerGroup;
- }
-
-
- public String getTopic() {
- return topic;
- }
-
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-
- public Integer getQueueId() {
- return queueId;
- }
-
-
- public void setQueueId(Integer queueId) {
- this.queueId = queueId;
- }
-
-
- public String getClientHost() {
- return clientHost;
- }
-
-
- public void setClientHost(String clientHost) {
- this.clientHost = clientHost;
- }
-
-
- public String getStoreHost() {
- return storeHost;
- }
-
-
- public void setStoreHost(String storeHost) {
- this.storeHost = storeHost;
- }
-
-
- public Map<String, Long> getMessageIds() {
- return messageIds;
- }
-
-
- public void setMessageIds(Map<String, Long> messageIds) {
- this.messageIds = messageIds;
- }
-
-
- public boolean isSuccess() {
- return success;
- }
-
-
- public void setSuccess(boolean success) {
- this.success = success;
- }
-
-
- public String getStatus() {
- return status;
- }
-
-
- public void setStatus(String status) {
- this.status = status;
- }
-
-
- public Object getMqTraceContext() {
- return mqTraceContext;
- }
-
-
- public void setMqTraceContext(Object mqTraceContext) {
- this.mqTraceContext = mqTraceContext;
- }
-
-
- public int getBodyLength() {
- return bodyLength;
- }
-
-
- public void setBodyLength(int bodyLength) {
- this.bodyLength = bodyLength;
- }
-
- public String getCommercialOwner() {
- return commercialOwner;
- }
-
- public void setCommercialOwner(final String commercialOwner) {
- this.commercialOwner = commercialOwner;
- }
-
- public BrokerStatsManager.StatsType getCommercialRcvStats() {
- return commercialRcvStats;
- }
-
- public void setCommercialRcvStats(final BrokerStatsManager.StatsType commercialRcvStats) {
- this.commercialRcvStats = commercialRcvStats;
- }
-
- public int getCommercialRcvTimes() {
- return commercialRcvTimes;
- }
-
- public void setCommercialRcvTimes(final int commercialRcvTimes) {
- this.commercialRcvTimes = commercialRcvTimes;
- }
-
- public int getCommercialRcvSize() {
- return commercialRcvSize;
- }
-
- public void setCommercialRcvSize(final int commercialRcvSize) {
- this.commercialRcvSize = commercialRcvSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
deleted file mode 100644
index 4a74db3..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/ConsumeMessageHook.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-public interface ConsumeMessageHook {
- String hookName();
-
-
- void consumeMessageBefore(final ConsumeMessageContext context);
-
-
- void consumeMessageAfter(final ConsumeMessageContext context);
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
deleted file mode 100644
index 5bd29cf..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageContext.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-import com.alibaba.rocketmq.common.message.MessageType;
-import com.alibaba.rocketmq.store.stats.BrokerStatsManager;
-
-import java.util.Properties;
-
-
-public class SendMessageContext {
- private String producerGroup;
- private String topic;
- private String msgId;
- private String originMsgId;
- private Integer queueId;
- private Long queueOffset;
- private String brokerAddr;
- private String bornHost;
- private int bodyLength;
- private int code;
- private String errorMsg;
- private String msgProps;
- private Object mqTraceContext;
- private Properties extProps;
- private String brokerRegionId;
- private String msgUniqueKey;
- private long bornTimeStamp;
- private MessageType msgType = MessageType.Trans_msg_Commit;
- private boolean isSuccess = false;
- //For Commercial
- private String commercialOwner;
- private BrokerStatsManager.StatsType commercialSendStats;
- private int commercialSendSize;
- private int commercialSendTimes;
-
- public boolean isSuccess() {
- return isSuccess;
- }
-
- public void setSuccess(final boolean success) {
- isSuccess = success;
- }
-
- public MessageType getMsgType() {
- return msgType;
- }
-
- public void setMsgType(final MessageType msgType) {
- this.msgType = msgType;
- }
-
- public String getMsgUniqueKey() {
- return msgUniqueKey;
- }
-
- public void setMsgUniqueKey(final String msgUniqueKey) {
- this.msgUniqueKey = msgUniqueKey;
- }
-
- public long getBornTimeStamp() {
- return bornTimeStamp;
- }
-
- public void setBornTimeStamp(final long bornTimeStamp) {
- this.bornTimeStamp = bornTimeStamp;
- }
-
- public String getBrokerRegionId() {
- return brokerRegionId;
- }
-
- public void setBrokerRegionId(final String brokerRegionId) {
- this.brokerRegionId = brokerRegionId;
- }
-
- public String getProducerGroup() {
- return producerGroup;
- }
-
-
- public void setProducerGroup(String producerGroup) {
- this.producerGroup = producerGroup;
- }
-
-
- public String getTopic() {
- return topic;
- }
-
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
-
- public String getMsgId() {
- return msgId;
- }
-
-
- public void setMsgId(String msgId) {
- this.msgId = msgId;
- }
-
-
- public String getOriginMsgId() {
- return originMsgId;
- }
-
-
- public void setOriginMsgId(String originMsgId) {
- this.originMsgId = originMsgId;
- }
-
-
- public Integer getQueueId() {
- return queueId;
- }
-
-
- public void setQueueId(Integer queueId) {
- this.queueId = queueId;
- }
-
-
- public Long getQueueOffset() {
- return queueOffset;
- }
-
-
- public void setQueueOffset(Long queueOffset) {
- this.queueOffset = queueOffset;
- }
-
-
- public String getBrokerAddr() {
- return brokerAddr;
- }
-
-
- public void setBrokerAddr(String brokerAddr) {
- this.brokerAddr = brokerAddr;
- }
-
-
- public String getBornHost() {
- return bornHost;
- }
-
-
- public void setBornHost(String bornHost) {
- this.bornHost = bornHost;
- }
-
-
- public int getBodyLength() {
- return bodyLength;
- }
-
-
- public void setBodyLength(int bodyLength) {
- this.bodyLength = bodyLength;
- }
-
-
- public int getCode() {
- return code;
- }
-
-
- public void setCode(int code) {
- this.code = code;
- }
-
-
- public String getErrorMsg() {
- return errorMsg;
- }
-
-
- public void setErrorMsg(String errorMsg) {
- this.errorMsg = errorMsg;
- }
-
-
- public String getMsgProps() {
- return msgProps;
- }
-
-
- public void setMsgProps(String msgProps) {
- this.msgProps = msgProps;
- }
-
-
- public Object getMqTraceContext() {
- return mqTraceContext;
- }
-
-
- public void setMqTraceContext(Object mqTraceContext) {
- this.mqTraceContext = mqTraceContext;
- }
-
-
- public Properties getExtProps() {
- return extProps;
- }
-
-
- public void setExtProps(Properties extProps) {
- this.extProps = extProps;
- }
-
- public String getCommercialOwner() {
- return commercialOwner;
- }
-
- public void setCommercialOwner(final String commercialOwner) {
- this.commercialOwner = commercialOwner;
- }
-
- public BrokerStatsManager.StatsType getCommercialSendStats() {
- return commercialSendStats;
- }
-
- public void setCommercialSendStats(final BrokerStatsManager.StatsType commercialSendStats) {
- this.commercialSendStats = commercialSendStats;
- }
-
- public int getCommercialSendSize() {
- return commercialSendSize;
- }
-
- public void setCommercialSendSize(final int commercialSendSize) {
- this.commercialSendSize = commercialSendSize;
- }
-
- public int getCommercialSendTimes() {
- return commercialSendTimes;
- }
-
- public void setCommercialSendTimes(final int commercialSendTimes) {
- this.commercialSendTimes = commercialSendTimes;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
----------------------------------------------------------------------
diff --git a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java b/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
deleted file mode 100644
index e079b9f..0000000
--- a/broker/src/main/java/com/alibaba/rocketmq/broker/mqtrace/SendMessageHook.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.broker.mqtrace;
-
-public interface SendMessageHook {
- public String hookName();
-
-
- public void sendMessageBefore(final SendMessageContext context);
-
-
- public void sendMessageAfter(final SendMessageContext context);
-}