You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/04/06 14:29:02 UTC

[rocketmq] branch develop updated: [ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)

This is an automated email from the ASF dual-hosted git repository.

yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 13765b040 [ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)
13765b040 is described below

commit 13765b040a69f81bde5d66e98c1104efde86819a
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Wed Apr 6 22:28:49 2022 +0800

    [ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)
    
    * [ISSUE #3940]Optimize AllocateMessageQueueStrategy
    
    * remove the check in AllocateMessageQueueByConfig
    
    Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
 ...a => AbstractAllocateMessageQueueStrategy.java} | 37 ++++------
 .../rebalance/AllocateMachineRoomNearby.java       | 20 +-----
 .../rebalance/AllocateMessageQueueAveragely.java   | 21 +-----
 .../AllocateMessageQueueAveragelyByCircle.java     | 21 +-----
 .../rebalance/AllocateMessageQueueByConfig.java    |  3 +-
 .../AllocateMessageQueueByMachineRoom.java         | 18 ++---
 .../AllocateMessageQueueConsistentHash.java        | 22 +-----
 .../AllocateMessageQueueByConfigTest.java          | 65 +++++++++++++++++
 .../AllocateMessageQueueByMachineRoomTest.java     | 82 ++++++++++++++++++++++
 9 files changed, 173 insertions(+), 116 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
similarity index 66%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
index fe78f0a6b..22ba5060c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
@@ -14,54 +14,41 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.client.consumer.rebalance;
 
-import java.util.ArrayList;
 import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+
