You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/12/07 09:26:21 UTC
[rocketmq] branch develop updated: [ISSUE #5589] record subscription from request (#5590)
This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7cab0a1aa [ISSUE #5589] record subscription from request (#5590)
7cab0a1aa is described below
commit 7cab0a1aadc6336e95e2152fab8d188d33a43b78
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Wed Dec 7 17:26:00 2022 +0800
[ISSUE #5589] record subscription from request (#5590)
---
.../apache/rocketmq/broker/BrokerController.java | 10 +--
.../rocketmq/broker/client/ConsumerGroupInfo.java | 4 ++
.../rocketmq/broker/client/ConsumerManager.java | 82 ++++++++++++++++++++--
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../broker/processor/PopMessageProcessor.java | 13 ++++
.../broker/processor/PullMessageProcessor.java | 18 ++++-
.../broker/client/ConsumerManagerScannerTest.java | 2 +-
.../broker/client/ConsumerManagerTest.java | 67 ++++++++++++++++--
.../org/apache/rocketmq/common/BrokerConfig.java | 19 +++++
.../apache/rocketmq/proxy/config/ProxyConfig.java | 12 +++-
.../proxy/service/ClusterServiceManager.java | 4 +-
.../rocketmq/remoting/protocol/RequestSource.java | 7 ++
.../remoting/protocol/heartbeat/ConsumeType.java | 4 +-
.../remoting/protocol/RequestSourceTest.java | 44 ++++++++++++
14 files changed, 264 insertions(+), 26 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 0a5df7cb0..b584e8769 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -315,7 +315,7 @@ public class BrokerController {
this.replyMessageProcessor = new ReplyMessageProcessor(this);
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
- this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager);
+ this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
this.producerManager = new ProducerManager(this.brokerStatsManager);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
@@ -932,18 +932,18 @@ public class BrokerController {
LOG.info("The broker dose not enable acl");
return;
}
-
+
List<AccessValidator> accessValidators = ServiceProvider.load(AccessValidator.class);
if (accessValidators.isEmpty()) {
LOG.info("ServiceProvider loaded no AccessValidator, using default org.apache.rocketmq.acl.plain.PlainAccessValidator");
accessValidators.add(new PlainAccessValidator());
}
-
+
for (AccessValidator accessValidator : accessValidators) {
final AccessValidator validator = accessValidator;
accessValidatorMap.put(validator.getClass(), validator);
this.registerServerRPCHook(new RPCHook() {
-
+
@Override
public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
//Do not catch the exception
@@ -959,7 +959,7 @@ public class BrokerController {
}
private void initialRpcHooks() {
-
+
List<RPCHook> rpcHooks = ServiceProvider.load(RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
index f75c369b6..867b9c720 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java
@@ -52,6 +52,10 @@ public class ConsumerGroupInfo {
this.consumeFromWhere = consumeFromWhere;
}
+ public ConsumerGroupInfo(String groupName) {
+ this.groupName = groupName;
+ }
+
public ClientChannelInfo findChannel(final String clientId) {
Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
while (it.hasNext()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 1201037b6..5f95ac1af 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -26,6 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -38,21 +40,28 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class ConsumerManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
- private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<>(1024);
+ private final ConcurrentMap<String, ConsumerGroupInfo> consumerCompensationTable =
+ new ConcurrentHashMap<>(1024);
private final List<ConsumerIdsChangeListener> consumerIdsChangeListenerList = new CopyOnWriteArrayList<>();
protected final BrokerStatsManager brokerStatsManager;
+ private final long channelExpiredTimeout;
+ private final long subscriptionExpiredTimeout;
- public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener) {
+ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, long expiredTimeout) {
this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
this.brokerStatsManager = null;
+ this.channelExpiredTimeout = expiredTimeout;
+ this.subscriptionExpiredTimeout = expiredTimeout;
}
public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener,
- final BrokerStatsManager brokerStatsManager) {
+ final BrokerStatsManager brokerStatsManager, BrokerConfig brokerConfig) {
this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
this.brokerStatsManager = brokerStatsManager;
+ this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout();
+ this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
}
public ClientChannelInfo findChannel(final String group, final String clientId) {
@@ -72,11 +81,25 @@ public class ConsumerManager {
}
public SubscriptionData findSubscriptionData(final String group, final String topic) {
- ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);
+ return findSubscriptionData(group, topic, true);
+ }
+
+ public SubscriptionData findSubscriptionData(final String group, final String topic,
+ boolean fromCompensationTable) {
+ ConsumerGroupInfo consumerGroupInfo = getConsumerGroupInfo(group, false);
if (consumerGroupInfo != null) {
- return consumerGroupInfo.findSubscriptionData(topic);
+ SubscriptionData subscriptionData = consumerGroupInfo.findSubscriptionData(topic);
+ if (subscriptionData != null) {
+ return subscriptionData;
+ }
}
+ if (fromCompensationTable) {
+ ConsumerGroupInfo consumerGroupCompensationInfo = consumerCompensationTable.get(group);
+ if (consumerGroupCompensationInfo != null) {
+ return consumerGroupCompensationInfo.findSubscriptionData(topic);
+ }
+ }
return null;
}
@@ -85,7 +108,15 @@ public class ConsumerManager {
}
public ConsumerGroupInfo getConsumerGroupInfo(final String group) {
- return this.consumerTable.get(group);
+ return getConsumerGroupInfo(group, false);
+ }
+
+ public ConsumerGroupInfo getConsumerGroupInfo(String group, boolean fromCompensationTable) {
+ ConsumerGroupInfo consumerGroupInfo = consumerTable.get(group);
+ if (consumerGroupInfo == null && fromCompensationTable) {
+ consumerGroupInfo = consumerCompensationTable.get(group);
+ }
+ return consumerGroupInfo;
}
public int findSubscriptionDataCount(final String group) {
@@ -121,6 +152,19 @@ public class ConsumerManager {
return removed;
}
+ // compensate consumer info for consumer without heartbeat
+ public void compensateBasicConsumerInfo(String group, ConsumeType consumeType, MessageModel messageModel) {
+ ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new);
+ consumerGroupInfo.setConsumeType(consumeType);
+ consumerGroupInfo.setMessageModel(messageModel);
+ }
+
+ // compensate subscription for pull consumer and consumer via proxy
+ public void compensateSubscribeData(String group, String topic, SubscriptionData subscriptionData) {
+ ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new);
+ consumerGroupInfo.getSubscriptionTable().put(topic, subscriptionData);
+ }
+
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {
@@ -185,6 +229,29 @@ public class ConsumerManager {
}
}
+ public void removeExpireConsumerGroupInfo() {
+ List<String> removeList = new ArrayList<>();
+ consumerCompensationTable.forEach((group, consumerGroupInfo) -> {
+ List<String> removeTopicList = new ArrayList<>();
+ ConcurrentMap<String, SubscriptionData> subscriptionTable = consumerGroupInfo.getSubscriptionTable();
+ subscriptionTable.forEach((topic, subscriptionData) -> {
+ long diff = System.currentTimeMillis() - subscriptionData.getSubVersion();
+ if (diff > subscriptionExpiredTimeout) {
+ removeTopicList.add(topic);
+ }
+ });
+ for (String topic : removeTopicList) {
+ subscriptionTable.remove(topic);
+ if (subscriptionTable.isEmpty()) {
+ removeList.add(group);
+ }
+ }
+ });
+ for (String group : removeList) {
+ consumerCompensationTable.remove(group);
+ }
+ }
+
public void scanNotActiveChannel() {
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
@@ -199,7 +266,7 @@ public class ConsumerManager {
Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();
ClientChannelInfo clientChannelInfo = nextChannel.getValue();
long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();
- if (diff > CHANNEL_EXPIRED_TIMEOUT) {
+ if (diff > channelExpiredTimeout) {
LOGGER.warn(
"SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",
RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);
@@ -216,6 +283,7 @@ public class ConsumerManager {
it.remove();
}
}
+ removeExpireConsumerGroupInfo();
}
public HashSet<String> queryTopicConsumeByWho(final String topic) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index ad86ab34a..12eab475b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -1836,8 +1836,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
QuerySubscriptionByConsumerRequestHeader requestHeader =
(QuerySubscriptionByConsumerRequestHeader) request.decodeCommandCustomHeader(QuerySubscriptionByConsumerRequestHeader.class);
- SubscriptionData subscriptionData =
- this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getGroup(), requestHeader.getTopic());
+ SubscriptionData subscriptionData = this.brokerController.getConsumerManager()
+ .findSubscriptionData(requestHeader.getGroup(), requestHeader.getTopic());
QuerySubscriptionResponseBody responseBody = new QuerySubscriptionResponseBody();
responseBody.setGroup(requestHeader.getGroup());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 393631e45..5bb81df5a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -70,6 +70,8 @@ import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageResponseHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
+import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.GetMessageResult;
@@ -262,6 +264,9 @@ public class PopMessageProcessor implements NettyRequestProcessor {
orderCountInfo = new StringBuilder(64);
}
+ brokerController.getConsumerManager().compensateBasicConsumerInfo(requestHeader.getConsumerGroup(),
+ ConsumeType.CONSUME_POP, MessageModel.CLUSTERING);
+
response.setOpaque(request.getOpaque());
if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -333,6 +338,14 @@ public class PopMessageProcessor implements NettyRequestProcessor {
if (requestHeader.getExp() != null && requestHeader.getExp().length() > 0) {
try {
SubscriptionData subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getExp(), requestHeader.getExpType());
+ brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(), subscriptionData);
+
+ String retryTopic = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+ SubscriptionData retrySubscriptionData = FilterAPI.build(retryTopic, SubscriptionData.SUB_ALL, requestHeader.getExpType());
+ brokerController.getConsumerManager().compensateSubscribeData(requestHeader.getConsumerGroup(),
+ requestHeader.getTopic(), retrySubscriptionData);
+
ConsumerFilterData consumerFilterData = null;
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index e1294c129..562c15275 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -23,14 +23,15 @@ import java.util.Objects;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.filter.ConsumerFilterData;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter;
import org.apache.rocketmq.broker.filter.ExpressionMessageFilter;
-import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.plugin.PullMessageResultHandler;
+import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -369,6 +370,19 @@ public class PullMessageProcessor implements NettyRequestProcessor {
return response;
}
+ ConsumerManager consumerManager = brokerController.getConsumerManager();
+ switch (RequestSource.parseInteger(requestHeader.getRequestSource())) {
+ case PROXY_FOR_BROADCAST:
+ consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.BROADCASTING);
+ break;
+ case PROXY_FOR_STREAM:
+ consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING);
+ break;
+ default:
+ consumerManager.compensateBasicConsumerInfo(requestHeader.getConsumerGroup(), ConsumeType.CONSUME_PASSIVELY, MessageModel.CLUSTERING);
+ break;
+ }
+
SubscriptionData subscriptionData = null;
ConsumerFilterData consumerFilterData = null;
if (hasSubscriptionFlag) {
@@ -376,6 +390,8 @@ public class PullMessageProcessor implements NettyRequestProcessor {
subscriptionData = FilterAPI.build(
requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType()
);
+ consumerManager.compensateSubscribeData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), subscriptionData);
+
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java
index 40059d579..d190c0dac 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerScannerTest.java
@@ -68,7 +68,7 @@ public class ConsumerManagerScannerTest {
public void shutdown() {
}
- });
+ }, 1000 * 120);
}
private static class ConsumerIdsChangeListenerData {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
index 620be39ec..8c9098243 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
@@ -63,7 +63,6 @@ public class ConsumerManagerTest {
private BrokerStatsManager brokerStatsManager;
-
private static final String GROUP = "DEFAULT_GROUP";
private static final String CLIENT_ID = "1";
@@ -77,16 +76,43 @@ public class ConsumerManagerTest {
clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID, LanguageCode.JAVA, VERSION);
defaultConsumerIdsChangeListener = new DefaultConsumerIdsChangeListener(brokerController);
brokerStatsManager = new BrokerStatsManager(brokerConfig);
- consumerManager = new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager);
+ consumerManager = new ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager, brokerConfig);
broker2Client = new Broker2Client(brokerController);
when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager);
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
when(brokerController.getBroker2Client()).thenReturn(broker2Client);
- register();
+ }
+
+ @Test
+ public void compensateBasicConsumerInfoTest() {
+ ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+ Assertions.assertThat(consumerGroupInfo).isNull();
+
+ consumerManager.compensateBasicConsumerInfo(GROUP, ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING);
+ consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+ Assertions.assertThat(consumerGroupInfo).isNotNull();
+ Assertions.assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
+ Assertions.assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
+ }
+
+ @Test
+ public void compensateSubscribeDataTest() {
+ ConsumerGroupInfo consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+ Assertions.assertThat(consumerGroupInfo).isNull();
+
+ consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
+ consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
+ Assertions.assertThat(consumerGroupInfo).isNotNull();
+ Assertions.assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
+ SubscriptionData subscriptionData = consumerGroupInfo.getSubscriptionTable().get(TOPIC);
+ Assertions.assertThat(subscriptionData).isNotNull();
+ Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+ Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
}
@Test
public void registerConsumerTest() {
+ register();
final Set<SubscriptionData> subList = new HashSet<>();
SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
subList.add(subscriptionData);
@@ -107,32 +133,50 @@ public class ConsumerManagerTest {
@Test
public void findChannelTest() {
-
+ register();
final ClientChannelInfo consumerManagerChannel = consumerManager.findChannel(GROUP, CLIENT_ID);
Assertions.assertThat(consumerManagerChannel).isNotNull();
}
@Test
public void findSubscriptionDataTest() {
+ register();
final SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC);
Assertions.assertThat(subscriptionData).isNotNull();
}
@Test
public void findSubscriptionDataCountTest() {
+ register();
final int count = consumerManager.findSubscriptionDataCount(GROUP);
assert count > 0;
}
+ @Test
+ public void findSubscriptionTest() {
+ SubscriptionData subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true);
+ Assertions.assertThat(subscriptionData).isNull();
+
+ consumerManager.compensateSubscribeData(GROUP, TOPIC, new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
+ subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, true);
+ Assertions.assertThat(subscriptionData).isNotNull();
+ Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+ Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
+
+ subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC, false);
+ Assertions.assertThat(subscriptionData).isNull();
+ }
+
@Test
public void scanNotActiveChannelTest() {
- clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() - 1000 * 200);
+ clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() - brokerConfig.getChannelExpiredTimeout() * 2);
consumerManager.scanNotActiveChannel();
- assert consumerManager.getConsumerTable().size() == 0;
+ Assertions.assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
}
@Test
public void queryTopicConsumeByWhoTest() {
+ register();
final HashSet<String> consumeGroup = consumerManager.queryTopicConsumeByWho(TOPIC);
assert consumeGroup.size() > 0;
}
@@ -152,4 +196,15 @@ public class ConsumerManagerTest {
MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
}
+ @Test
+ public void removeExpireConsumerGroupInfo() {
+ SubscriptionData subscriptionData = new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL);
+ subscriptionData.setSubVersion(System.currentTimeMillis() - brokerConfig.getSubscriptionExpiredTimeout() * 2);
+ consumerManager.compensateSubscribeData(GROUP, TOPIC, subscriptionData);
+ consumerManager.compensateSubscribeData(GROUP, TOPIC + "_1", new SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
+ consumerManager.removeExpireConsumerGroupInfo();
+ Assertions.assertThat(consumerManager.getConsumerGroupInfo(GROUP, true)).isNotNull();
+ Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull();
+ Assertions.assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull();
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 81531e3f1..1dee9101b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -387,6 +387,9 @@ public class BrokerConfig extends BrokerIdentity {
private boolean metricsInDelta = false;
+ private long channelExpiredTimeout = 1000 * 120;
+ private long subscriptionExpiredTimeout = 1000 * 60 * 10;
+
/**
* Estimate accumulation or not when subscription filter type is tag and is not SUB_ALL.
*/
@@ -1592,6 +1595,22 @@ public class BrokerConfig extends BrokerIdentity {
this.transactionOpBatchInterval = transactionOpBatchInterval;
}
+ public long getChannelExpiredTimeout() {
+ return channelExpiredTimeout;
+ }
+
+ public void setChannelExpiredTimeout(long channelExpiredTimeout) {
+ this.channelExpiredTimeout = channelExpiredTimeout;
+ }
+
+ public long getSubscriptionExpiredTimeout() {
+ return subscriptionExpiredTimeout;
+ }
+
+ public void setSubscriptionExpiredTimeout(long subscriptionExpiredTimeout) {
+ this.subscriptionExpiredTimeout = subscriptionExpiredTimeout;
+ }
+
public boolean isValidateSystemTopicWhenUpdateTopic() {
return validateSystemTopicWhenUpdateTopic;
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index d01458de0..6bb488984 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -31,9 +31,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.proxy.ProxyMode;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.proxy.ProxyMode;
public class ProxyConfig implements ConfigFile {
private final static Logger log = LoggerFactory.getLogger(LoggerName.PROXY_LOGGER_NAME);
@@ -193,6 +193,8 @@ public class ProxyConfig implements ConfigFile {
private boolean metricsInDelta = false;
+ private long channelExpiredTimeout = 1000 * 120;
+
@Override
public void initData() {
parseDelayLevel();
@@ -1038,4 +1040,12 @@ public class ProxyConfig implements ConfigFile {
public void setMetricsInDelta(boolean metricsInDelta) {
this.metricsInDelta = metricsInDelta;
}
+
+ public long getChannelExpiredTimeout() {
+ return channelExpiredTimeout;
+ }
+
+ public void setChannelExpiredTimeout(long channelExpiredTimeout) {
+ this.channelExpiredTimeout = channelExpiredTimeout;
+ }
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
index c68f77401..ac1ff6a88 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/ClusterServiceManager.java
@@ -64,11 +64,11 @@ public class ClusterServiceManager extends AbstractStartAndShutdown implements S
protected MQClientAPIFactory transactionClientAPIFactory;
public ClusterServiceManager(RPCHook rpcHook) {
+ ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
this.scheduledExecutorService = Executors.newScheduledThreadPool(3);
this.producerManager = new ProducerManager();
- this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl());
+ this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl(), proxyConfig.getChannelExpiredTimeout());
- ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
this.messagingClientAPIFactory = new MQClientAPIFactory(
"ClusterMQClient_",
proxyConfig.getRocketmqMQClientNum(),
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java
index 26c3ab402..5d8116013 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestSource.java
@@ -37,4 +37,11 @@ public enum RequestSource {
public static boolean isValid(Integer value) {
return null != value && value >= -1 && value < RequestSource.values().length - 1;
}
+
+ public static RequestSource parseInteger(Integer value) {
+ if (isValid(value)) {
+ return RequestSource.values()[value + 1];
+ }
+ return SDK;
+ }
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java
index 10f8da527..fbcca5d5e 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/heartbeat/ConsumeType.java
@@ -24,7 +24,9 @@ public enum ConsumeType {
CONSUME_ACTIVELY("PULL"),
- CONSUME_PASSIVELY("PUSH");
+ CONSUME_PASSIVELY("PUSH"),
+
+ CONSUME_POP("POP");
private String typeCN;
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestSourceTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestSourceTest.java
new file mode 100644
index 000000000..b2ed1e341
--- /dev/null
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/RequestSourceTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol;
+
+import junit.framework.TestCase;
+
+public class RequestSourceTest extends TestCase {
+
+ public void testIsValid() {
+ assertEquals(4, RequestSource.values().length);
+
+ assertTrue(RequestSource.isValid(-1));
+ assertTrue(RequestSource.isValid(0));
+ assertTrue(RequestSource.isValid(1));
+ assertTrue(RequestSource.isValid(2));
+
+ assertFalse(RequestSource.isValid(-2));
+ assertFalse(RequestSource.isValid(3));
+ }
+
+ public void testParseInteger() {
+ assertEquals(RequestSource.SDK, RequestSource.parseInteger(-1));
+ assertEquals(RequestSource.PROXY_FOR_ORDER, RequestSource.parseInteger(0));
+ assertEquals(RequestSource.PROXY_FOR_BROADCAST, RequestSource.parseInteger(1));
+ assertEquals(RequestSource.PROXY_FOR_STREAM, RequestSource.parseInteger(2));
+
+ assertEquals(RequestSource.SDK, RequestSource.parseInteger(-10));
+ assertEquals(RequestSource.SDK, RequestSource.parseInteger(10));
+ }
+}