Posted to issues@rocketmq.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/16 03:58:04 UTC
[jira] [Commented] (ROCKETMQ-67) Consistent Hash allocate strategy support
[ https://issues.apache.org/jira/browse/ROCKETMQ-67?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16011699#comment-16011699 ]
ASF GitHub Bot commented on ROCKETMQ-67:
----------------------------------------
Github user dhchao11 commented on a diff in the pull request:
https://github.com/apache/incubator-rocketmq/pull/67#discussion_r116650808
--- Diff: common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * To hash Node objects to a hash ring with a certain amount of virtual node.
+ * Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
+ *
+ * @param <T>
+ */
+public class ConsistentHashRouter<T extends Node> {
+ private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+ private final HashFunction hashFunction;
+
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
+ this(pNodes,vNodeCount, new MD5Hash());
+ }
+
+ /**
+ *
+ * @param pNodes collections of physical nodes
+ * @param vNodeCount amounts of virtual nodes
+ * @param hashFunction hash Function to hash Node instances
+ */
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
+ if (hashFunction == null) {
+ throw new NullPointerException("Hash Function is null");
+ }
+ this.hashFunction = hashFunction;
+ if (pNodes != null) {
+ for (T pNode : pNodes) {
+ addNode(pNode, vNodeCount);
+ }
+ }
+ }
+
+ /**
+ * add physic node to the hash ring with some virtual nodes
+ * @param pNode physical node needs added to hash ring
+ * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
+ */
+ public void addNode(T pNode, int vNodeCount) {
+ if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
+ int existingReplicas = getExistingReplicas(pNode);
+ for (int i = 0; i < vNodeCount; i++) {
+ VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
+ ring.put(hashFunction.hash(vNode.getKey()), vNode);
+ }
+ }
+
+ /**
+ * remove the physical node from the hash ring
+ * @param pNode
+ */
+ public void removeNode(T pNode) {
+ Iterator<Long> it = ring.keySet().iterator();
+ while (it.hasNext()) {
+ Long key = it.next();
+ VirtualNode<T> virtualNode = ring.get(key);
+ if (virtualNode.isVirtualNodeOf(pNode)) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * with a specified key, route the nearest Node instance in the current hash ring
+ * @param objectKey the object key to find a nearest Node
+ * @return
+ */
+ public T routeNode(String objectKey) {
+ if (ring.isEmpty()) {
+ return null;
+ }
+ Long hashVal = hashFunction.hash(objectKey);
+ SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
+ Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
+ return ring.get(nodeHashVal).getPhysicalNode();
+ }
+
+
+ public int getExistingReplicas(T pNode) {
+ int replicas = 0;
+ for (VirtualNode<T> vNode : ring.values()) {
+ if (vNode.isVirtualNodeOf(pNode)) {
+ replicas++;
+ }
+ }
+ return replicas;
+ }
+
+
+ //default hash function
+ private static class MD5Hash implements HashFunction {
+ MessageDigest instance;
+
+ public MD5Hash() {
+ try {
+ instance = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ }
+ }
+
+ @Override
+ public long hash(String key) {
+ instance.reset();
+ instance.update(key.getBytes());
+ byte[] digest = instance.digest();
+
+ long h = 0;
+ for (int i = 0; i < 4; i++) {
+ h <<= 8;
+ h |= ((int) digest[i]) & 0xFF;
+ }
--- End diff --
The MD5 digest is 128 bits, but the generated h uses only the first 32 bits; the other 96 bits are ignored. Would it be better to fold the remaining bits in with XOR (^)?
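One possible reading of the XOR suggestion, as a minimal sketch rather than the change that was actually made in the pull request: split the 16-byte digest into four 32-bit words and XOR them together, so every digest bit contributes while h stays in the same non-negative 32-bit range as the code in the diff. The class name DigestFolding and its method names are hypothetical and do not appear in the diff.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration only; not part of the pull request.
final class DigestFolding {
    private DigestFolding() { }

    // Same arithmetic as the loop in the diff: keep only the first 4 digest bytes.
    static long first32Bits(byte[] digest) {
        long h = 0;
        for (int i = 0; i < 4; i++) {
            h <<= 8;
            h |= digest[i] & 0xFF;
        }
        return h;
    }

    // Assumed alternative: XOR all 32-bit words of the digest together.
    // The result still fits in 32 bits, so it is never negative as a long.
    static long xorFold32(byte[] digest) {
        if (digest.length % 4 != 0) {
            throw new IllegalArgumentException("digest length must be a multiple of 4");
        }
        long h = 0;
        for (int word = 0; word < digest.length / 4; word++) {
            long w = 0;
            for (int i = 0; i < 4; i++) {
                w = (w << 8) | (digest[word * 4 + i] & 0xFF);
            }
            h ^= w;
        }
        return h;
    }

    // Computes the MD5 digest with an explicit charset and without swallowing
    // the (in practice unreachable) NoSuchAlgorithmException.
    static byte[] md5(String key) {
        try {
            return MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        byte[] digest = md5("some-object-key");
        System.out.println("first 32 bits: " + first32Bits(digest));
        System.out.println("xor-folded:    " + xorFold32(digest));
    }
}
```

Since MD5 output is already well mixed, taking the first 32 bits is not obviously worse in distribution; the fold mainly ensures that no part of the digest is discarded.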
> Consistent Hash allocate strategy support
> -----------------------------------------
>
> Key: ROCKETMQ-67
> URL: https://issues.apache.org/jira/browse/ROCKETMQ-67
> Project: Apache RocketMQ
> Issue Type: New Feature
> Components: rocketmq-client
> Reporter: Jaskey Lam
> Assignee: Jaskey Lam
> Fix For: 4.1.0-incubating
>
>
> For now, the average allocate strategy is very sensitive to clients registering and unregistering.
> A Consistent Hash allocate strategy option is valuable for developers who care more about stable latency and fewer duplicated messages.
> Intentions:
> The default AllocateMessageQueueStrategy is the averaging strategy, which allocates queues to consumers as evenly as possible. Whenever the number of queues or consumers changes, say a new consumer starts or an old consumer shuts down, a rehash is triggered and almost all consumers are affected: they rebalance, dropping old queues and taking new ones.
> This causes:
> - Message latency from producer to consumer increases at the moment the number of consumers or queues changes, even when scaling up.
> - Messages are duplicated significantly, since the offset may not have been persisted to the broker when the queue is assigned to another consumer to pull messages from.
> This is especially significant for users who have tens of consumer instances and scale up or deploy often.
> A Consistent Hash strategy for allocating queues is a good choice for these users.
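
The effect described in the issue can be sketched as follows. This is a simplified illustration under stated assumptions, not the RocketMQ implementation: the class ConsistentAllocationSketch, the virtual-node key format (consumer id plus "-" plus index), and the queue and consumer names are all hypothetical. It places consumers on a hash ring and assigns each queue to the next consumer clockwise, so that when a consumer joins, a queue either keeps its owner or moves to the newcomer.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch for illustration only; not the code from the pull request.
final class ConsistentAllocationSketch {
    private ConsistentAllocationSketch() { }

    // First 32 bits of the MD5 digest, as in the diff under review.
    static long hash(String key) {
        byte[] digest;
        try {
            digest = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
        long h = 0;
        for (int i = 0; i < 4; i++) {
            h = (h << 8) | (digest[i] & 0xFF);
        }
        return h;
    }

    // Maps every queue to the consumer owning the first virtual node at or after
    // the queue's hash, wrapping around to the start of the ring if none exists.
    static Map<String, String> allocate(List<String> queues, List<String> consumers, int vNodes) {
        if (consumers.isEmpty() || vNodes <= 0) {
            throw new IllegalArgumentException("need at least one consumer and one virtual node");
        }
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String consumer : consumers) {
            for (int i = 0; i < vNodes; i++) {
                ring.put(hash(consumer + "-" + i), consumer); // assumed virtual-node key format
            }
        }
        Map<String, String> owners = new LinkedHashMap<>();
        for (String queue : queues) {
            SortedMap<Long, String> tail = ring.tailMap(hash(queue));
            Long slot = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
            owners.put(queue, ring.get(slot));
        }
        return owners;
    }

    public static void main(String[] args) {
        List<String> queues = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            queues.add("queue-" + i);
        }
        Map<String, String> before = allocate(queues, Arrays.asList("c1", "c2", "c3"), 10);
        Map<String, String> after = allocate(queues, Arrays.asList("c1", "c2", "c3", "c4"), 10);
        for (String queue : queues) {
            System.out.println(queue + ": " + before.get(queue) + " -> " + after.get(queue));
        }
    }
}
```

The trade-off is that the resulting split is usually less even than the averaging strategy; a higher virtual-node count tends to smooth it out.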