You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/04/22 23:23:01 UTC

[GitHub] [pulsar] merlimat commented on a change in pull request #6791: Use consistent hashing in KeyShared distribution

merlimat commented on a change in pull request #6791:
URL: https://github.com/apache/pulsar/pull/6791#discussion_r413398739



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
##########
@@ -18,86 +18,64 @@
  */
 package org.apache.pulsar.broker.service;
 
-import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 /**
  * This is a consumer selector based fixed hash range.
  *
- * 1.Each consumer serves a fixed range of hash value
- * 2.The whole range of hash value could be covered by all the consumers.
- * 3.Once a consumer is removed, the left consumers could still serve the whole range.
- *
- * Initializing with a fixed hash range, by default 2 << 5.
- * First consumer added, hash range looks like:
- *
- * 0 -> 65536(consumer-1)
- *
- * Second consumer added, will find a biggest range to split:
- *
- * 0 -> 32768(consumer-2) -> 65536(consumer-1)
- *
- * While a consumer removed, The range for this consumer will be taken over
- * by other consumer, consumer-2 removed:
- *
- * 0 -> 65536(consumer-1)
- *
- * In this approach use skip list map to maintain the hash range and consumers.
- *
- * Select consumer will return the ceiling key of message key hashcode % range size.
- *
+ * The implementation uses consistent hashing to evenly split, the
+ * number of keys assigned to each consumer.
  */
 public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
 
-    private final int rangeSize;
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
-    private final ConcurrentSkipListMap<Integer, Consumer> rangeMap;
-    private final Map<Consumer, Integer> consumerRange;
+    // Consistent-Hash ring
+    private final NavigableMap<Integer, Consumer> hashRing;
 
-    public HashRangeAutoSplitStickyKeyConsumerSelector() {
-        this(DEFAULT_RANGE_SIZE);
-    }
+    private final int numberOfPOints;
 
-    public HashRangeAutoSplitStickyKeyConsumerSelector(int rangeSize) {
-        if (rangeSize < 2) {
-            throw new IllegalArgumentException("range size must greater than 2");
-        }
-        if (!is2Power(rangeSize)) {
-            throw new IllegalArgumentException("range size must be nth power with 2");
-        }
-        this.rangeMap = new ConcurrentSkipListMap<>();
-        this.consumerRange = new HashMap<>();
-        this.rangeSize = rangeSize;
+    public HashRangeAutoSplitStickyKeyConsumerSelector(int numberOfPOints) {
+        this.hashRing = new TreeMap<>();
+        this.numberOfPOints = numberOfPOints;
     }
 
     @Override
-    public synchronized void addConsumer(Consumer consumer) throws ConsumerAssignException {
-        if (rangeMap.size() == 0) {
-            rangeMap.put(rangeSize, consumer);
-            consumerRange.put(consumer, rangeSize);
-        } else {
-            splitRange(findBiggestRange(), consumer);
+    public void addConsumer(Consumer consumer) throws ConsumerAssignException {
+        rwLock.writeLock().lock();
+        try {
+            // Insert multiple points on the hash ring for every consumer
+            // The points are deterministically added based on the hash of the consumer name
+            for (int i = 0; i < numberOfPOints; i++) {
+                String key = consumer.consumerName() + i;
+                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                hashRing.put(hash, consumer);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
         }
     }
 
     @Override
-    public synchronized void removeConsumer(Consumer consumer) {
-        Integer removeRange = consumerRange.remove(consumer);
-        if (removeRange != null) {
-            if (removeRange == rangeSize && rangeMap.size() > 1) {
-                Map.Entry<Integer, Consumer> lowerEntry = rangeMap.lowerEntry(removeRange);
-                rangeMap.put(removeRange, lowerEntry.getValue());
-                rangeMap.remove(lowerEntry.getKey());
-                consumerRange.put(lowerEntry.getValue(), removeRange);
-            } else {
-                rangeMap.remove(removeRange);
+    public void removeConsumer(Consumer consumer) {
+        rwLock.writeLock().lock();
+        try {
+            // Remove all the points that were added for this consumer
+            for (int i = 0; i < numberOfPOints; i++) {
+                String key = consumer.consumerName() + i;
+                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                hashRing.remove(hash, consumer);

Review comment:
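       Using the two-argument `remove(hash, consumer)` is necessary because there can be a hash conflict: points belonging to two different consumers can hash to the same position on the ring. When that happens, the point will only be assigned to one consumer. 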
   
   The conflict itself is not a problem, since each consumer has multiple points in the ring. However, when we remove `c1` we need to be sure we're not removing a point that was actually assigned to `c2`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org