You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/10/31 01:45:55 UTC

[pulsar] branch master updated: Handle hash collision in ConsistentHashingStickyKeyConsumerSelector (#8396)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 014f99a  Handle hash collision in ConsistentHashingStickyKeyConsumerSelector (#8396)
014f99a is described below

commit 014f99acadae4d76c8884d806a17ed9797ef86dc
Author: ltamber <lt...@gmail.com>
AuthorDate: Sat Oct 31 09:45:38 2020 +0800

    Handle hash collision in ConsistentHashingStickyKeyConsumerSelector (#8396)
    
    ### Motivation
    Currently, in the `ConsistentHashingStickyKeyConsumerSelector` key consumer selector, if multiple keys have the same hash code, the consumer in the `hashRing` will be replaced by the newer consumer, so the behavior of message dispatch will not be consistent if the consumers subscribe in a different order.
    
    ### Modifications
    
    Handle hash collisions by storing a list of consumers for each hash in the `hashRing`.
    
    ### Verifying this change
    
    The unit test `ConsistentHashingStickyKeyConsumerSelectorTest` passed.
---
 ...ConsistentHashingStickyKeyConsumerSelector.java | 46 +++++++++++++++++-----
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index 377edae..f4008ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -18,16 +18,19 @@
  */
 package org.apache.pulsar.broker.service;
 
+import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
 /**
  * This is a consumer selector based fixed hash range.
  *
@@ -39,7 +42,7 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
     private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     // Consistent-Hash ring
-    private final NavigableMap<Integer, Consumer> hashRing;
+    private final NavigableMap<Integer, List<Consumer>> hashRing;
 
     private final int numberOfPoints;
 
@@ -57,7 +60,17 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
             for (int i = 0; i < numberOfPoints; i++) {
                 String key = consumer.consumerName() + i;
                 int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
-                hashRing.put(hash, consumer);
+                hashRing.compute(hash, (k, v) -> {
+                    if (v == null) {
+                        return Lists.newArrayList(consumer);
+                    } else {
+                        if (!v.contains(consumer)) {
+                            v.add(consumer);
+                            v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
+                        }
+                        return v;
+                    }
+                });
             }
         } finally {
             rwLock.writeLock().unlock();
@@ -72,7 +85,17 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
             for (int i = 0; i < numberOfPoints; i++) {
                 String key = consumer.consumerName() + i;
                 int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
-                hashRing.remove(hash, consumer);
+                hashRing.compute(hash, (k, v) -> {
+                    if (v == null) {
+                        return null;
+                    } else {
+                        v.removeIf(c -> c.consumerName().equals(consumer.consumerName()));
+                        if (v.isEmpty()) {
+                            v = null;
+                        }
+                        return v;
+                    }
+                });
             }
         } finally {
             rwLock.writeLock().unlock();
@@ -89,18 +112,21 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
                 return null;
             }
 
-            Map.Entry<Integer, Consumer> ceilingEntry = hashRing.ceilingEntry(hash);
+            List<Consumer> consumerList;
+            Map.Entry<Integer, List<Consumer>> ceilingEntry = hashRing.ceilingEntry(hash);
             if (ceilingEntry != null) {
-                return ceilingEntry.getValue();
+                consumerList =  ceilingEntry.getValue();
             } else {
-                return hashRing.firstEntry().getValue();
+                consumerList = hashRing.firstEntry().getValue();
             }
+
+            return consumerList.get(hash % consumerList.size());
         } finally {
             rwLock.readLock().unlock();
         }
     }
 
-    Map<Integer, Consumer> getRangeConsumer() {
+    Map<Integer, List<Consumer>> getRangeConsumer() {
         return Collections.unmodifiableMap(hashRing);
     }
 }