You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2017/08/01 07:23:30 UTC

[32/50] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67

[ROCKETMQ-67] Consistent Hash allocate strategy closes apache/incubator-rocketmq#67


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/787d1286
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/787d1286
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/787d1286

Branch: refs/heads/develop
Commit: 787d128613d00a617be2f8ef943e9cfafcc07f85
Parents: 3292d03
Author: Jaskey <li...@gmail.com>
Authored: Sat May 27 11:42:03 2017 +0800
Committer: dongeforever <zh...@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../AllocateMessageQueueConsistentHash.java     | 124 ++++++++++
 .../AllocateMessageQueueConsitentHashTest.java  | 243 +++++++++++++++++++
 .../consistenthash/ConsistentHashRouter.java    | 140 +++++++++++
 .../common/consistenthash/HashFunction.java     |  24 ++
 .../rocketmq/common/consistenthash/Node.java    |  28 +++
 .../common/consistenthash/VirtualNode.java      |  41 ++++
 6 files changed, 600 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
new file mode 100644
index 0000000..77198b7
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.HashFunction;
+import org.apache.rocketmq.common.consistenthash.Node;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+/**
+ * Consistent Hashing queue algorithm
+ */
+public class AllocateMessageQueueConsistentHash  implements AllocateMessageQueueStrategy {
+    private final Logger log = ClientLogger.getLog();
+
+    private final int virtualNodeCnt;
+    private final HashFunction customHashFunction;
+
+    public AllocateMessageQueueConsistentHash() {
+        this(10);
+    }
+
+    public AllocateMessageQueueConsistentHash(int virtualNodeCnt) {
+        this(virtualNodeCnt,null);
+    }
+
+    public AllocateMessageQueueConsistentHash(int virtualNodeCnt, HashFunction customHashFunction) {
+        if (virtualNodeCnt < 0) {
+            throw new IllegalArgumentException("illegal virtualNodeCnt :" + virtualNodeCnt);
+        }
+        this.virtualNodeCnt = virtualNodeCnt;
+        this.customHashFunction = customHashFunction;
+    }
+
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
+        List<String> cidAll) {
+
+        if (currentCID == null || currentCID.length() < 1) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll empty");
+        }
+
+        List<MessageQueue> result = new ArrayList<MessageQueue>();
+        if (!cidAll.contains(currentCID)) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+                consumerGroup,
+                currentCID,
+                cidAll);
+            return result;
+        }
+
+
+        Collection<ClientNode> cidNodes = new ArrayList<>();
+        for (String cid : cidAll) {
+            cidNodes.add(new ClientNode(cid));
+        }
+
+        final ConsistentHashRouter<ClientNode> router; //for building hash ring
+        if (customHashFunction != null) {
+            router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction);
+        } else {
+            router = new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
+        }
+
+        List<MessageQueue> results = new ArrayList<>();
+        for (MessageQueue mq : mqAll) {
+            ClientNode clientNode = router.routeNode(mq.toString());
+            if (clientNode != null && currentCID.equals(clientNode.getKey())) {
+                results.add(mq);
+            }
+        }
+
+        return results;
+
+    }
+
+    @Override
+    public String getName() {
+        return "CONSISTENT_HASH";
+    }
+
+
+    private static class ClientNode implements Node {
+        private final String clientID;
+
+        public ClientNode(String clientID) {
+            this.clientID = clientID;
+        }
+
+        @Override
+        public String getKey() {
+            return clientID;
+        }
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
new file mode 100644
index 0000000..fc7ab9f
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.consumer.rebalance;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class AllocateMessageQueueConsitentHashTest {
+
+    private String topic;
+    private static final String CID_PREFIX = "CID-";
+
+    @Before
+    public void init() {
+        topic = "topic_test";
+    }
+
+
+
+    public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
+        if (messageQueueList == null || messageQueueList.size() < 1)
+            return;
+        System.out.println(name + ".......................................start");
+        for (MessageQueue messageQueue : messageQueueList) {
+            System.out.println(messageQueue);
+        }
+        System.out.println(name + ".......................................end");
+    }
+
+    @Test
+    public void testCurrentCIDNotExists() {
+        String currentCID = String.valueOf(Integer.MAX_VALUE);
+        List<String> consumerIdList = createConsumerIdList(2);
+        List<MessageQueue> messageQueueList = createMessageQueueList(6);
+        List<MessageQueue> result = new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, consumerIdList);
+        printMessageQueue(result, "testCurrentCIDNotExists");
+        Assert.assertEquals(result.size(), 0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testCurrentCIDIllegalArgument() {
+        List<String> consumerIdList = createConsumerIdList(2);
+        List<MessageQueue> messageQueueList = createMessageQueueList(6);
+        new AllocateMessageQueueConsistentHash().allocate("", "", messageQueueList, consumerIdList);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMessageQueueIllegalArgument() {
+        String currentCID = "0";
+        List<String> consumerIdList = createConsumerIdList(2);
+        new AllocateMessageQueueConsistentHash().allocate("", currentCID, null, consumerIdList);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testConsumerIdIllegalArgument() {
+        String currentCID = "0";
+        List<MessageQueue> messageQueueList = createMessageQueueList(6);
+        new AllocateMessageQueueConsistentHash().allocate("", currentCID, messageQueueList, null);
+    }
+
+    @Test
+    public void testAllocate1() {
+        testAllocate(20,10);
+    }
+
+    @Test
+    public void testAllocate2() {
+        testAllocate(10,20);
+    }
+
+
+    @Test
+    public void testRun100RandomCase(){
+        for(int i=0;i<100;i++){
+            int consumerSize = new Random().nextInt(200)+1;//1-200
+            int queueSize = new Random().nextInt(100)+1;//1-100
+            testAllocate(queueSize,consumerSize);
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {}
+        }
+    }
+
+
+    public void testAllocate(int queueSize, int consumerSize) {
+        AllocateMessageQueueStrategy allocateMessageQueueConsistentHash = new AllocateMessageQueueConsistentHash(3);
+
+        List<MessageQueue> mqAll = createMessageQueueList(queueSize);
+        //System.out.println("mqAll:" + mqAll.toString());
+
+        List<String> cidAll = createConsumerIdList(consumerSize);
+        List<MessageQueue> allocatedResAll = new ArrayList<>();
+
+        Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<>();
+        //test allocate all
+        {
+
+            List<String> cidBegin = new ArrayList<>(cidAll);
+
+            //System.out.println("cidAll:" + cidBegin.toString());
+            for (String cid : cidBegin) {
+                List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidBegin);
+                for (MessageQueue mq : rs) {
+                    allocateToAllOrigin.put(mq, cid);
+                }
+                allocatedResAll.addAll(rs);
+                //System.out.println("rs[" + cid + "]:" + rs.toString());
+            }
+
+            Assert.assertTrue(
+                verifyAllocateAll(cidBegin,mqAll, allocatedResAll));
+        }
+
+        Map<MessageQueue, String> allocateToAllAfterRemoveOne = new TreeMap<>();
+        List<String> cidAfterRemoveOne = new ArrayList<>(cidAll);
+        //test allocate remove one cid
+        {
+            String removeCID = cidAfterRemoveOne.remove(0);
+            //System.out.println("removing one cid "+removeCID);
+            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+            Iterator<Map.Entry<MessageQueue, String>> it = allocateToAllOrigin.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<MessageQueue, String> entry = it.next();
+                if (entry.getValue().equals(removeCID)) {
+                    mqShouldOnlyChanged.add(entry.getKey());
+                }
+            }
+
+            //System.out.println("cidAll:" + cidAfterRemoveOne.toString());
+            List<MessageQueue> allocatedResAllAfterRemove = new ArrayList<>();
+            for (String cid : cidAfterRemoveOne) {
+                List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterRemoveOne);
+                allocatedResAllAfterRemove.addAll(rs);
+                for (MessageQueue mq : rs) {
+                    allocateToAllAfterRemoveOne.put(mq, cid);
+                }
+                //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+            }
+
+            Assert.assertTrue("queueSize"+queueSize+"consumerSize:"+consumerSize+"\nmqAll:"+mqAll+"\nallocatedResAllAfterRemove"+allocatedResAllAfterRemove,
+                verifyAllocateAll(cidAfterRemoveOne, mqAll, allocatedResAllAfterRemove));
+            verifyAfterRemove(allocateToAllOrigin, allocateToAllAfterRemoveOne, removeCID);
+        }
+
+        List<String> cidAfterAdd = new ArrayList<>(cidAfterRemoveOne);
+        //test allocate add one more cid
+        {
+            String newCid = CID_PREFIX+"NEW";
+            //System.out.println("add one more cid "+newCid);
+            cidAfterAdd.add(newCid);
+            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<>();
+            //System.out.println("cidAll:" + cidAfterAdd.toString());
+            List<MessageQueue> allocatedResAllAfterAdd = new ArrayList<>();
+            Map<MessageQueue, String> allocateToAll3 = new TreeMap<>();
+            for (String cid : cidAfterAdd) {
+                List<MessageQueue> rs = allocateMessageQueueConsistentHash.allocate("testConsumerGroup", cid, mqAll, cidAfterAdd);
+                allocatedResAllAfterAdd.addAll(rs);
+                for (MessageQueue mq : rs) {
+                    allocateToAll3.put(mq, cid);
+                    if (cid.equals(newCid)){
+                        mqShouldOnlyChanged.add(mq);
+                    }
+                }
+                //System.out.println("rs[" + cid + "]:" + "[" + rs.size() + "]" + rs.toString());
+            }
+
+            Assert.assertTrue(
+                verifyAllocateAll(cidAfterAdd,mqAll, allocatedResAllAfterAdd));
+            verifyAfterAdd(allocateToAllAfterRemoveOne, allocateToAll3, newCid);
+        }
+    }
+
+    private boolean verifyAllocateAll(List<String> cidAll,List<MessageQueue> mqAll, List<MessageQueue> allocatedResAll) {
+        if (cidAll.isEmpty()){
+            return allocatedResAll.isEmpty();
+        }
+        return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
+    }
+
+    private void verifyAfterRemove(Map<MessageQueue, String> allocateToBefore, Map<MessageQueue, String> allocateAfter, String removeCID) {
+        for (MessageQueue mq : allocateToBefore.keySet()) {
+            String allocateToOrigin = allocateToBefore.get(mq);
+            if (allocateToOrigin.equals(removeCID)) {
+
+            } else {//the rest queue should be the same
+                Assert.assertTrue(allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+            }
+        }
+    }
+
+    private void verifyAfterAdd(Map<MessageQueue, String> allocateBefore, Map<MessageQueue, String> allocateAfter, String newCID) {
+        for (MessageQueue mq : allocateAfter.keySet()) {
+            String allocateToOrigin = allocateBefore.get(mq);
+            String allocateToAfter = allocateAfter.get(mq);
+            if (allocateToAfter.equals(newCID)) {
+
+            } else {//the rest queue should be the same
+                Assert.assertTrue("it was allocated to "+allocateToOrigin+". Now, it is to "+allocateAfter.get(mq)+" mq:"+mq,allocateAfter.get(mq).equals(allocateToOrigin));//should be the same
+            }
+        }
+    }
+
+    private List<String> createConsumerIdList(int size) {
+        List<String> consumerIdList = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            consumerIdList.add(CID_PREFIX + String.valueOf(i));
+        }
+        return consumerIdList;
+    }
+
+    private List<MessageQueue> createMessageQueueList(int size) {
+        List<MessageQueue> messageQueueList = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            messageQueueList.add(mq);
+        }
+        return messageQueueList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
new file mode 100644
index 0000000..8606c43
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/ConsistentHashRouter.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * To hash Node objects to a hash ring with a certain amount of virtual node.
+ * Method routeNode will return a Node instance which the object key should be allocated to according to consistent hash algorithm
+ *
+ * @param <T>
+ */
+public class ConsistentHashRouter<T extends Node> {
+    private final SortedMap<Long, VirtualNode<T>> ring = new TreeMap<>();
+    private final HashFunction hashFunction;
+
+    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount) {
+        this(pNodes,vNodeCount, new MD5Hash());
+    }
+
+    /**
+     *
+     * @param pNodes collections of physical nodes
+     * @param vNodeCount amounts of virtual nodes
+     * @param hashFunction hash Function to hash Node instances
+     */
+    public ConsistentHashRouter(Collection<T> pNodes, int vNodeCount, HashFunction hashFunction) {
+        if (hashFunction == null) {
+            throw new NullPointerException("Hash Function is null");
+        }
+        this.hashFunction = hashFunction;
+        if (pNodes != null) {
+            for (T pNode : pNodes) {
+                addNode(pNode, vNodeCount);
+            }
+        }
+    }
+
+    /**
+     * add physic node to the hash ring with some virtual nodes
+     * @param pNode physical node needs added to hash ring
+     * @param vNodeCount the number of virtual node of the physical node. Value should be greater than or equals to 0
+     */
+    public void addNode(T pNode, int vNodeCount) {
+        if (vNodeCount < 0) throw new IllegalArgumentException("illegal virtual node counts :" + vNodeCount);
+        int existingReplicas = getExistingReplicas(pNode);
+        for (int i = 0; i < vNodeCount; i++) {
+            VirtualNode<T> vNode = new VirtualNode<>(pNode, i + existingReplicas);
+            ring.put(hashFunction.hash(vNode.getKey()), vNode);
+        }
+    }
+
+    /**
+     * remove the physical node from the hash ring
+     * @param pNode
+     */
+    public void removeNode(T pNode) {
+        Iterator<Long> it = ring.keySet().iterator();
+        while (it.hasNext()) {
+            Long key = it.next();
+            VirtualNode<T> virtualNode = ring.get(key);
+            if (virtualNode.isVirtualNodeOf(pNode)) {
+                it.remove();
+            }
+        }
+    }
+
+    /**
+     * with a specified key, route the nearest Node instance in the current hash ring
+     * @param objectKey the object key to find a nearest Node
+     * @return
+     */
+    public T routeNode(String objectKey) {
+        if (ring.isEmpty()) {
+            return null;
+        }
+        Long hashVal = hashFunction.hash(objectKey);
+        SortedMap<Long,VirtualNode<T>> tailMap = ring.tailMap(hashVal);
+        Long nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : ring.firstKey();
+        return ring.get(nodeHashVal).getPhysicalNode();
+    }
+
+
+    public int getExistingReplicas(T pNode) {
+        int replicas = 0;
+        for (VirtualNode<T> vNode : ring.values()) {
+            if (vNode.isVirtualNodeOf(pNode)) {
+                replicas++;
+            }
+        }
+        return replicas;
+    }
+
+    
+    //default hash function
+    private static class MD5Hash implements HashFunction {
+        MessageDigest instance;
+
+        public MD5Hash() {
+            try {
+                instance = MessageDigest.getInstance("MD5");
+            } catch (NoSuchAlgorithmException e) {
+            }
+        }
+
+        @Override
+        public long hash(String key) {
+            instance.reset();
+            instance.update(key.getBytes());
+            byte[] digest = instance.digest();
+
+            long h = 0;
+            for (int i = 0; i < 4; i++) {
+                h <<= 8;
+                h |= ((int) digest[i]) & 0xFF;
+            }
+            return h;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
new file mode 100644
index 0000000..58fd777
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/HashFunction.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Hash String to long value
+ */
+public interface HashFunction {
+    long hash(String key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
new file mode 100644
index 0000000..0ece210
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/Node.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+/**
+ * Represent a node which should be mapped to a hash ring
+ */
+public interface Node {
+    /**
+     *
+     * @return the key which will be used for hash mapping
+     */
+    String getKey();
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/787d1286/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
new file mode 100644
index 0000000..c8b72d9
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/consistenthash/VirtualNode.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.consistenthash;
+
+
+public class VirtualNode<T extends Node> implements Node {
+    final T physicalNode;
+    final int replicaIndex;
+
+    public VirtualNode(T physicalNode, int replicaIndex) {
+        this.replicaIndex = replicaIndex;
+        this.physicalNode = physicalNode;
+    }
+
+    @Override
+    public String getKey() {
+        return physicalNode.getKey() + "-" + replicaIndex;
+    }
+
+    public boolean isVirtualNodeOf(T pNode) {
+        return physicalNode.getKey().equals(pNode.getKey());
+    }
+
+    public T getPhysicalNode() {
+        return physicalNode;
+    }
+}