You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2018/07/14 02:43:51 UTC

[rocketmq] branch develop updated: [ROCKETMQ-203]Support client to allocate message queue in machine room nearby priority (#109)

This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 461e516  [ROCKETMQ-203]Support client to allocate message queue in machine room nearby priority (#109)
461e516 is described below

commit 461e5166899c6a7304832f74d703549c53b2f5d0
Author: Jaskey <li...@gmail.com>
AuthorDate: Sat Jul 14 10:43:49 2018 +0800

    [ROCKETMQ-203]Support client to allocate message queue in machine room nearby priority (#109)
---
 .../rebalance/AllocateMachineRoomNearby.java       | 144 +++++++++++++
 .../rebalance/AllocateMachineRoomNearByTest.java   | 237 +++++++++++++++++++++
 2 files changed, 381 insertions(+)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
new file mode 100644
index 0000000..9b166e7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * An allocate strategy proxy based on machine room nearby priority. The actual allocate strategy to delegate to
+ * can be specified.
+ *
+ * If any consumer is alive in a machine room, the message queues of the brokers deployed in the same machine room
+ * are allocated only to those consumers. Otherwise, those message queues are shared among all consumers, since
+ * there is no alive consumer to monopolize them.
+ */
+public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
+    private final Logger log = ClientLogger.getLog();
+
+    /** The actual allocate strategy; every allocation made by this proxy is delegated to it. */
+    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;
+    /** Tells which machine room a message queue's broker or a consumer is deployed in. */
+    private final MachineRoomResolver machineRoomResolver;
+
+    /**
+     * Creates the proxy around an actual allocate strategy.
+     *
+     * @param allocateMessageQueueStrategy the actual allocate strategy to delegate to
+     * @param machineRoomResolver resolves the machine room of message queues and consumers
+     * @throws NullPointerException if either argument is null
+     */
+    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
+        MachineRoomResolver machineRoomResolver) throws NullPointerException {
+        if (allocateMessageQueueStrategy == null) {
+            throw new NullPointerException("allocateMessageQueueStrategy is null");
+        }
+        if (machineRoomResolver == null) {
+            throw new NullPointerException("machineRoomResolver is null");
+        }
+
+        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
+        this.machineRoomResolver = machineRoomResolver;
+    }
+
+    /**
+     * Allocates message queues to {@code currentCID} in two steps: first the queues deployed in the consumer's own
+     * machine room are divided among the consumers of that machine room, then the queues of every machine room
+     * without any consumer are divided among all consumers. Both divisions are done by the actual allocate strategy.
+     *
+     * @param consumerGroup the consumer group, passed through to the actual allocate strategy
+     * @param currentCID the id of the consumer to allocate for
+     * @param mqAll all message queues to be allocated
+     * @param cidAll the ids of all consumers
+     * @return the queues allocated to {@code currentCID}; an empty list if {@code currentCID} is not in
+     * {@code cidAll}
+     * @throws IllegalArgumentException if {@code currentCID}, {@code mqAll} or {@code cidAll} is null or empty, or if
+     * the resolver returns a null or empty machine room for any queue or consumer
+     */
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+        List<String> cidAll) {
+        if (currentCID == null || currentCID.length() < 1) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll empty");
+        }
+
+        if (!cidAll.contains(currentCID)) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+                consumerGroup,
+                currentCID,
+                cidAll);
+            return new ArrayList<MessageQueue>();
+        }
+
+        //group mq by machine room
+        Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
+        for (MessageQueue mq : mqAll) {
+            String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
+            if (StringUtils.isEmpty(brokerMachineRoom)) {
+                throw new IllegalArgumentException("Machine room is null for mq " + mq);
+            }
+            List<MessageQueue> mqInRoom = mr2Mq.get(brokerMachineRoom);
+            if (mqInRoom == null) {
+                mqInRoom = new ArrayList<MessageQueue>();
+                mr2Mq.put(brokerMachineRoom, mqInRoom);
+            }
+            mqInRoom.add(mq);
+        }
+
+        //group consumer by machine room
+        Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
+        for (String cid : cidAll) {
+            String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
+            if (StringUtils.isEmpty(consumerMachineRoom)) {
+                throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
+            }
+            List<String> cidInRoom = mr2c.get(consumerMachineRoom);
+            if (cidInRoom == null) {
+                cidInRoom = new ArrayList<String>();
+                mr2c.put(consumerMachineRoom, cidInRoom);
+            }
+            cidInRoom.add(cid);
+        }
+
+        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
+
+        //1.allocate the mq that deploy in the same machine room with the current consumer
+        String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
+        // removed from mr2Mq so that step 2 only sees the queues of the other machine rooms
+        List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
+        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
+        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
+            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
+        }
+
+        //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
+        for (Map.Entry<String, List<MessageQueue>> roomAndQueues : mr2Mq.entrySet()) {
+            if (!mr2c.containsKey(roomAndQueues.getKey())) { // no alive consumer in the corresponding machine room, so all consumers share these queues
+                allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, roomAndQueues.getValue(), cidAll));
+            }
+        }
+
+        return allocateResults;
+    }
+
+    /**
+     * @return "MACHINE_ROOM_NEARBY-" followed by the name of the actual allocate strategy
+     */
+    @Override
+    public String getName() {
+        return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
+    }
+
+    /**
+     * A resolver object to determine which machine room the message queues or clients are deployed in.
+     *
+     * AllocateMachineRoomNearby will use the results to group the message queues and clients by machine room.
+     *
+     * The result returned from the implemented methods CANNOT be null or empty; AllocateMachineRoomNearby rejects
+     * such a result with an IllegalArgumentException.
+     */
+    public interface MachineRoomResolver {
+        /**
+         * @param messageQueue the message queue whose broker is to be located
+         * @return the machine room the broker of the message queue is deployed in
+         */
+        String brokerDeployIn(MessageQueue messageQueue);
+
+        /**
+         * @param clientID the id of the consumer to be located
+         * @return the machine room the consumer is deployed in
+         */
+        String consumerDeployIn(String clientID);
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
new file mode 100644
index 0000000..0d394c3
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+/**
+ * Tests AllocateMachineRoomNearby wrapped around AllocateMessageQueueAveragely, with machine rooms ("IDC"s) encoded
+ * as the prefix of broker names and consumer ids.
+ */
+public class AllocateMachineRoomNearByTest {
+
+    private static final String CID_PREFIX = "CID-";
+
+    private final String topic = "topic_test";
+    // Test resolver: the machine room is whatever precedes the first "-" of the broker name / client id.
+    private final AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver =  new AllocateMachineRoomNearby.MachineRoomResolver() {
+        @Override public String brokerDeployIn(MessageQueue messageQueue) {
+            return messageQueue.getBrokerName().split("-")[0];
+        }
+
+        @Override public String consumerDeployIn(String clientID) {
+            return clientID.split("-")[0];
+        }
+    };
+    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy = new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver);
+
+    @Before
+    public void init() {
+    }
+
+    /** Brokers and consumers live in the same machine rooms. */
+    @Test
+    public void test1() {
+        testWhenIDCSizeEquals(5, 20, 10, false);
+        testWhenIDCSizeEquals(5, 20, 20, false);
+        testWhenIDCSizeEquals(5, 20, 30, false);
+        testWhenIDCSizeEquals(5, 20, 0, false);
+    }
+
+    /** Consumers live in one more machine room than the brokers. */
+    @Test
+    public void test2() {
+        testWhenConsumerIDCIsMore(5, 1, 10, 10, false);
+        testWhenConsumerIDCIsMore(5, 1, 10, 5, false);
+        testWhenConsumerIDCIsMore(5, 1, 10, 20, false);
+        testWhenConsumerIDCIsMore(5, 1, 10, 0, false);
+    }
+
+    /** Consumers live in two fewer machine rooms than the brokers. */
+    @Test
+    public void test3() {
+        testWhenConsumerIDCIsLess(5, 2, 10, 10, false);
+        testWhenConsumerIDCIsLess(5, 2, 10, 5, false);
+        testWhenConsumerIDCIsLess(5, 2, 10, 20, false);
+        testWhenConsumerIDCIsLess(5, 2, 10, 0, false);
+    }
+
+    /** Runs ten scenarios with random sizes, dispatching on how the two machine room counts compare. */
+    @Test
+    public void testRun10RandomCase() {
+        // one generator for the whole test instead of a new one for every number drawn
+        Random random = new Random();
+        for (int i = 0; i < 10; i++) {
+            int consumerSize = random.nextInt(200) + 1;//1-200
+            int queueSize = random.nextInt(100) + 1;//1-100
+            int brokerIDCSize = random.nextInt(10) + 1;//1-10
+            int consumerIDCSize = random.nextInt(10) + 1;//1-10
+
+            if (brokerIDCSize == consumerIDCSize) {
+                testWhenIDCSizeEquals(brokerIDCSize, queueSize, consumerSize, false);
+            } else if (brokerIDCSize > consumerIDCSize) {
+                testWhenConsumerIDCIsLess(brokerIDCSize, brokerIDCSize - consumerIDCSize, queueSize, consumerSize, false);
+            } else {
+                testWhenConsumerIDCIsMore(brokerIDCSize, consumerIDCSize - brokerIDCSize, queueSize, consumerSize, false);
+            }
+        }
+    }
+
+    /**
+     * Scenario: brokers and consumers are deployed in the same {@code IDCSize} machine rooms. Every consumer must
+     * only get queues of its own machine room, and all queues must be allocated exactly once.
+     *
+     * @param IDCSize number of machine rooms
+     * @param queueSize number of queues per machine room
+     * @param consumerSize number of consumers per machine room
+     * @param print whether to print the allocation to stdout
+     */
+    public void testWhenIDCSizeEquals(int IDCSize, int queueSize, int consumerSize, boolean print) {
+        if (print) {
+            System.out.println("Test : IDCSize = " + IDCSize + " queueSize = " + queueSize + " consumerSize = " + consumerSize);
+        }
+        List<String> cidAll = prepareConsumer(IDCSize, consumerSize);
+        List<MessageQueue> mqAll = prepareMQ(IDCSize, queueSize);
+        List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+        for (String currentID : cidAll) {
+            List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G", currentID, mqAll, cidAll);
+            if (print) {
+                System.out.println("cid: " + currentID + "--> res :" + res);
+            }
+            String consumerIDC = machineRoomResolver.consumerDeployIn(currentID);
+            for (MessageQueue mq : res) {
+                Assert.assertEquals(consumerIDC, machineRoomResolver.brokerDeployIn(mq));
+            }
+            resAll.addAll(res);
+        }
+        Assert.assertTrue("every queue must be allocated exactly once", hasAllocateAllQ(cidAll, mqAll, resAll));
+
+        if (print) {
+            System.out.println("-------------------------------------------------------------------");
+        }
+    }
+
+    /**
+     * Scenario: consumers are deployed in {@code consumerMore} more machine rooms than the brokers. A queue must
+     * only go to a consumer of the queue's own machine room, and all queues must be allocated exactly once.
+     *
+     * @param brokerIDCSize number of machine rooms with brokers
+     * @param consumerMore number of additional machine rooms that only have consumers
+     * @param queueSize number of queues per broker machine room
+     * @param consumerSize number of consumers per consumer machine room
+     * @param print whether to print the allocation to stdout
+     */
+    public void testWhenConsumerIDCIsMore(int brokerIDCSize, int consumerMore, int queueSize, int consumerSize, boolean print) {
+        if (print) {
+            System.out.println("Test : IDCSize = " + brokerIDCSize + " queueSize = " + queueSize + " consumerSize = " + consumerSize);
+        }
+        List<String> cidAll = prepareConsumer(brokerIDCSize + consumerMore, consumerSize);
+        List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
+        Set<String> brokerIDCWithConsumer = new TreeSet<String>();
+        for (MessageQueue mq : mqAll) {
+            brokerIDCWithConsumer.add(machineRoomResolver.brokerDeployIn(mq));
+        }
+
+        List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+        for (String currentID : cidAll) {
+            List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G", currentID, mqAll, cidAll);
+            if (print) {
+                System.out.println("cid: " + currentID + "--> res :" + res);
+            }
+            String consumerIDC = machineRoomResolver.consumerDeployIn(currentID);
+            for (MessageQueue mq : res) {
+                String brokerIDC = machineRoomResolver.brokerDeployIn(mq);
+                if (brokerIDCWithConsumer.contains(brokerIDC)) {//healthy idc, so only consumer in this idc should be allocated
+                    Assert.assertEquals(consumerIDC, brokerIDC);
+                }
+            }
+            resAll.addAll(res);
+        }
+
+        Assert.assertTrue("every queue must be allocated exactly once", hasAllocateAllQ(cidAll, mqAll, resAll));
+        if (print) {
+            System.out.println("-------------------------------------------------------------------");
+        }
+    }
+
+    /**
+     * Scenario: consumers are deployed in {@code consumerIDCLess} fewer machine rooms than the brokers. The
+     * consumers of a machine room must together hold all queues of that machine room, and all queues (including
+     * those of machine rooms without consumers) must be allocated exactly once.
+     *
+     * @param brokerIDCSize number of machine rooms with brokers
+     * @param consumerIDCLess number of broker machine rooms that have no consumer
+     * @param queueSize number of queues per broker machine room
+     * @param consumerSize number of consumers per consumer machine room
+     * @param print whether to print the allocation to stdout
+     */
+    public void testWhenConsumerIDCIsLess(int brokerIDCSize, int consumerIDCLess, int queueSize, int consumerSize, boolean print) {
+        if (print) {
+            System.out.println("Test : IDCSize = " + brokerIDCSize + " queueSize = " + queueSize + " consumerSize = " + consumerSize);
+        }
+        List<String> cidAll = prepareConsumer(brokerIDCSize - consumerIDCLess, consumerSize);
+        List<MessageQueue> mqAll = prepareMQ(brokerIDCSize, queueSize);
+        Set<String> healthyIDC = new TreeSet<String>();
+        for (String cid : cidAll) {
+            healthyIDC.add(machineRoomResolver.consumerDeployIn(cid));
+        }
+
+        List<MessageQueue> resAll = new ArrayList<MessageQueue>();
+        Map<String, List<MessageQueue>> idc2Res = new TreeMap<String, List<MessageQueue>>();
+        for (String currentID : cidAll) {
+            String currentIDC = machineRoomResolver.consumerDeployIn(currentID);
+            List<MessageQueue> res = allocateMessageQueueStrategy.allocate("Test-C-G", currentID, mqAll, cidAll);
+            if (print) {
+                System.out.println("cid: " + currentID + "--> res :" + res);
+            }
+            List<MessageQueue> resOfIDC = idc2Res.get(currentIDC);
+            if (resOfIDC == null) {
+                resOfIDC = new ArrayList<MessageQueue>();
+                idc2Res.put(currentIDC, resOfIDC);
+            }
+            resOfIDC.addAll(res);
+            resAll.addAll(res);
+        }
+
+        for (String consumerIDC : healthyIDC) {
+            List<MessageQueue> resInOneIDC = idc2Res.get(consumerIDC);
+            List<MessageQueue> mqInThisIDC = createMessageQueueList(consumerIDC, queueSize);
+            Assert.assertTrue("consumers in " + consumerIDC + " must hold all queues of that machine room",
+                resInOneIDC.containsAll(mqInThisIDC));
+        }
+
+        Assert.assertTrue("every queue must be allocated exactly once", hasAllocateAllQ(cidAll, mqAll, resAll));
+        if (print) {
+            System.out.println("-------------------------------------------------------------------");
+        }
+    }
+
+    /**
+     * Checks that the combined allocation holds the same queues as {@code mqAll} and has the same size; without
+     * any consumer nothing may have been allocated.
+     */
+    private boolean hasAllocateAllQ(List<String> cidAll, List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
+        if (cidAll.isEmpty()) {
+            return allocatedResAll.isEmpty();
+        }
+        return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll) && mqAll.size() == allocatedResAll.size();
+    }
+
+    /** Creates {@code size} consumer ids of the form {@code <machineRoom>-CID-<index>}. */
+    private List<String> createConsumerIdList(String machineRoom, int size) {
+        List<String> consumerIdList = new ArrayList<String>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add(machineRoom + "-" + CID_PREFIX + String.valueOf(i));
+        }
+        return consumerIdList;
+    }
+
+    /** Creates queues 0..size-1 of the test topic on the broker {@code <machineRoom>-brokerName}. */
+    private List<MessageQueue> createMessageQueueList(String machineRoom, int size) {
+        List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(size);
+        for (int i = 0; i < size; i++) {
+            messageQueueList.add(new MessageQueue(topic, machineRoom + "-brokerName", i));
+        }
+        return messageQueueList;
+    }
+
+    /** Creates {@code queueSize} queues in each of the machine rooms IDC1..IDC{@code brokerIDCSize}. */
+    private List<MessageQueue> prepareMQ(int brokerIDCSize, int queueSize) {
+        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+        for (int i = 1; i <= brokerIDCSize; i++) {
+            mqAll.addAll(createMessageQueueList("IDC" + i, queueSize));
+        }
+        return mqAll;
+    }
+
+    /** Creates {@code consumerSize} consumer ids in each of the machine rooms IDC1..IDC{@code IDCSize}. */
+    private List<String> prepareConsumer(int IDCSize, int consumerSize) {
+        List<String> cidAll = new ArrayList<String>();
+        for (int i = 1; i <= IDCSize; i++) {
+            cidAll.addAll(createConsumerIdList("IDC" + i, consumerSize));
+        }
+        return cidAll;
+    }
+}