You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/08 02:44:00 UTC

[rocketmq] 17/17: [Assignment] Fix the risk of memory overflow caused by excessive popShareQueueNum.

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 0b4adb49b48b5addc0eafc4ef7412f9f72a9fbfa
Author: zhangyang21 <zh...@xiaomi.com>
AuthorDate: Mon Nov 22 18:31:13 2021 +0800

    [Assignment] Fix the risk of memory overflow caused by excessive popShareQueueNum.
    
    Signed-off-by: zhangyang21 <zh...@xiaomi.com>
---
 .../broker/processor/QueryAssignmentProcessor.java | 65 +++++++++++--------
 .../processor/QueryAssignmentProcessorTest.java    | 75 ++++++++++++++++++++++
 2 files changed, 113 insertions(+), 27 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index fdc320d..6fe9210 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -208,33 +208,8 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor {
                     }
 
                     if (setMessageRequestModeRequestBody != null && setMessageRequestModeRequestBody.getMode() == MessageRequestMode.POP) {
-                        if (setMessageRequestModeRequestBody.getPopShareQueueNum() <= 0) {
-                            //each client pop all messagequeue
-                            allocateResult = new ArrayList<>(mqAll.size());
-                            for (MessageQueue mq : mqAll) {
-                                //must create new MessageQueue in case of change cache in AssignmentManager
-                                MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
-                                allocateResult.add(newMq);
-                            }
-
-                        } else {
-                            if (cidAll.size() <= mqAll.size()) {
-                                //consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
-                                allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
-                                int index = cidAll.indexOf(clientId);
-                                if (index >= 0) {
-                                    for (int i = 1; i <= setMessageRequestModeRequestBody.getPopShareQueueNum(); i++) {
-                                        index++;
-                                        index = index % cidAll.size();
-                                        List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
-                                        allocateResult.addAll(tmp);
-                                    }
-                                }
-                            } else {
-                                //make sure each cid is assigned
-                                allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
-                            }
-                        }
+                        allocateResult = allocate4Pop(allocateMessageQueueStrategy, consumerGroup, clientId, mqAll,
+                            cidAll, setMessageRequestModeRequestBody.getPopShareQueueNum());
 
                     } else {
                         allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
@@ -256,6 +231,42 @@ public class QueryAssignmentProcessor implements NettyRequestProcessor {
         return assignedQueueSet;
     }
 
+    public List<MessageQueue> allocate4Pop(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+        final String consumerGroup, final String clientId, List<MessageQueue> mqAll, List<String> cidAll,
+        int popShareQueueNum) {
+
+        List<MessageQueue> allocateResult;
+        if (popShareQueueNum <= 0 || popShareQueueNum >= cidAll.size() - 1) {
+            //each client pop all messagequeue
+            allocateResult = new ArrayList<>(mqAll.size());
+            for (MessageQueue mq : mqAll) {
+                //must create new MessageQueue in case of change cache in AssignmentManager
+                MessageQueue newMq = new MessageQueue(mq.getTopic(), mq.getBrokerName(), -1);
+                allocateResult.add(newMq);
+            }
+
+        } else {
+            if (cidAll.size() <= mqAll.size()) {
+                //consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
+                allocateResult = allocateMessageQueueStrategy.allocate(consumerGroup, clientId, mqAll, cidAll);
+                int index = cidAll.indexOf(clientId);
+                if (index >= 0) {
+                    for (int i = 1; i <= popShareQueueNum; i++) {
+                        index++;
+                        index = index % cidAll.size();
+                        List<MessageQueue> tmp = allocateMessageQueueStrategy.allocate(consumerGroup, cidAll.get(index), mqAll, cidAll);
+                        allocateResult.addAll(tmp);
+                    }
+                }
+            } else {
+                //make sure each cid is assigned
+                allocateResult = allocate(consumerGroup, clientId, mqAll, cidAll);
+            }
+        }
+
+        return allocateResult;
+    }
+
     private List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
         List<String> cidAll) {
         if (currentCID == null || currentCID.length() < 1) {
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
index 681fcc3..b16533b 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
@@ -19,9 +19,15 @@ package org.apache.rocketmq.broker.processor;
 import com.google.common.collect.ImmutableSet;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueConsistentHash;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -40,6 +46,7 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -67,6 +74,7 @@ public class QueryAssignmentProcessorTest {
     @Mock
     private Channel channel;
 
+    private String broker = "defaultBroker";
     private String topic = "FooBar";
     private String group = "FooBarGroup";
     private String clientId = "127.0.0.1";
@@ -118,6 +126,73 @@ public class QueryAssignmentProcessorTest {
         assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.NO_PERMISSION);
     }
 
+
+    @Test
+    public void testAllocate4Pop() {
+        testAllocate4Pop(new AllocateMessageQueueAveragely());
+        testAllocate4Pop(new AllocateMessageQueueAveragelyByCircle());
+        testAllocate4Pop(new AllocateMessageQueueConsistentHash());
+    }
+
+    private void testAllocate4Pop(AllocateMessageQueueStrategy strategy) {
+        int testNum = 16;
+        List<MessageQueue> mqAll = new ArrayList<>();
+        for (int mqSize = 0; mqSize < testNum; mqSize++) {
+            mqAll.add(new MessageQueue(topic, broker, mqSize));
+
+            List<String> cidAll = new ArrayList<>();
+            for (int cidSize = 0; cidSize < testNum; cidSize++) {
+                String clientId = String.valueOf(cidSize);
+                cidAll.add(clientId);
+
+                for (int popShareQueueNum = 0; popShareQueueNum < testNum; popShareQueueNum++) {
+                    List<MessageQueue> allocateResult =
+                        queryAssignmentProcessor.allocate4Pop(strategy, group, clientId, mqAll, cidAll, popShareQueueNum);
+                    Assert.assertTrue(checkAllocateResult(popShareQueueNum, mqAll.size(), cidAll.size(), allocateResult.size(), strategy));
+                }
+            }
+        }
+    }
+
+    private boolean checkAllocateResult(int popShareQueueNum, int mqSize, int cidSize, int allocateSize,
+        AllocateMessageQueueStrategy strategy) {
+
+        //The maximum size of allocations will not exceed mqSize.
+        if (allocateSize > mqSize) {
+            return false;
+        }
+
+        //It is not allowed that the client is not assigned to the consumeQueue.
+        if (allocateSize <= 0) {
+            return false;
+        }
+
+        if (popShareQueueNum <= 0 || popShareQueueNum >= cidSize - 1) {
+            return allocateSize == mqSize;
+        } else if (mqSize < cidSize) {
+            return allocateSize == 1;
+        }
+
+        if (strategy instanceof AllocateMessageQueueAveragely
+            || strategy instanceof AllocateMessageQueueAveragelyByCircle) {
+
+            if (mqSize % cidSize == 0) {
+                return allocateSize == (mqSize / cidSize) * (popShareQueueNum + 1);
+            } else {
+                int avgSize = mqSize / cidSize;
+                return allocateSize >= avgSize * (popShareQueueNum + 1)
+                    && allocateSize <= (avgSize + 1) * (popShareQueueNum + 1);
+            }
+        }
+
+        if (strategy instanceof AllocateMessageQueueConsistentHash) {
+            //Just skip
+            return true;
+        }
+
+        return false;
+    }
+
     private RemotingCommand createQueryAssignmentRequest() {
         QueryAssignmentRequestBody requestBody = new QueryAssignmentRequestBody();
         requestBody.setTopic(topic);