You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:44:00 UTC
[rocketmq] 17/17: [Assignment] Fix the risk of memory overflow caused by excessive popShareQueueNum.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 0b4adb49b48b5addc0eafc4ef7412f9f72a9fbfa
Author: zhangyang21 <zh...@xiaomi.com>
AuthorDate: Mon Nov 22 18:31:13 2021 +0800
[Assignment] Fix the risk of memory overflow caused by excessive popShareQueueNum.
Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
.../broker/processor/QueryAssignmentProcessor.java | 65 +++++++++++--------
.../processor/QueryAssignmentProcessorTest.java | 75 ++++++++++++++++++++++
2 files changed, 113 insertions(+), 27 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index fdc320d..6fe9210 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -208,33 +208,8 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor {
}
if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) {
- if (setMessageRequestModeRequestBody.getPopShareQueueNum() <= 0) {
- //each client pop all messagequeue
- allocateResult = new ArrayList<>(mqAll.size());
- for (MessageQueue mq : mqAll) {
- //must create new MessageQueue in case of change cache in AssignmentManager
- MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
- allocateResult.add(newMq);
- }
-
- } else {
- if (cidAll.size() <= mqAll.size()) {
- //consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
- allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
- int index = cidAll.indexOf(clientId);
- if (index >= 0) {
- for (int i = 1; i <= setMessageRequestModeRequestBody.getPopShareQueueNum(); i++) {
- index++;
- index = index % cidAll.size();
- List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
- allocateResult.addAll(tmp);
- }
- }
- } else {
- //make sure each cid is assigned
- allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
- }
- }
+ allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll,
+ cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum());
} else {
allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
@@ -256,6 +231,42 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor {
return assignedQueueSet;
}
+ public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+ final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
+ int popShareQueueNum) {
+
+ List<MessageQueue> allocateResult;
+ if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
+ //each client pops all message queues
+ allocateResult = new ArrayList<>(mqAll.size());
+ for (MessageQueue mq : mqAll) {
+ //must create a new MessageQueue to avoid changing the cache in AssignmentManager
+ MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
+ allocateResult.add(newMq);
+ }
+
+ } else {
+ if (cidAll.size() <= mqAll.size()) {
+ //a consumer working in pop mode also shares the MessageQueues assigned to the N (N = popShareQueueNum) consumers following it in the cid list
+ allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
+ int index = cidAll.indexOf(clientId);
+ if (index >= 0) {
+ for (int i = 1; i <= popShareQueueNum; i++) {
+ index++;
+ index = index % cidAll.size();
+ List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
+ allocateResult.addAll(tmp);
+ }
+ }
+ } else {
+ //make sure each cid is assigned
+ allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
+ }
+ }
+
+ return allocateResult;
+ }
+
private List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (currentCID == null || currentCID.length() < 1) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
index 681fcc3..b16533b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
@@ -19,9 +19,15 @@ package org.apache.rocketmq.broker.processor;
import com.google.common.collect.ImmutableSet;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueConsistentHash;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -40,6 +46,7 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -67,6 +74,7 @@ public class QueryAssignmentProcessorTest {
@Mock
private Channel channel;
+ private String broker = "defaultBroker";
private String topic = "FooBar";
private String group = "FooBarGroup";
private String clientId = "127.0.0.1";
@@ -118,6 +126,73 @@ public class QueryAssignmentProcessorTest {
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
}
+
+ @Test
+ public void testAllocate4Pop() {
+ testAllocate4Pop(new AllocateMessageQueueAveragely());
+ testAllocate4Pop(new AllocateMessageQueueAveragelyByCircle());
+ testAllocate4Pop(new AllocateMessageQueueConsistentHash());
+ }
+
+ private void testAllocate4Pop(AllocateMessageQueueStrategy strategy) {
+ int testNum = 16;
+ List<MessageQueue> mqAll = new ArrayList<>();
+ for (int mqSize = 0; mqSize < testNum; mqSize++) {
+ mqAll.add(new MessageQueue(topic, broker, mqSize));
+
+ List<String> cidAll = new ArrayList<>();
+ for (int cidSize = 0; cidSize < testNum; cidSize++) {
+ String clientId = String.valueOf(cidSize);
+ cidAll.add(clientId);
+
+ for (int popShareQueueNum = 0; popShareQueueNum < testNum; popShareQueueNum++) {
+ List<MessageQueue> allocateResult =
+ queryAssignmentProcessor.allocate4Pop(strategy, group, clientId, mqAll, cidAll, popShareQueueNum);
+ Assert.assertTrue(checkAllocateResult(popShareQueueNum, mqAll.size(), cidAll.size(), allocateResult.size(), strategy));
+ }
+ }
+ }
+ }
+
+ private boolean checkAllocateResult(int popShareQueueNum, int mqSize, int cidSize, int allocateSize,
+ AllocateMessageQueueStrategy strategy) {
+
+ //The maximum size of allocations will not exceed mqSize.
+ if (allocateSize > mqSize) {
+ return false;
+ }
+
+ //Every client must be assigned at least one queue.
+ if (allocateSize <= 0) {
+ return false;
+ }
+
+ if (popShareQueueNum <= 0 || popShareQueueNum >= cidSize - 1) {
+ return allocateSize == mqSize;
+ } else if (mqSize < cidSize) {
+ return allocateSize == 1;
+ }
+
+ if (strategy instanceof AllocateMessageQueueAveragely
+ || strategy instanceof AllocateMessageQueueAveragelyByCircle) {
+
+ if (mqSize % cidSize == 0) {
+ return allocateSize == (mqSize / cidSize) * (popShareQueueNum + 1);
+ } else {
+ int avgSize = mqSize / cidSize;
+ return allocateSize >= avgSize * (popShareQueueNum + 1)
+ && allocateSize <= (avgSize + 1) * (popShareQueueNum + 1);
+ }
+ }
+
+ if (strategy instanceof AllocateMessageQueueConsistentHash) {
+ //Just skip
+ return true;
+ }
+
+ return false;
+ }
+
private RemotingCommand createQueryAssignmentRequest() {
QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
requestBody.setTopic(topic);