You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2022/04/06 14:29:02 UTC
[rocketmq] branch develop updated: [ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)
This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 13765b040 [ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)
13765b040 is described below
commit 13765b040a69f81bde5d66e98c1104efde86819a
Author: zhangjidi2016 <10...@qq.com>
AuthorDate: Wed Apr 6 22:28:49 2022 +0800
[ISSUE #3940]Optimize AllocateMessageQueueStrategy (#3941)
* [ISSUE #3940]Optimize AllocateMessageQueueStrategy
* remove the check in AllocateMessageQueueByConfig
Co-authored-by: zhangjidi <zh...@cmss.chinamobile.com>
---
...a => AbstractAllocateMessageQueueStrategy.java} | 37 ++++------
.../rebalance/AllocateMachineRoomNearby.java | 20 +-----
.../rebalance/AllocateMessageQueueAveragely.java | 21 +-----
.../AllocateMessageQueueAveragelyByCircle.java | 21 +-----
.../rebalance/AllocateMessageQueueByConfig.java | 3 +-
.../AllocateMessageQueueByMachineRoom.java | 18 ++---
.../AllocateMessageQueueConsistentHash.java | 22 +-----
.../AllocateMessageQueueByConfigTest.java | 65 +++++++++++++++++
.../AllocateMessageQueueByMachineRoomTest.java | 82 ++++++++++++++++++++++
9 files changed, 173 insertions(+), 116 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
similarity index 66%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
copy to client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
index fe78f0a6b..22ba5060c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AbstractAllocateMessageQueueStrategy.java
@@ -14,54 +14,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.client.consumer.rebalance;
-import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+/**
+ * Base class for AllocateMessageQueueStrategy implementations that hosts the argument
+ * validation shared by the concrete allocation strategies.
+ *
+ * check(consumerGroup, currentCID, mqAll, cidAll) throws IllegalArgumentException when
+ * currentCID is empty, or when mqAll or cidAll is null or empty. It logs and returns false
+ * when currentCID is not contained in cidAll, and returns true otherwise. consumerGroup is
+ * only used in the log message.
+ */
+public abstract class AbstractAllocateMessageQueueStrategy implements AllocateMessageQueueStrategy {
-/**
- * Cycle average Hashing queue algorithm
- */
-public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+ public boolean check(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
+ if (StringUtils.isEmpty(currentCID)) { // null or zero-length, same condition as the removed line
throw new IllegalArgumentException("currentCID is empty");
}
- if (mqAll == null || mqAll.isEmpty()) {
+ if (CollectionUtils.isEmpty(mqAll)) { // null or no elements
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
- if (cidAll == null || cidAll.isEmpty()) {
+ if (CollectionUtils.isEmpty(cidAll)) { // null or no elements
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
- List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
- return result;
- }
-
- int index = cidAll.indexOf(currentCID);
- for (int i = index; i < mqAll.size(); i++) {
- if (i % cidAll.size() == index) {
- result.add(mqAll.get(i));
- }
+ return false; // unknown consumer: the strategies in this patch then return an empty list
}
- return result;
- }
- @Override
- public String getName() {
- return "AVG_BY_CIRCLE";
+ return true; // inputs are non-empty and currentCID is a member of cidAll
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
index 8e9267459..415a5fa51 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
@@ -23,9 +23,7 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.logging.InternalLogger;
/**
* An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
@@ -35,8 +33,7 @@ import org.apache.rocketmq.logging.InternalLogger;
* should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
* no alive consumer to monopolize them.
*/
-public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMachineRoomNearby extends AbstractAllocateMessageQueueStrategy {
private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
private final MachineRoomResolver machineRoomResolver;
@@ -58,22 +55,9 @@ public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
index 155e692ad..895f27757 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
@@ -18,36 +18,19 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Average Hashing queue algorithm
*/
-public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
index fe78f0a6b..d23350074 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -18,36 +18,19 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Cycle average Hashing queue algorithm
*/
-public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueAveragelyByCircle extends AbstractAllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
index e548803d0..5866e95dd 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
@@ -17,10 +17,9 @@
package org.apache.rocketmq.client.consumer.rebalance;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
-public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
+public class AllocateMessageQueueByConfig extends AbstractAllocateMessageQueueStrategy {
private List<MessageQueue> messageQueueList;
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
index 42a0be1dd..289242f8d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -19,30 +19,22 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Computer room Hashing queue algorithm, such as Alipay logic room
*/
-public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
+public class AllocateMessageQueueByMachineRoom extends AbstractAllocateMessageQueueStrategy {
private Set<String> consumeridcs;
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (StringUtils.isBlank(currentCID)) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (CollectionUtils.isEmpty(mqAll)) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (CollectionUtils.isEmpty(cidAll)) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
+
List<MessageQueue> result = new ArrayList<MessageQueue>();
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
+ return result;
+ }
int currentIndex = cidAll.indexOf(currentCID);
if (currentIndex < 0) {
return result;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
index 65dcf7992..7dededa76 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -19,19 +19,15 @@ package org.apache.rocketmq.client.consumer.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
/**
* Consistent Hashing queue algorithm
*/
-public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+public class AllocateMessageQueueConsistentHash extends AbstractAllocateMessageQueueStrategy {
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
@@ -56,22 +52,8 @@ public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueS
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
- throw new IllegalArgumentException("currentCID is empty");
- }
- if (mqAll == null || mqAll.isEmpty()) {
- throw new IllegalArgumentException("mqAll is null or mqAll empty");
- }
- if (cidAll == null || cidAll.isEmpty()) {
- throw new IllegalArgumentException("cidAll is null or cidAll empty");
- }
-
List<MessageQueue> result = new ArrayList<MessageQueue>();
- if (!cidAll.contains(currentCID)) {
- log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
- consumerGroup,
- currentCID,
- cidAll);
+ if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java
new file mode 100644
index 000000000..a1c925c3a
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfigTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import junit.framework.TestCase;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+
+public class AllocateMessageQueueByConfigTest extends TestCase {
+
+    /**
+     * Configures the strategy with four queues and expects each of the two consumers
+     * to be handed all four of them (queue ids 0 to 3).
+     */
+    public void testAllocateMessageQueueByConfig() {
+        final List<String> cids = createConsumerIdList(2);
+        final List<MessageQueue> configuredQueues = createMessageQueueList(4);
+        final AllocateMessageQueueByConfig strategy = new AllocateMessageQueueByConfig();
+        strategy.setMessageQueueList(configuredQueues);
+
+        final Map<String, int[]> allocatedIdsByConsumer = new HashMap<String, int[]>(cids.size());
+        for (String cid : cids) {
+            allocatedIdsByConsumer.put(cid, queueIdsOf(strategy.allocate("", cid, configuredQueues, cids)));
+        }
+
+        Assert.assertArrayEquals(new int[] {0, 1, 2, 3}, allocatedIdsByConsumer.get("CID_PREFIX0"));
+        Assert.assertArrayEquals(new int[] {0, 1, 2, 3}, allocatedIdsByConsumer.get("CID_PREFIX1"));
+    }
+
+    /** Extracts the queue id of every queue, preserving list order. */
+    private static int[] queueIdsOf(List<MessageQueue> queues) {
+        final int[] ids = new int[queues.size()];
+        int next = 0;
+        for (MessageQueue queue : queues) {
+            ids[next++] = queue.getQueueId();
+        }
+        return ids;
+    }
+
+    /** Builds the consumer ids "CID_PREFIX0" .. "CID_PREFIX(size-1)". */
+    private List<String> createConsumerIdList(int size) {
+        final List<String> ids = new ArrayList<String>(size);
+        int n = 0;
+        while (n < size) {
+            ids.add("CID_PREFIX" + n);
+            n++;
+        }
+        return ids;
+    }
+
+    /** Builds {@code size} queues on topic "topic" / broker "brokerName" with queue ids 0 .. size-1. */
+    private List<MessageQueue> createMessageQueueList(int size) {
+        final List<MessageQueue> queues = new ArrayList<MessageQueue>(size);
+        int queueId = 0;
+        while (queueId < size) {
+            queues.add(new MessageQueue("topic", "brokerName", queueId));
+            queueId++;
+        }
+        return queues;
+    }
+}
+
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java
new file mode 100644
index 000000000..7b6dc6d7d
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoomTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import junit.framework.TestCase;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+
+public class AllocateMessageQueueByMachineRoomTest extends TestCase {
+
+    /**
+     * Verifies two things for a strategy restricted to machine room "room1":
+     * an empty queue list is rejected with an IllegalArgumentException, and the five
+     * "room1" queues (ids 0 to 4) are split between the two consumers while the
+     * "room2" queues (ids 5 to 9) are never allocated.
+     */
+    public void testAllocateMessageQueueByMachineRoom() {
+        List<String> consumerIdList = createConsumerIdList(2);
+        List<MessageQueue> messageQueueList = createMessageQueueList(10);
+        Set<String> consumeridcs = new HashSet<String>();
+        consumeridcs.add("room1");
+        AllocateMessageQueueByMachineRoom allocateStrategy = new AllocateMessageQueueByMachineRoom();
+        allocateStrategy.setConsumeridcs(consumeridcs);
+
+        // mqAll is null or mqAll empty
+        try {
+            allocateStrategy.allocate("", consumerIdList.get(0), new ArrayList<MessageQueue>(), consumerIdList);
+            // Without this the test would pass silently if allocate stopped throwing.
+            Assert.fail("expected IllegalArgumentException for an empty mqAll");
+        } catch (IllegalArgumentException e) {
+            // Catching the exact type replaces the former `assert e instanceof ...`,
+            // which is a no-op unless the JVM runs with -ea.
+            Assert.assertEquals("mqAll is null or mqAll empty", e.getMessage());
+        }
+
+        Map<String, int[]> consumerAllocateQueue = new HashMap<String, int[]>(consumerIdList.size());
+        for (String consumerId : consumerIdList) {
+            List<MessageQueue> queues = allocateStrategy.allocate("", consumerId, messageQueueList, consumerIdList);
+            int[] queueIds = new int[queues.size()];
+            for (int i = 0; i < queues.size(); i++) {
+                queueIds[i] = queues.get(i).getQueueId();
+            }
+            consumerAllocateQueue.put(consumerId, queueIds);
+        }
+        // Five room1 queues over two consumers: two each, and the leftover goes to the first consumer.
+        Assert.assertArrayEquals(new int[] {0, 1, 4}, consumerAllocateQueue.get("CID_PREFIX0"));
+        Assert.assertArrayEquals(new int[] {2, 3}, consumerAllocateQueue.get("CID_PREFIX1"));
+    }
+
+    /** Builds the consumer ids "CID_PREFIX0" .. "CID_PREFIX(size-1)". */
+    private List<String> createConsumerIdList(int size) {
+        List<String> consumerIdList = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add("CID_PREFIX" + i);
+        }
+        return consumerIdList;
+    }
+
+    /**
+     * Builds {@code size} queues with ids 0 .. size-1; the first half is placed on broker
+     * "room1@broker-a" and the rest on "room2@broker-b".
+     */
+    private List<MessageQueue> createMessageQueueList(int size) {
+        List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq;
+            if (i < size / 2) {
+                mq = new MessageQueue("topic", "room1@broker-a", i);
+            } else {
+                mq = new MessageQueue("topic", "room2@broker-b", i);
+            }
+            messageQueueList.add(mq);
+        }
+        return messageQueueList;
+    }
+}
+