You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/10/31 01:45:55 UTC
[pulsar] branch master updated: Handle hash collision in
ConsistentHashingStickyKeyConsumerSelector (#8396)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 014f99a Handle hash collision in ConsistentHashingStickyKeyConsumerSelector (#8396)
014f99a is described below
commit 014f99acadae4d76c8884d806a17ed9797ef86dc
Author: ltamber <lt...@gmail.com>
AuthorDate: Sat Oct 31 09:45:38 2020 +0800
Handle hash collision in ConsistentHashingStickyKeyConsumerSelector (#8396)
### Motivation
Currently, in `ConsistentHashingStickyKeyConsumerSelector` key consumer selector,if multi key have the same hash code, the consumer in the `hashRing` will be replaced by the newer consumer, so the behavior of the message dispatch will not consistent if the consumer subscribed in a different order.
### Modifications
handle the hash collision with a list.
### Verifying this change
unit test `ConsistentHashingStickyKeyConsumerSelectorTest` was passed.
---
...ConsistentHashingStickyKeyConsumerSelector.java | 46 +++++++++++++++++-----
1 file changed, 36 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index 377edae..f4008ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -18,16 +18,19 @@
*/
package org.apache.pulsar.broker.service;
+import com.google.common.collect.Lists;
+import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
/**
* This is a consumer selector based fixed hash range.
*
@@ -39,7 +42,7 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
// Consistent-Hash ring
- private final NavigableMap<Integer, Consumer> hashRing;
+ private final NavigableMap<Integer, List<Consumer>> hashRing;
private final int numberOfPoints;
@@ -57,7 +60,17 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
for (int i = 0; i < numberOfPoints; i++) {
String key = consumer.consumerName() + i;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
- hashRing.put(hash, consumer);
+ hashRing.compute(hash, (k, v) -> {
+ if (v == null) {
+ return Lists.newArrayList(consumer);
+ } else {
+ if (!v.contains(consumer)) {
+ v.add(consumer);
+ v.sort(Comparator.comparing(Consumer::consumerName, String::compareTo));
+ }
+ return v;
+ }
+ });
}
} finally {
rwLock.writeLock().unlock();
@@ -72,7 +85,17 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
for (int i = 0; i < numberOfPoints; i++) {
String key = consumer.consumerName() + i;
int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
- hashRing.remove(hash, consumer);
+ hashRing.compute(hash, (k, v) -> {
+ if (v == null) {
+ return null;
+ } else {
+ v.removeIf(c -> c.consumerName().equals(consumer.consumerName()));
+ if (v.isEmpty()) {
+ v = null;
+ }
+ return v;
+ }
+ });
}
} finally {
rwLock.writeLock().unlock();
@@ -89,18 +112,21 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
return null;
}
- Map.Entry<Integer, Consumer> ceilingEntry = hashRing.ceilingEntry(hash);
+ List<Consumer> consumerList;
+ Map.Entry<Integer, List<Consumer>> ceilingEntry = hashRing.ceilingEntry(hash);
if (ceilingEntry != null) {
- return ceilingEntry.getValue();
+ consumerList = ceilingEntry.getValue();
} else {
- return hashRing.firstEntry().getValue();
+ consumerList = hashRing.firstEntry().getValue();
}
+
+ return consumerList.get(hash % consumerList.size());
} finally {
rwLock.readLock().unlock();
}
}
- Map<Integer, Consumer> getRangeConsumer() {
+ Map<Integer, List<Consumer>> getRangeConsumer() {
return Collections.unmodifiableMap(hashRing);
}
}