+public abstract class AbstractAllocateMessageQueueStrategy implements AllocateMessageQueueStrategy {
 
-/**
- * Cycle average Hashing queue algorithm
- */
-public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
     private final InternalLogger log = ClientLogger.getLog();
 
-    @Override
-    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+    public boolean check(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
         List<String> cidAll) {
-        if (currentCID == null || currentCID.length() < 1) {
+        if (StringUtils.isEmpty(currentCID)) {
             throw new IllegalArgumentException("currentCID is empty");
         }
-        if (mqAll == null || mqAll.isEmpty()) {
+        if (CollectionUtils.isEmpty(mqAll)) {
             throw new IllegalArgumentException("mqAll is null or mqAll empty");
         }
-        if (cidAll == null || cidAll.isEmpty()) {
+        if (CollectionUtils.isEmpty(cidAll)) {
             throw new IllegalArgumentException("cidAll is null or cidAll empty");
         }
 
-        List<MessageQueue> result = new ArrayList<MessageQueue>();
         if (!cidAll.contains(currentCID)) {
             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                 consumerGroup,
                 currentCID,
                 cidAll);
-            return result;
-        }
-
-        int index = cidAll.indexOf(currentCID);
-        for (int i = index; i < mqAll.size(); i++) {
-            if (i % cidAll.size() == index) {
-                result.add(mqAll.get(i));
-            }
+            return false;
         }
-        return result;
-    }
 
-    @Override
-    public String getName() {
-        return "AVG_BY_CIRCLE";
+        return true;
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
index 8e9267459..415a5fa51 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
@@ -23,9 +23,7 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.InternalLogger;
 
 /**
  * An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
@@ -35,8 +33,7 @@ import org.apache.rocketmq.logging.InternalLogger;
  * should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
  * no alive consumer to monopolize them.
  */
-public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMachineRoomNearby extends AbstractAllocateMessageQueueStrategy {
 
     private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
     private final MachineRoomResolver machineRoomResolver;
@@ -58,22 +55,9 @@ public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
     @Override
     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
         List<String> cidAll) {
-        if (currentCID == null || currentCID.length() < 1) {
-            throw new IllegalArgumentException("currentCID is empty");
-        }
-        if (mqAll == null || mqAll.isEmpty()) {
-            throw new IllegalArgumentException("mqAll is null or mqAll empty");
-        }
-        if (cidAll == null || cidAll.isEmpty()) {
-            throw new IllegalArgumentException("cidAll is null or cidAll empty");
-        }
 
         List<MessageQueue> result = new ArrayList<MessageQueue>();
-        if (!cidAll.contains(currentCID)) {
-            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
-                consumerGroup,
-                currentCID,
-                cidAll);
+        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
             return result;
         }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
index 155e692ad..895f27757 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
@@ -18,36 +18,19 @@ package org.apache.rocketmq.client.consumer.rebalance;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * Average Hashing queue algorithm
  */
-public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
 
     @Override
     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
         List<String> cidAll) {
-        if (currentCID == null || currentCID.length() < 1) {
-            throw new IllegalArgumentException("currentCID is empty");
-        }
-        if (mqAll == null || mqAll.isEmpty()) {
-            throw new IllegalArgumentException("mqAll is null or mqAll empty");
-        }
-        if (cidAll == null || cidAll.isEmpty()) {
-            throw new IllegalArgumentException("cidAll is null or cidAll empty");
-        }
 
         List<MessageQueue> result = new ArrayList<MessageQueue>();
-        if (!cidAll.contains(currentCID)) {
-            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
-                consumerGroup,
-                currentCID,
-                cidAll);
+        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
             return result;
         }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
index fe78f0a6b..d23350074 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -18,36 +18,19 @@ package org.apache.rocketmq.client.consumer.rebalance;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * Cycle average Hashing queue algorithm
  */
-public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueAveragelyByCircle extends AbstractAllocateMessageQueueStrategy {
 
     @Override
     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
         List<String> cidAll) {
-        if (currentCID == null || currentCID.length() < 1) {
-            throw new IllegalArgumentException("currentCID is empty");
-        }
-        if (mqAll == null || mqAll.isEmpty()) {
-            throw new IllegalArgumentException("mqAll is null or mqAll empty");
-        }
-        if (cidAll == null || cidAll.isEmpty()) {
-            throw new IllegalArgumentException("cidAll is null or cidAll empty");
-        }
 
         List<MessageQueue> result = new ArrayList<MessageQueue>();
-        if (!cidAll.contains(currentCID)) {
-            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
-                consumerGroup,
-                currentCID,
-                cidAll);
+        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
             return result;
         }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
index e548803d0..5866e95dd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
@@ -17,10 +17,9 @@
 package org.apache.rocketmq.client.consumer.rebalance;
 
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
 
-public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
+public class AllocateMessageQueueByConfig extends AbstractAllocateMessageQueueStrategy {
     private List<MessageQueue> messageQueueList;
 
     @Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
index 42a0be1dd..289242f8d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -19,30 +19,22 @@ package org.apache.rocketmq.client.consumer.rebalance;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * Computer room Hashing queue algorithm, such as Alipay logic room
  */
-public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
+public class AllocateMessageQueueByMachineRoom extends AbstractAllocateMessageQueueStrategy {
     private Set<String> consumeridcs;
 
     @Override
     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
         List<String> cidAll) {
-        if (StringUtils.isBlank(currentCID)) {
-            throw new IllegalArgumentException("currentCID is empty");
-        }
-        if (CollectionUtils.isEmpty(mqAll)) {
-            throw new IllegalArgumentException("mqAll is null or mqAll empty");
-        }
-        if (CollectionUtils.isEmpty(cidAll)) {
-            throw new IllegalArgumentException("cidAll is null or cidAll empty");
-        }
+
         List<MessageQueue> result = new ArrayList<MessageQueue>();
+        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
+            return result;
+        }
         int currentIndex = cidAll.indexOf(currentCID);
         if (currentIndex < 0) {
             return result;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
index 65dcf7992..7dededa76 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -19,19 +19,15 @@ package org.apache.rocketmq.client.consumer.rebalance;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
 import org.apache.rocketmq.common.consistenthash.HashFunction;
 import org.apache.rocketmq.common.consistenthash.Node;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
  * Consistent Hashing queue algorithm
  */
-public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueConsistentHash extends AbstractAllocateMessageQueueStrategy {
 
     private final int virtualNodeCnt;
     private final HashFunction customHashFunction;
@@ -56,22 +52,8 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueS
     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
         List<String> cidAll) {
 
-        if (currentCID == null || currentCID.length() < 1) {
-            throw new IllegalArgumentException("currentCID is empty");
-        }
-        if (mqAll == null || mqAll.isEmpty()) {
-            throw new IllegalArgumentException("mqAll is null or mqAll empty");
-        }
-        if (cidAll == null || cidAll.isEmpty()) {
-            throw new IllegalArgumentException("cidAll is null or cidAll empty");
-        }
-
         List<MessageQueue> result = new ArrayList<MessageQueue>();
-        if (!cidAll.contains(currentCID)) {
-            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
-                consumerGroup,
-                currentCID,
-                cidAll);
+        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
             return result;
         }
 
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java
new file mode 100644
index 000000000..a1c925c3a
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+
+public class AllocateMessageQueueByConfigTest extends TestCase {
+
+    public void testAllocateMessageQueueByConfig() {
+        List<String> consumerIdList = createConsumerIdList(2);
+        List<MessageQueue> messageQueueList = createMessageQueueList(4);
+        AllocateMessageQueueByConfig allocateStrategy = new AllocateMessageQueueByConfig();
+        allocateStrategy.setMessageQueueList(messageQueueList);
+
+        Map<String, int[]> consumerAllocateQueue = new HashMap<String, int[]>(consumerIdList.size());
+        for (String consumerId : consumerIdList) {
+            List<MessageQueue> queues = allocateStrategy.allocate("", consumerId, messageQueueList, consumerIdList);
+            int[] queueIds = new int[queues.size()];
+            for (int i = 0; i < queues.size(); i++) {
+                queueIds[i] = queues.get(i).getQueueId();
+            }
+            consumerAllocateQueue.put(consumerId, queueIds);
+        }
+        Assert.assertArrayEquals(new int[] {0, 1, 2, 3}, consumerAllocateQueue.get("CID_PREFIX0"));
+        Assert.assertArrayEquals(new int[] {0, 1, 2, 3}, consumerAllocateQueue.get("CID_PREFIX1"));
+    }
+
+    private List<String> createConsumerIdList(int size) {
+        List<String> consumerIdList = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add("CID_PREFIX" + i);
+        }
+        return consumerIdList;
+    }
+
+    private List<MessageQueue> createMessageQueueList(int size) {
+        List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq = new MessageQueue("topic", "brokerName", i);
+            messageQueueList.add(mq);
+        }
+        return messageQueueList;
+    }
+}
+
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java
new file mode 100644
index 000000000..7b6dc6d7d
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import junit.framework.TestCase;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+
+public class AllocateMessageQueueByMachineRoomTest extends TestCase {
+
+    public void testAllocateMessageQueueByMachineRoom() {
+        List<String> consumerIdList = createConsumerIdList(2);
+        List<MessageQueue> messageQueueList = createMessageQueueList(10);
+        Set<String> consumeridcs = new HashSet<String>();
+        consumeridcs.add("room1");
+        AllocateMessageQueueByMachineRoom allocateStrategy = new AllocateMessageQueueByMachineRoom();
+        allocateStrategy.setConsumeridcs(consumeridcs);
+
+        // mqAll is null or mqAll empty
+        try {
+            allocateStrategy.allocate("", consumerIdList.get(0), new ArrayList<MessageQueue>(), consumerIdList);
+        } catch (Exception e) {
+            assert e instanceof IllegalArgumentException;
+            Assert.assertEquals("mqAll is null or mqAll empty", e.getMessage());
+        }
+
+        Map<String, int[]> consumerAllocateQueue = new HashMap<String, int[]>(consumerIdList.size());
+        for (String consumerId : consumerIdList) {
+            List<MessageQueue> queues = allocateStrategy.allocate("", consumerId, messageQueueList, consumerIdList);
+            int[] queueIds = new int[queues.size()];
+            for (int i = 0; i < queues.size(); i++) {
+                queueIds[i] = queues.get(i).getQueueId();
+            }
+            consumerAllocateQueue.put(consumerId, queueIds);
+        }
+        Assert.assertArrayEquals(new int[] {0, 1, 4}, consumerAllocateQueue.get("CID_PREFIX0"));
+        Assert.assertArrayEquals(new int[] {2, 3}, consumerAllocateQueue.get("CID_PREFIX1"));
+    }
+
+    private List<String> createConsumerIdList(int size) {
+        List<String> consumerIdList = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add("CID_PREFIX" + i);
+        }
+        return consumerIdList;
+    }
+
+    private List<MessageQueue> createMessageQueueList(int size) {
+        List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq;
+            if (i < size / 2) {
+                mq = new MessageQueue("topic", "room1@broker-a", i);
+            } else {
+                mq = new MessageQueue("topic", "room2@broker-b", i);
+            }
+            messageQueueList.add(mq);
+        }
+        return messageQueueList;
+    }
+}
+