You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:30 UTC
[32/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-67]
Consistent Hash allocate strategy closes apache/incubator-rocketmq#67
[ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/787d1286
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/787d1286
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/787d1286
Branch: refs/heads/develop
Commit: 787d128613d00a617be2f8ef943e9cfafcc07f85
Parents: 3292d03
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:42:03 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800
----------------------------------------------------------------------
.../AllocateMessageQueueConsistentHash.java | 124 ++++++++++
.../AllocateMessageQueueConsitentHashTest.java | 243 +++++++++++++++++++
.../consistenthash/ConsistentHashRouter.java | 140 +++++++++++
.../common/consistenthash/HashFunction.java | 24 ++
.../rocketmq/common/consistenthash/Node.java | 28 +++
.../common/consistenthash/VirtualNode.java | 41 ++++
6 files changed, 600 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
new file mode 100644
index 0000000..77198b7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.HashFunction;
+import org.apache.rocketmq.common.consistenthash.Node;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * Consistent Hashing queue algorithm
+ */
+public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
+ private final Logger log = ClientLogger.getLog();
+
+ private final int virtualNodeCnt;
+ private final HashFunction customHashFunction;
+
+ public AllocateMessageQueueConsistentHash() {
+ this(10);
+ }
+
+ public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
+ this(virtualNodeCnt,null);
+ }
+
+ public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
+ if (virtualNodeCnt < 0) {
+ throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
+ }
+ this.virtualNodeCnt = virtualNodeCnt;
+ this.customHashFunction = customHashFunction;
+ }
+
+ @Override
+ public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+ List<String> cidAll) {
+
+ if (currentCID == null || currentCID.length() < 1) {
+ throw new IllegalArgumentException("currentCID is empty");
+ }
+ if (mqAll == null || mqAll.isEmpty()) {
+ throw new IllegalArgumentException("mqAll is null or mqAll empty");
+ }
+ if (cidAll == null || cidAll.isEmpty()) {
+ throw new IllegalArgumentException("cidAll is null or cidAll empty");
+ }
+
+ List<MessageQueue> result = new ArrayList<MessageQueue>();
+ if (!cidAll.contains(currentCID)) {
+ log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+ consumerGroup,
+ currentCID,
+ cidAll);
+ return result;
+ }
+
+
+ Collection<ClientNode> cidNodes = new ArrayList<>();
+ for (String cid : cidAll) {
+ cidNodes.add(new ClientNode(cid));
+ }
+
+ final ConsistentHashRouter<ClientNode> router; //for building hash ring
+ if (customHashFunction != null) {
+ router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction);
+ } else {
+ router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
+ }
+
+ List<MessageQueue> results = new ArrayList<>();
+ for (MessageQueue mq : mqAll) {
+ ClientNode clientNode = router.routeNode(mq.toString());
+ if (clientNode != null && currentCID.equals(clientNode.getKey())) {
+ results.add(mq);
+ }
+ }
+
+ return results;
+
+ }
+
+ @Override
+ public String getName() {
+ return "CONSISTENT_HASH";
+ }
+
+
+ private static class ClientNode implements Node {
+ private final String clientID;
+
+ public ClientNode(String clientID) {
+ this.clientID = clientID;
+ }
+
+ @Override
+ public String getKey() {
+ return clientID;
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
new file mode 100644
index 0000000..fc7ab9f
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class AllocateMessageQueueConsitentHashTest {
+
+ private String topic;
+ private static final String CID_PREFIX = "CID-";
+
+ @Before
+ public void init() {
+ topic = "topic_test";
+ }
+
+
+
+ public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+ if (messageQueueList == null || messageQueueList.size() < 1)
+ return;
+ System.out.println(name + ".......................................start");
+ for (MessageQueue messageQueue : messageQueueList) {
+ System.out.println(messageQueue);
+ }
+ System.out.println(name + ".......................................end");
+ }
+
+ @Test
+ public void testCurrentCIDNotExists() {
+ String currentCID = String.valueOf(Integer.MAX_VALUE);
+ List<String> consumerIdList = createConsumerIdList(2);
+ List<MessageQueue> messageQueueList = createMessageQueueList(6);
+ List<MessageQueue> result = new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, consumerIdList);
+ printMessageQueue(result, "testCurrentCIDNotExists");
+ Assert.assertEquals(result.size(), 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCurrentCIDIllegalArgument() {
+ List<String> consumerIdList = createConsumerIdList(2);
+ List<MessageQueue> messageQueueList = createMessageQueueList(6);
+ new AllocateMessageQueueConsistentHash().allocate("", "", messageQueueList, consumerIdList);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMessageQueueIllegalArgument() {
+ String currentCID = "0";
+ List<String> consumerIdList = createConsumerIdList(2);
+ new AllocateMessageQueueConsistentHash().allocate("", currentCID, null, consumerIdList);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testConsumerIdIllegalArgument() {
+ String currentCID = "0";
+ List<MessageQueue> messageQueueList = createMessageQueueList(6);
+ new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, null);
+ }
+
+ @Test
+ public void testAllocate1() {
+ testAllocate(20,10);
+ }
+
+ @Test
+ public void testAllocate2() {
+ testAllocate(10,20);
+ }
+
+
+ @Test
+ public void testRun100RandomCase(){
+ for(int i=0;i<100;i++){
+ int consumerSize = new Random().nextInt(200)+1;//1-200
+ int queueSize = new Random().nextInt(100)+1;//1-100
+ testAllocate(queueSize,consumerSize);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {}
+ }
+ }
+
+
+ public void testAllocate(int queueSize, int consumerSize) {
+ AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3);
+
+ List<MessageQueue> mqAll = createMessageQueueList(queueSize);
+ //System.out.println("mqAll:" + mqAll.toString());
+
+ List<String> cidAll = createConsumerIdList(consumerSize);
+ List<MessageQueue> allocatedResAll = new ArrayList<>();
+
+ Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>();
+ //test allocate all
+ {
+
+ List<String> cidBegin = new ArrayList<>(cidAll);
+
+ //System.out.println("cidAll:" + cidBegin.toString());
+ for (String cid : cidBegin) {
+ List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidBegin);
+ for (MessageQueue mq : rs) {
+ allocateToAllOrigin.put(mq, cid);
+ }
+ allocatedResAll.addAll(rs);
+ //System.out.println("rs[" + cid + "]:" + rs.toString());
+ }
+
+ Assert.assertTrue(
+ verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
+ }
+
+ Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>();
+ List<String> cidAfterRemoveOne = new ArrayList<>(cidAll);
+ //test allocate remove one cid
+ {
+ String removeCID = cidAfterRemoveOne.remove(0);
+ //System.out.println("removing one cid "+removeCID);
+ List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+ Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<MessageQueue, String> entry = it.next();
+ if (entry.getValue().equals(removeCID)) {
+ mqShouldOnlyChanged.add(entry.getKey());
+ }
+ }
+
+ //System.out.println("cidAll:" + cidAfterRemoveOne.toString());
+ List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>();
+ for (String cid : cidAfterRemoveOne) {
+ List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne);
+ allocatedResAllAfterRemove.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterRemoveOne.put(mq, cid);
+ }
+ //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+ }
+
+ Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove,
+ verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove));
+ verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
+ }
+
+ List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne);
+ //test allocate add one more cid
+ {
+ String newCid = CID_PREFIX+"NEW";
+ //System.out.println("add one more cid "+newCid);
+ cidAfterAdd.add(newCid);
+ List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+ //System.out.println("cidAll:" + cidAfterAdd.toString());
+ List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>();
+ Map<MessageQueue, String> allocateToAll3 = new TreeMap<>();
+ for (String cid : cidAfterAdd) {
+ List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd);
+ allocatedResAllAfterAdd.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAll3.put(mq, cid);
+ if (cid.equals(newCid)){
+ mqShouldOnlyChanged.add(mq);
+ }
+ }
+ //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+ }
+
+ Assert.assertTrue(
+ verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd));
+ verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid);
+ }
+ }
+
+ private boolean verifyAllocateAll(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
+ if (cidAll.isEmpty()){
+ return allocatedResAll.isEmpty();
+ }
+ return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
+ }
+
+ private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, String removeCID) {
+ for (MessageQueue mq : allocateToBefore.keySet()) {
+ String allocateToOrigin = allocateToBefore.get(mq);
+ if (allocateToOrigin.equals(removeCID)) {
+
+ } else {//the rest queue should be the same
+ Assert.assertTrue(allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+ }
+ }
+ }
+
+ private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, String newCID) {
+ for (MessageQueue mq : allocateAfter.keySet()) {
+ String allocateToOrigin = allocateBefore.get(mq);
+ String allocateToAfter = allocateAfter.get(mq);
+ if (allocateToAfter.equals(newCID)) {
+
+ } else {//the rest queue should be the same
+ Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+ }
+ }
+ }
+
+ private List<String> createConsumerIdList(int size) {
+ List<String> consumerIdList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ consumerIdList.add(CID_PREFIX + String.valueOf(i));
+ }
+ return consumerIdList;
+ }
+
+ private List<MessageQueue> createMessageQueueList(int size) {
+ List<MessageQueue> messageQueueList = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+ messageQueueList.add(mq);
+ }
+ return messageQueueList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
new file mode 100644
index 0000000..8606c43
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * To hash Node objects to a hash ring with a certain amount of virtual node.
+ * Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
+ *
+ * @param <T>
+ */
+public class ConsistentHashRouter<T extends Node> {
+ private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+ private final HashFunction hashFunction;
+
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
+ this(pNodes,vNodeCount, new MD5Hash());
+ }
+
+ /**
+ *
+ * @param pNodes collections of physical nodes
+ * @param vNodeCount amounts of virtual nodes
+ * @param hashFunction hash Function to hash Node instances
+ */
+ public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
+ if (hashFunction == null) {
+ throw new NullPointerException("Hash Function is null");
+ }
+ this.hashFunction = hashFunction;
+ if (pNodes != null) {
+ for (T pNode : pNodes) {
+ addNode(pNode, vNodeCount);
+ }
+ }
+ }
+
+ /**
+ * add physic node to the hash ring with some virtual nodes
+ * @param pNode physical node needs added to hash ring
+ * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
+ */
+ public void addNode(T pNode, int vNodeCount) {
+ if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
+ int existingReplicas = getExistingReplicas(pNode);
+ for (int i = 0; i < vNodeCount; i++) {
+ VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
+ ring.put(hashFunction.hash(vNode.getKey()), vNode);
+ }
+ }
+
+ /**
+ * remove the physical node from the hash ring
+ * @param pNode
+ */
+ public void removeNode(T pNode) {
+ Iterator<Long> it = ring.keySet().iterator();
+ while (it.hasNext()) {
+ Long key = it.next();
+ VirtualNode<T> virtualNode = ring.get(key);
+ if (virtualNode.isVirtualNodeOf(pNode)) {
+ it.remove();
+ }
+ }
+ }
+
+ /**
+ * with a specified key, route the nearest Node instance in the current hash ring
+ * @param objectKey the object key to find a nearest Node
+ * @return
+ */
+ public T routeNode(String objectKey) {
+ if (ring.isEmpty()) {
+ return null;
+ }
+ Long hashVal = hashFunction.hash(objectKey);
+ SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
+ Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
+ return ring.get(nodeHashVal).getPhysicalNode();
+ }
+
+
+ public int getExistingReplicas(T pNode) {
+ int replicas = 0;
+ for (VirtualNode<T> vNode : ring.values()) {
+ if (vNode.isVirtualNodeOf(pNode)) {
+ replicas++;
+ }
+ }
+ return replicas;
+ }
+
+
+ //default hash function
+ private static class MD5Hash implements HashFunction {
+ MessageDigest instance;
+
+ public MD5Hash() {
+ try {
+ instance = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ }
+ }
+
+ @Override
+ public long hash(String key) {
+ instance.reset();
+ instance.update(key.getBytes());
+ byte[] digest = instance.digest();
+
+ long h = 0;
+ for (int i = 0; i < 4; i++) {
+ h <<= 8;
+ h |= ((int) digest[i]) & 0xFF;
+ }
+ return h;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
new file mode 100644
index 0000000..58fd777
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Hash String to long value
+ */
+public interface HashFunction {
+ long hash(String key);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
new file mode 100644
index 0000000..0ece210
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Represent a node which should be mapped to a hash ring
+ */
+public interface Node {
+ /**
+ *
+ * @return the key which will be used for hash mapping
+ */
+ String getKey();
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
new file mode 100644
index 0000000..c8b72d9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+
+public class VirtualNode<T extends Node> implements Node {
+ final T physicalNode;
+ final int replicaIndex;
+
+ public VirtualNode(T physicalNode, int replicaIndex) {
+ this.replicaIndex = replicaIndex;
+ this.physicalNode = physicalNode;
+ }
+
+ @Override
+ public String getKey() {
+ return physicalNode.getKey() + "-" + replicaIndex;
+ }
+
+ public boolean isVirtualNodeOf(T pNode) {
+ return physicalNode.getKey().equals(pNode.getKey());
+ }
+
+ public T getPhysicalNode() {
+ return physicalNode;
+ }
+}