Posted to issues@rocketmq.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/16 03:58:04 UTC

[jira] [Commented] (ROCKETMQ-67) Consistent Hash allocate strategy support

    [ https://issues.apache.org/jira/browse/ROCKETMQ-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16011699#comment-16011699 ] 

ASF GitHub Bot commented on ROCKETMQ-67:
----------------------------------------

Github user dhchao11 commented on a diff in the pull request:

    https://github.com/apache/incubator-rocketmq/pull/67#discussion_r116650808
  
    --- Diff: common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.rocketmq.common.consistenthash;
    +
    +import java.security.MessageDigest;
    +import java.security.NoSuchAlgorithmException;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.SortedMap;
    +import java.util.TreeMap;
    +
    +/**
    + * To hash Node objects to a hash ring with a certain amount of virtual node.
    + * Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
    + *
    + * @param <T>
    + */
    +public class ConsistentHashRouter<T extends Node> {
    +    private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
    +    private final HashFunction hashFunction;
    +
    +    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
    +        this(pNodes,vNodeCount, new MD5Hash());
    +    }
    +
    +    /**
    +     *
    +     * @param pNodes collections of physical nodes
    +     * @param vNodeCount amounts of virtual nodes
    +     * @param hashFunction hash Function to hash Node instances
    +     */
    +    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
    +        if (hashFunction == null) {
    +            throw new NullPointerException("Hash Function is null");
    +        }
    +        this.hashFunction = hashFunction;
    +        if (pNodes != null) {
    +            for (T pNode : pNodes) {
    +                addNode(pNode, vNodeCount);
    +            }
    +        }
    +    }
    +
    +    /**
    +     * add physic node to the hash ring with some virtual nodes
    +     * @param pNode physical node needs added to hash ring
    +     * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
    +     */
    +    public void addNode(T pNode, int vNodeCount) {
    +        if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
    +        int existingReplicas = getExistingReplicas(pNode);
    +        for (int i = 0; i < vNodeCount; i++) {
    +            VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
    +            ring.put(hashFunction.hash(vNode.getKey()), vNode);
    +        }
    +    }
    +
    +    /**
    +     * remove the physical node from the hash ring
    +     * @param pNode
    +     */
    +    public void removeNode(T pNode) {
    +        Iterator<Long> it = ring.keySet().iterator();
    +        while (it.hasNext()) {
    +            Long key = it.next();
    +            VirtualNode<T> virtualNode = ring.get(key);
    +            if (virtualNode.isVirtualNodeOf(pNode)) {
    +                it.remove();
    +            }
    +        }
    +    }
    +
    +    /**
    +     * with a specified key, route the nearest Node instance in the current hash ring
    +     * @param objectKey the object key to find a nearest Node
    +     * @return
    +     */
    +    public T routeNode(String objectKey) {
    +        if (ring.isEmpty()) {
    +            return null;
    +        }
    +        Long hashVal = hashFunction.hash(objectKey);
    +        SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
    +        Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
    +        return ring.get(nodeHashVal).getPhysicalNode();
    +    }
    +
    +
    +    public int getExistingReplicas(T pNode) {
    +        int replicas = 0;
    +        for (VirtualNode<T> vNode : ring.values()) {
    +            if (vNode.isVirtualNodeOf(pNode)) {
    +                replicas++;
    +            }
    +        }
    +        return replicas;
    +    }
    +
    +    
    +    //default hash function
    +    private static class MD5Hash implements HashFunction {
    +        MessageDigest instance;
    +
    +        public MD5Hash() {
    +            try {
    +                instance = MessageDigest.getInstance("MD5");
    +            } catch (NoSuchAlgorithmException e) {
    +            }
    +        }
    +
    +        @Override
    +        public long hash(String key) {
    +            instance.reset();
    +            instance.update(key.getBytes());
    +            byte[] digest = instance.digest();
    +
    +            long h = 0;
    +            for (int i = 0; i < 4; i++) {
    +                h <<= 8;
    +                h |= ((int) digest[i]) & 0xFF;
    +            }
    --- End diff --
    
    The digest is 128 bits, but the generated h uses only 32 of them; the other 96 bits are ignored. Would it be better to fold them in with XOR (^)?
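
    To make the suggestion concrete, here is a minimal sketch of both variants. The class name Md5FoldSketch and its helper methods are hypothetical and not part of the pull request; first32Bits mirrors the loop in the diff, and xorFold32 XORs the four 32-bit words of the digest together.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical illustration only; none of these names exist in the pull request.
public class Md5FoldSketch {

    /** Reads the first 4 digest bytes as an unsigned 32-bit value, like the loop in the diff. */
    static long first32Bits(byte[] digest) {
        long h = 0;
        for (int i = 0; i < 4; i++) {
            h <<= 8;
            h |= digest[i] & 0xFF;
        }
        return h;
    }

    /** XORs the four big-endian 32-bit words of a 16-byte digest into one unsigned 32-bit value. */
    static long xorFold32(byte[] digest) {
        long h = 0;
        for (int word = 0; word < 4; word++) {
            long w = 0;
            for (int i = 0; i < 4; i++) {
                w = (w << 8) | (digest[word * 4 + i] & 0xFF);
            }
            h ^= w;
        }
        return h;
    }

    /** Computes the MD5 digest of a key, using an explicit charset so results match across platforms. */
    static byte[] md5(String key) {
        try {
            return MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        byte[] digest = md5("consumer-1");
        System.out.println("first 32 bits: " + first32Bits(digest));
        System.out.println("xor-folded:    " + xorFold32(digest));
    }
}
```

    Both variants still produce a 32-bit ring position, so folding does not enlarge the key space. Since MD5 output bits are generally treated as close to uniformly distributed, the practical difference in balance is probably small; the fold mainly ensures that every digest bit contributes to the result.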


> Consistent Hash allocate strategy support
> -----------------------------------------
>
>                 Key: ROCKETMQ-67
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-67
>             Project: Apache RocketMQ
>          Issue Type: New Feature
>          Components: rocketmq-client
>            Reporter: Jaskey Lam
>            Assignee: Jaskey Lam
>             Fix For: 4.1.0-incubating
>
>
> Currently, the averaging allocation strategy is very sensitive to clients registering and unregistering.
> A consistent hash allocation strategy is a valuable option for developers who care more about stable latency and fewer duplicated messages.
> Intentions:
> The default AllocateMessageQueueStrategy is the averaging strategy, which allocates queues to consumers as evenly as possible. Whenever the number of queues or consumers changes, say a new consumer starts or an old consumer shuts down, a rehashing is triggered and almost all consumers are affected: they rebalance, dropping old queues and taking on new ones.
> This causes:
> - Message latency from producer to consumer increases at the moment the consumer/queue numbers change, even when scaling up.
> - Messages are duplicated significantly, since the offset may not yet be persisted to the broker when the queue is assigned to another consumer to pull messages from.
> This is especially significant for users who have tens of consumer instances and who scale up or deploy frequently.
> A consistent hash strategy for allocating queues is a good choice for these users.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)