You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by Jaskey <gi...@git.apache.org> on 2017/05/31 09:14:34 UTC

[GitHub] incubator-rocketmq pull request #109: [ROCKETMQ-203]Support client to alloca...

GitHub user Jaskey opened a pull request:

    https://github.com/apache/incubator-rocketmq/pull/109

    [ROCKETMQ-203]Support client to allocate message queue in machine room nearby priority

    JIRA: https://issues.apache.org/jira/browse/ROCKETMQ-203?jql=project%20%3D%20ROCKETMQ


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Jaskey/incubator-rocketmq ROCKETMQ-203-machineroom-nearby-strategy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-rocketmq/pull/109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #109
    
----
commit fd756669d8f07dc02aca540218013c3f8bd6fbc7
Author: Jaskey <li...@gmail.com>
Date:   2017-05-31T09:04:04Z

    Add allocate strategy to support allocating message queues to client  in machine room nearby priority

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #109: [ROCKETMQ-203]Support client to allocate mess...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/109
  
    
    [![Coverage Status](https://coveralls.io/builds/11759889/badge)](https://coveralls.io/builds/11759889)
    
    Coverage increased (+0.1%) to 38.806% when pulling **a2164a873e180549a34be77f10897982428954ed on Jaskey:ROCKETMQ-203-machineroom-nearby-strategy** into **0fe947173a85d8931b1068805713e89dbba4125a on apache:develop**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #109: [ROCKETMQ-203]Support client to allocate mess...

Posted by lizhanhui <gi...@git.apache.org>.
Github user lizhanhui commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/109
  
    @dongeforever @shroman @zhouxinyu Please review.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #109: [ROCKETMQ-203]Support client to alloca...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/109#discussion_r119676071
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.rocketmq.client.consumer.rebalance;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
    +import org.apache.rocketmq.client.log.ClientLogger;
    +import org.apache.rocketmq.common.message.MessageQueue;
    +import org.slf4j.Logger;
    +
    +/**
    + * An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
    + * specified.
    + *
    + * If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
    + * should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
    + * no alive consumer to monopolize them.
    + */
    +public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
    +    private final Logger log = ClientLogger.getLog();
    +
    +    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
    +    private final MachineRoomSelector machineRoomSelector;
    +
    +    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
    +        MachineRoomSelector machineRoomSelector) {
    +        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    +        this.machineRoomSelector = machineRoomSelector;
    +    }
    +
    +    @Override
    +    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    +        List<String> cidAll) {
    +        if (currentCID == null || currentCID.length() < 1) {
    +            throw new IllegalArgumentException("currentCID is empty");
    +        }
    +        if (mqAll == null || mqAll.isEmpty()) {
    +            throw new IllegalArgumentException("mqAll is null or mqAll empty");
    +        }
    +        if (cidAll == null || cidAll.isEmpty()) {
    +            throw new IllegalArgumentException("cidAll is null or cidAll empty");
    +        }
    +
    +        List<MessageQueue> result = new ArrayList<MessageQueue>();
    +        if (!cidAll.contains(currentCID)) {
    +            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
    +                consumerGroup,
    +                currentCID,
    +                cidAll);
    +            return result;
    +        }
    +
    +        //group mq by machine room
    +        Map<String, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    +        for (MessageQueue mq : mqAll) {
    +            String brokerMachineRoom = machineRoomSelector.brokerDeployIn(mq);
    +            if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    +                if (mr2Mq.get(brokerMachineRoom) == null) {
    +                    mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    +                }
    +                mr2Mq.get(brokerMachineRoom).add(mq);
    +            } else {
    +                throw new IllegalArgumentException("Machine room is null for mq " + mq);
    +            }
    +        }
    +
    +        //group consumer by machine room
    +        Map<String, List<String>> mr2c = new TreeMap<String, List<String>>();
    +        for (String cid : cidAll) {
    +            String consumerMachineRoom = machineRoomSelector.consumerDeployIn(cid);
    +            if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    +                if (mr2c.get(consumerMachineRoom) == null) {
    +                    mr2c.put(consumerMachineRoom, new ArrayList<String>());
    +                }
    +                mr2c.get(consumerMachineRoom).add(cid);
    +            } else {
    +                throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
    +            }
    +        }
    +
    +        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
    +
    +        //1.allocate the mq that deploy in the same machine room with the current consumer
    +        String currentMachineRoom = machineRoomSelector.consumerDeployIn(currentCID);
    +        List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    +        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    +        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
    +            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    +        }
    +
    +        //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
    +        for (String machineRoom : mr2Mq.keySet()) {
    +            if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
    +                allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
    +            }
    +        }
    +
    +        return allocateResults;
    +    }
    +
    +    @Override
    +    public String getName() {
    +        return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
    +    }
    +
    +    public interface MachineRoomSelector {
    --- End diff --
    
    LGTM,I will rename this interface if more guys agree


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq pull request #109: [ROCKETMQ-203]Support client to alloca...

Posted by lizhanhui <gi...@git.apache.org>.
Github user lizhanhui commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/109#discussion_r119598779
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.rocketmq.client.consumer.rebalance;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.TreeMap;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
    +import org.apache.rocketmq.client.log.ClientLogger;
    +import org.apache.rocketmq.common.message.MessageQueue;
    +import org.slf4j.Logger;
    +
    +/**
    + * An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
    + * specified.
    + *
    + * If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
    + * should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
    + * no alive consumer to monopolize them.
    + */
    +public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
    +    private final Logger log = ClientLogger.getLog();
    +
    +    private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
    +    private final MachineRoomSelector machineRoomSelector;
    +
    +    public AllocateMachineRoomNearby(AllocateMessageQueueStrategy allocateMessageQueueStrategy,
    +        MachineRoomSelector machineRoomSelector) {
    +        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    +        this.machineRoomSelector = machineRoomSelector;
    +    }
    +
    +    @Override
    +    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
    +        List<String> cidAll) {
    +        if (currentCID == null || currentCID.length() < 1) {
    +            throw new IllegalArgumentException("currentCID is empty");
    +        }
    +        if (mqAll == null || mqAll.isEmpty()) {
    +            throw new IllegalArgumentException("mqAll is null or mqAll empty");
    +        }
    +        if (cidAll == null || cidAll.isEmpty()) {
    +            throw new IllegalArgumentException("cidAll is null or cidAll empty");
    +        }
    +
    +        List<MessageQueue> result = new ArrayList<MessageQueue>();
    +        if (!cidAll.contains(currentCID)) {
    +            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
    +                consumerGroup,
    +                currentCID,
    +                cidAll);
    +            return result;
    +        }
    +
    +        //group mq by machine room
    +        Map<String, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    +        for (MessageQueue mq : mqAll) {
    +            String brokerMachineRoom = machineRoomSelector.brokerDeployIn(mq);
    +            if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    +                if (mr2Mq.get(brokerMachineRoom) == null) {
    +                    mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    +                }
    +                mr2Mq.get(brokerMachineRoom).add(mq);
    +            } else {
    +                throw new IllegalArgumentException("Machine room is null for mq " + mq);
    +            }
    +        }
    +
    +        //group consumer by machine room
    +        Map<String, List<String>> mr2c = new TreeMap<String, List<String>>();
    +        for (String cid : cidAll) {
    +            String consumerMachineRoom = machineRoomSelector.consumerDeployIn(cid);
    +            if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    +                if (mr2c.get(consumerMachineRoom) == null) {
    +                    mr2c.put(consumerMachineRoom, new ArrayList<String>());
    +                }
    +                mr2c.get(consumerMachineRoom).add(cid);
    +            } else {
    +                throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
    +            }
    +        }
    +
    +        List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
    +
    +        //1.allocate the mq that deploy in the same machine room with the current consumer
    +        String currentMachineRoom = machineRoomSelector.consumerDeployIn(currentCID);
    +        List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    +        List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    +        if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
    +            allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    +        }
    +
    +        //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
    +        for (String machineRoom : mr2Mq.keySet()) {
    +            if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
    +                allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
    +            }
    +        }
    +
    +        return allocateResults;
    +    }
    +
    +    @Override
    +    public String getName() {
    +        return "MACHINE_ROOM_NEARBY" + "-" + allocateMessageQueueStrategy.getName();
    +    }
    +
    +    public interface MachineRoomSelector {
    --- End diff --
    
    How about rename this interface as `MachineRoomResolver`? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #109: [ROCKETMQ-203]Support client to allocate mess...

Posted by Jaskey <gi...@git.apache.org>.
Github user Jaskey commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/109
  
    @lizhanhui @dongeforever @shroman 
    What's your advice on this issue and implementation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rocketmq issue #109: [ROCKETMQ-203]Support client to allocate mess...

Posted by coveralls <gi...@git.apache.org>.
Github user coveralls commented on the issue:

    https://github.com/apache/incubator-rocketmq/pull/109
  
    
    [![Coverage Status](https://coveralls.io/builds/11829934/badge)](https://coveralls.io/builds/11829934)
    
    Coverage increased (+0.07%) to 38.733% when pulling **f19aa7096743dbb64ca79a25dbc2c63606877777 on Jaskey:ROCKETMQ-203-machineroom-nearby-strategy** into **0fe947173a85d8931b1068805713e89dbba4125a on apache:develop**.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---