You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by dongeforever <gi...@git.apache.org> on 2017/09/22 02:34:09 UTC

[GitHub] incubator-rocketmq pull request #138: [ROCKETMQ-138]add implement SelectMess...

Github user dongeforever commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/138#discussion_r140401292
  
    --- Diff: client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByConsistentHash.java ---
    @@ -0,0 +1,132 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.rocketmq.client.producer.selector;
    +
    +import org.apache.rocketmq.client.producer.MessageQueueSelector;
    +import org.apache.rocketmq.common.message.Message;
    +import org.apache.rocketmq.common.message.MessageQueue;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +
    +/**
    +* wanting to send a message with the same key combine with orderly consumer, user can 
    +* use the implements of the MessageQueueSelector, which has one optimization compared with SelectMessageQueueByHash
    +* when a broker crash , it can reduce the message Arriving out of the order
    +*/
    +public class SelectMessageQueueByConsistentHash implements MessageQueueSelector {
    +
    +    private volatile SortedMap<Integer, String> virtualNodes =
    +            new TreeMap<Integer, String>();
    +
    +    private static final int DEFAULT_VIRTUAL_NODES = 100;
    +
    +    private final int virtualNodeNum;
    +
    +    private volatile HashMap<String, MessageQueue> idToQueueMap = new HashMap<String, MessageQueue>();
    +
    +    public SelectMessageQueueByConsistentHash() {
    +        this.virtualNodeNum = DEFAULT_VIRTUAL_NODES;
    +    }
    +
    +    public SelectMessageQueueByConsistentHash(int virtualNodeNum) {
    +        this.virtualNodeNum = virtualNodeNum;
    +    }
    +
    +    @Override
    +    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    +        synchronized (this) {
    +            if (queueChange(mqs)) {
    +                reloadConsistentHash(mqs);
    +            }
    +
    +            String uniqueQueueId = getMsgQueueIdBy(arg.toString());
    +            MessageQueue messageQueue = idToQueueMap.get(uniqueQueueId);
    +            return messageQueue;
    +        }
    +    }
    +
    +    private boolean queueChange(List<MessageQueue> mqs) {
    +        if (mqs.size() != this.idToQueueMap.size()) {
    +            return true;
    +        }
    +
    +        for (MessageQueue queue : mqs) {
    +            String id = queue.getTopic() + "_" + queue.getBrokerName() + "_" + queue.getQueueId();
    +            if (!this.idToQueueMap.containsKey(id)) {
    +                return true;
    +            }
    +        }
    +
    +        return false;
    +    }
    +
    +    private String getMsgQueueIdBy(String arg) {
    +        int hash = getHash(arg);
    +        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
    +
    +        Integer i;
    +        String virtualNode;
    +
    +        if (subMap.size() == 0) {
    +            i = virtualNodes.firstKey();
    +            virtualNode = virtualNodes.get(i);
    +        } else {
    +            i = subMap.firstKey();
    +            virtualNode = subMap.get(i);
    +        }
    +
    +        String result = virtualNode.substring(0, virtualNode.indexOf("&&"));
    +        return result;
    +    }
    +
    +    private int getHash(String str) {
    --- End diff --
    
    Why not just use str.hashCode?


---