You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/14 02:43:51 UTC
[rocketmq] branch develop updated: [ROCKETMQ-203]Support client to
allocate message queue in machine room nearby priority (#109)
This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 461e516 [ROCKETMQ-203]Support client to allocate message queue in machine room nearby priority (#109)
461e516 is described below
commit 461e5166899c6a7304832f74d703549c53b2f5d0
Author: Jaskey <li...@gmail.com>
AuthorDate: Sat Jul 14 10:43:49 2018 +0800
[ROCKETMQ-203]Support client to allocate message queue in machine room nearby priority (#109)
---
.../rebalance/AllocateMachineRoomNearby.java | 144 +++++++++++++
.../rebalance/AllocateMachineRoomNearByTest.java | 237 +++++++++++++++++++++
2 files changed, 381 insertions(+)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
new file mode 100644
index 0000000..9b166e7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * An allocate strategy proxy that gives priority to the machine room a consumer is deployed in. The actual
+ * allocation inside each group is delegated to the wrapped {@link AllocateMessageQueueStrategy}.
+ *
+ * If any consumer is alive in a machine room, the message queues of the brokers deployed in that machine room
+ * are allocated only to those consumers. Otherwise, those message queues are shared among all consumers, since
+ * there is no alive consumer in that room to monopolize them.
+ */
+public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
+    private final Logger log = ClientLogger.getLog();
+
+    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
+    private final MachineRoomResolver machineRoomResolver;
+
+    /**
+     * Creates a machine-room-aware proxy around an actual allocate strategy.
+     *
+     * @param allocateMessageQueueStrategy the strategy used to split queues among the consumers of one group
+     * @param machineRoomResolver resolves the machine room of each message queue and each client id
+     * @throws NullPointerException if either argument is null
+     */
+    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+        MachineRoomResolver machineRoomResolver) throws NullPointerException {
+        if (allocateMessageQueueStrategy == null) {
+            throw new NullPointerException("allocateMessageQueueStrategy is null");
+        }
+
+        if (machineRoomResolver == null) {
+            throw new NullPointerException("machineRoomResolver is null");
+        }
+
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+        this.machineRoomResolver = machineRoomResolver;
+    }
+
+    /**
+     * Allocates message queues to {@code currentCID}, preferring queues whose broker is in the same machine room.
+     *
+     * @param consumerGroup the consumer group, passed through to the wrapped strategy and used for logging
+     * @param currentCID the client id to allocate for
+     * @param mqAll all message queues to be allocated
+     * @param cidAll all client ids taking part in the allocation
+     * @return the queues allocated to {@code currentCID}; an empty list if {@code currentCID} is not in
+     * {@code cidAll}
+     * @throws IllegalArgumentException if {@code currentCID} is null or empty, if {@code mqAll} or {@code cidAll}
+     * is null or empty, or if the resolver returns a null or empty machine room for any queue or client id
+     */
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+        List<String> cidAll) {
+        if (currentCID == null || currentCID.length() < 1) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll empty");
+        }
+
+        if (!cidAll.contains(currentCID)) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+                consumerGroup,
+                currentCID,
+                cidAll);
+            return new ArrayList<MessageQueue>();
+        }
+
+        //group mq by machine room
+        Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
+        for (MessageQueue mq : mqAll) {
+            String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
+            if (StringUtils.isEmpty(brokerMachineRoom)) {
+                throw new IllegalArgumentException("Machine room is null for mq " + mq);
+            }
+            List<MessageQueue> mqsInRoom = mr2Mq.get(brokerMachineRoom);
+            if (mqsInRoom == null) {
+                mqsInRoom = new ArrayList<MessageQueue>();
+                mr2Mq.put(brokerMachineRoom, mqsInRoom);
+            }
+            mqsInRoom.add(mq);
+        }
+
+        //group consumer by machine room
+        Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
+        for (String cid : cidAll) {
+            String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
+            if (StringUtils.isEmpty(consumerMachineRoom)) {
+                throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
+            }
+            List<String> cidsInRoom = mr2c.get(consumerMachineRoom);
+            if (cidsInRoom == null) {
+                cidsInRoom = new ArrayList<String>();
+                mr2c.put(consumerMachineRoom, cidsInRoom);
+            }
+            cidsInRoom.add(cid);
+        }
+
+        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
+
+        //1.allocate the mq that deploy in the same machine room with the current consumer
+        String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
+        // remove (not get): the local room must not be visited again by step 2
+        List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
+        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
+        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
+            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID,
+                mqInThisMachineRoom, consumerInThisMachineRoom));
+        }
+
+        //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
+        for (Map.Entry<String, List<MessageQueue>> roomAndMqs : mr2Mq.entrySet()) {
+            // no alive consumer in the corresponding machine room, so all consumers share these queues
+            if (!mr2c.containsKey(roomAndMqs.getKey())) {
+                allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID,
+                    roomAndMqs.getValue(), cidAll));
+            }
+        }
+
+        return allocateResults;
+    }
+
+    /**
+     * @return {@code "MACHINE_ROOM_NEARBY-"} followed by the name of the wrapped strategy
+     */
+    @Override
+    public String getName() {
+        return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
+    }
+
+    /**
+     * A resolver object to determine which machine room the message queues or clients are deployed in.
+     *
+     * AllocateMachineRoomNearby will use the results to group the message queues and clients by machine room.
+     *
+     * The result returned from the implemented methods CANNOT be null or empty; AllocateMachineRoomNearby
+     * rejects such a result with an IllegalArgumentException.
+     */
+    public interface MachineRoomResolver {
+        /**
+         * @param messageQueue the message queue whose broker location is wanted
+         * @return the machine room of the broker hosting the given message queue
+         */
+        String brokerDeployIn(MessageQueue messageQueue);
+
+        /**
+         * @param clientID the consumer client id
+         * @return the machine room the given consumer is deployed in
+         */
+        String consumerDeployIn(String clientID);
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
new file mode 100644
index 0000000..0d394c3
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class AllocateMachineRoomNearByTest {
+
+ private static final String CID_PREFIX = "CID-";
+
+ private final String topic = "topic_test";
+ private final AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
+ @Override public String brokerDeployIn(MessageQueue messageQueue) {
+ return messageQueue.getBrokerName().split("-")[0];
+ }
+
+ @Override public String consumerDeployIn(String clientID) {
+ return clientID.split("-")[0];
+ }
+ };
+ private final AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver);
+
+
+ @Before
+ public void init() {
+ }
+
+
+ @Test
+ public void test1() {
+ testWhenIDCSizeEquals(5,20,10, false);
+ testWhenIDCSizeEquals(5,20,20, false);
+ testWhenIDCSizeEquals(5,20,30, false);
+ testWhenIDCSizeEquals(5,20,0, false );
+ }
+
+ @Test
+ public void test2() {
+ testWhenConsumerIDCIsMore(5,1,10, 10, false);
+ testWhenConsumerIDCIsMore(5,1,10, 5, false);
+ testWhenConsumerIDCIsMore(5,1,10, 20, false);
+ testWhenConsumerIDCIsMore(5,1,10, 0, false);
+ }
+
+ @Test
+ public void test3() {
+ testWhenConsumerIDCIsLess(5,2,10, 10, false);
+ testWhenConsumerIDCIsLess(5,2,10, 5, false);
+ testWhenConsumerIDCIsLess(5,2,10, 20, false);
+ testWhenConsumerIDCIsLess(5,2,10, 0, false);
+ }
+
+
+ @Test
+ public void testRun10RandomCase(){
+ for(int i=0;i<10;i++){
+ int consumerSize = new Random().nextInt(200)+1;//1-200
+ int queueSize = new Random().nextInt(100)+1;//1-100
+ int brokerIDCSize = new Random().nextInt(10)+1;//1-10
+ int consumerIDCSize = new Random().nextInt(10)+1;//1-10
+
+ if (brokerIDCSize == consumerIDCSize) {
+ testWhenIDCSizeEquals(brokerIDCSize,queueSize,consumerSize,false);
+ }
+ else if (brokerIDCSize > consumerIDCSize) {
+ testWhenConsumerIDCIsLess(brokerIDCSize,brokerIDCSize- consumerIDCSize, queueSize, consumerSize, false);
+ } else {
+ testWhenConsumerIDCIsMore(brokerIDCSize, consumerIDCSize - brokerIDCSize, queueSize, consumerSize, false);
+ }
+ }
+ }
+
+
+
+
+ public void testWhenIDCSizeEquals(int IDCSize, int queueSize, int consumerSize, boolean print) {
+ if (print) {
+ System.out.println("Test : IDCSize = "+ IDCSize +"queueSize = " + queueSize +" consumerSize = " + consumerSize);
+ }
+ List<String> cidAll = prepareConsumer(IDCSize, consumerSize);
+ List<MessageQueue> mqAll = prepareMQ(IDCSize, queueSize);
+ List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+ for (String currentID : cidAll) {
+ List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
+ if (print) {
+ System.out.println("cid: "+currentID+"--> res :" +res);
+ }
+ for (MessageQueue mq : res) {
+ Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID)));
+ }
+ resAll.addAll(res);
+ }
+ Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
+
+ if (print) {
+ System.out.println("-------------------------------------------------------------------");
+ }
+ }
+
+ public void testWhenConsumerIDCIsMore(int brokerIDCSize, int consumerMore, int queueSize, int consumerSize, boolean print) {
+ if (print) {
+ System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize = " + queueSize +" consumerSize = " + consumerSize);
+ }
+ Set<String> brokerIDCWithConsumer = new TreeSet<String>();
+ List<String> cidAll = prepareConsumer(brokerIDCSize +consumerMore, consumerSize);
+ List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
+ for (MessageQueue mq : mqAll) {
+ brokerIDCWithConsumer.add(machineRoomResolver.brokerDeployIn(mq));
+ }
+
+ List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+ for (String currentID : cidAll) {
+ List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
+ if (print) {
+ System.out.println("cid: "+currentID+"--> res :" +res);
+ }
+ for (MessageQueue mq : res) {
+ if (brokerIDCWithConsumer.contains(machineRoomResolver.brokerDeployIn(mq))) {//healthy idc, so only consumer in this idc should be allocated
+ Assert.assertTrue(machineRoomResolver.brokerDeployIn(mq).equals(machineRoomResolver.consumerDeployIn(currentID)));
+ }
+ }
+ resAll.addAll(res);
+ }
+
+ Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
+ if (print) {
+ System.out.println("-------------------------------------------------------------------");
+ }
+ }
+
+ public void testWhenConsumerIDCIsLess(int brokerIDCSize, int consumerIDCLess, int queueSize, int consumerSize, boolean print) {
+ if (print) {
+ System.out.println("Test : IDCSize = "+ brokerIDCSize +" queueSize = " + queueSize +" consumerSize = " + consumerSize);
+ }
+ Set<String> healthyIDC = new TreeSet<String>();
+ List<String> cidAll = prepareConsumer(brokerIDCSize - consumerIDCLess, consumerSize);
+ List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
+ for (String cid : cidAll) {
+ healthyIDC.add(machineRoomResolver.consumerDeployIn(cid));
+ }
+
+ List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+ Map<String, List<MessageQueue>> idc2Res = new TreeMap<String, List<MessageQueue>>();
+ for (String currentID : cidAll) {
+ String currentIDC = machineRoomResolver.consumerDeployIn(currentID);
+ List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G",currentID,mqAll,cidAll);
+ if (print) {
+ System.out.println("cid: "+currentID+"--> res :" +res);
+ }
+ if ( !idc2Res.containsKey(currentIDC)) {
+ idc2Res.put(currentIDC, new ArrayList<MessageQueue>());
+ }
+ idc2Res.get(currentIDC).addAll(res);
+ resAll.addAll(res);
+ }
+
+ for (String consumerIDC : healthyIDC) {
+ List<MessageQueue> resInOneIDC = idc2Res.get(consumerIDC);
+ List<MessageQueue> mqInThisIDC = createMessageQueueList(consumerIDC,queueSize);
+ Assert.assertTrue(resInOneIDC.containsAll(mqInThisIDC));
+ }
+
+ Assert.assertTrue(hasAllocateAllQ(cidAll,mqAll,resAll));
+ if (print) {
+ System.out.println("-------------------------------------------------------------------");
+ }
+ }
+
+
+ private boolean hasAllocateAllQ(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
+ if (cidAll.isEmpty()){
+ return allocatedResAll.isEmpty();
+ }
+ return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll) && mqAll.size() == allocatedResAll.size();
+ }
+
+
+ private List<String> createConsumerIdList(String machineRoom, int size) {
+ List<String> consumerIdList = new ArrayList<String>(size);
+ for (int i = 0; i < size; i++) {
+ consumerIdList.add(machineRoom +"-"+CID_PREFIX + String.valueOf(i));
+ }
+ return consumerIdList;
+ }
+
+ private List<MessageQueue> createMessageQueueList(String machineRoom, int size) {
+ List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq = new MessageQueue(topic, machineRoom+"-brokerName", i);
+ messageQueueList.add(mq);
+ }
+ return messageQueueList;
+ }
+
+ private List<MessageQueue> prepareMQ(int brokerIDCSize, int queueSize) {
+ List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+ for (int i=1;i<=brokerIDCSize;i++) {
+ mqAll.addAll(createMessageQueueList("IDC"+i, queueSize));
+ }
+
+ return mqAll;
+ }
+
+ private List<String> prepareConsumer( int IDCSize, int consumerSize) {
+ List<String> cidAll = new ArrayList<String>();
+ for (int i=1;i<=IDCSize;i++) {
+ cidAll.addAll(createConsumerIdList("IDC"+i, consumerSize));
+ }
+ return cidAll;
+ }
+}