Posted to commits@pulsar.apache.org by si...@apache.org on 2020/06/01 03:58:11 UTC

[pulsar] branch master updated: Use consistent hashing in KeyShared distribution (#6791)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf8268  Use consistent hashing in KeyShared distribution (#6791)
4bf8268 is described below

commit 4bf8268a2fec78ba451ec517ce5a4e2e3f0a6951
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun May 31 20:57:52 2020 -0700

    Use consistent hashing in KeyShared distribution (#6791)
    
    ### Motivation
    
    The current implementation of KeyShared subscriptions uses a mechanism that divides the hash space across the available consumers. It works by splitting or merging the currently assigned hash ranges whenever a consumer joins or leaves.
    
    There are a few problems with the current approach:
    
     1. When adding a new consumer, the biggest range is split to make space for it. That means that with 3 consumers, 1 of them will "own" a hash range that is double the size of the other 2 and will consequently receive twice the traffic. This is not terrible, but not ideal either.
    
     2. When removing a consumer, its range is always assigned to the next consumer. The resulting hash distribution depends on the order in which the consumers are removed. If one is unlucky, the traffic becomes very heavily skewed, with situations where 1 consumer gets >90% of the traffic.
    
    This is an example of removing consumers in sequence, showing the size of each remaining consumer's hash range after every removal:
    
    ```
    Removed consumer from rangeMap: {c1=8192, c10=4096, c3=4096, c4=8192, c5=4096, c6=8192, c7=16384, c8=8192, c9=4096}
    Removed consumer from rangeMap: {c1=8192, c10=4096, c4=8192, c5=4096, c6=12288, c7=16384, c8=8192, c9=4096}
    Removed consumer from rangeMap: {c1=8192, c10=4096, c5=4096, c6=12288, c7=16384, c8=16384, c9=4096}
    Removed consumer from rangeMap: {c1=8192, c10=8192, c6=12288, c7=16384, c8=16384, c9=4096}
    Removed consumer from rangeMap: {c1=24576, c10=8192, c6=12288, c7=16384, c9=4096}
    Removed consumer from rangeMap: {c1=24576, c10=8192, c7=28672, c9=4096}
    Removed consumer from rangeMap: {c1=53248, c10=8192, c9=4096}
    ```
    As you can see, `c1` will take most of the traffic.
    
    Most likely it will not be able to process all the messages and the backlog builds up.
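
    To put a number on the skew in the last line of that log, here is a minimal Java sketch that computes a consumer's share of the hash space from the range sizes. It assumes keys hash uniformly across the slots (the sizes in each log line sum to 65536). The `RangeShare` class and its `share` helper are hypothetical names used only for illustration; they are not part of Pulsar.

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustration only: not part of the Pulsar code base.
    final class RangeShare {

        // Fraction of the total hash space owned by the given consumer,
        // assuming keys are spread uniformly over all slots.
        static double share(Map<String, Integer> rangeSizes, String consumer) {
            long total = 0;
            for (int size : rangeSizes.values()) {
                total += size;
            }
            return (double) rangeSizes.get(consumer) / total;
        }

        // Range sizes copied from the last line of the log above.
        static Map<String, Integer> lastLogLine() {
            Map<String, Integer> ranges = new LinkedHashMap<>();
            ranges.put("c1", 53248);
            ranges.put("c10", 8192);
            ranges.put("c9", 4096);
            return ranges;
        }

        public static void main(String[] args) {
            Map<String, Integer> ranges = lastLogLine();
            for (String consumer : ranges.keySet()) {
                System.out.println(consumer + " share: " + share(ranges, consumer));
            }
        }
    }
    ```

    For the final line this gives `c1` a share of 53248 / 65536 = 81.25%, even though three consumers are still connected.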
    
    
    ### Modifications
    
     * No functional difference from the user's perspective
     * Use a consistent hashing mechanism to assign keys to consumers. This will ensure even distribution without the degradation in the corner cases.
     * The number of points in the ring is configurable, default=100.
     * Refactored the current unit tests. The tests currently duplicate the logic of the implementation and check that a message is placed in the bucket for one consumer. Of course this works, since the same code is executed on both sides. Instead, the tests should focus on the contract of the feature: messages should arrive in order, and there should be a "decent" sharing of load across consumers.
      * @codelipenghui I've removed the `selectByIndex()`. In my opinion there's absolutely no difference in efficiency/performance as I've also explained on https://github.com/apache/pulsar/pull/6647#issuecomment-617497271. I'm happy to discuss more about it.
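
    The ring-based assignment described in the list above can be sketched in isolation as follows. This is a simplified illustration, not the selector added by this commit: consumers are represented by their names, `HashRingSketch` is a hypothetical class, CRC32 stands in for Pulsar's `Murmur3_32Hash`, and the read/write locking used by the real selector is left out.

    ```java
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.zip.CRC32;

    // Illustration only: a stripped-down consistent-hash ring.
    final class HashRingSketch {

        // Ring position -> consumer name, kept sorted by position.
        private final TreeMap<Integer, String> ring = new TreeMap<>();
        private final int pointsPerConsumer;

        HashRingSketch(int pointsPerConsumer) {
            this.pointsPerConsumer = pointsPerConsumer;
        }

        // Stand-in hash (assumption): CRC32 truncated to 32 bits.
        static int hash(String value) {
            CRC32 crc = new CRC32();
            crc.update(value.getBytes(StandardCharsets.UTF_8));
            return (int) crc.getValue();
        }

        // Place several points on the ring, derived from the consumer name.
        void add(String consumer) {
            for (int i = 0; i < pointsPerConsumer; i++) {
                ring.put(hash(consumer + i), consumer);
            }
        }

        // Remove only the points that still belong to this consumer.
        void remove(String consumer) {
            for (int i = 0; i < pointsPerConsumer; i++) {
                ring.remove(hash(consumer + i), consumer);
            }
        }

        // Walk clockwise from the key's position to the next point,
        // wrapping around to the first point at the end of the ring.
        String select(String key) {
            if (ring.isEmpty()) {
                return null;
            }
            Map.Entry<Integer, String> entry = ring.ceilingEntry(hash(key));
            return (entry != null ? entry : ring.firstEntry()).getValue();
        }
    }
    ```

    The useful property is that adding a consumer only moves keys onto that new consumer; every other key keeps its previous owner. In the broker, the new selector is only used when `subscriptionKeySharedUseConsistentHashing` is set to `true`; the configuration files in this diff ship it as `false`.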
---
 conf/broker.conf                                   |   8 +
 conf/standalone.conf                               |   8 +
 .../apache/pulsar/broker/ServiceConfiguration.java |  11 +
 ...ConsistentHashingStickyKeyConsumerSelector.java | 109 +++++
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |  14 -
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |  22 -
 .../broker/service/StickyKeyConsumerSelector.java  |   9 -
 ...istentStickyKeyDispatcherMultipleConsumers.java |   4 +-
 .../nonpersistent/NonPersistentSubscription.java   |  43 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java |   4 +-
 .../service/persistent/PersistentSubscription.java |  43 +-
 ...istentHashingStickyKeyConsumerSelectorTest.java | 137 ++++++
 ...angeAutoSplitStickyKeyConsumerSelectorTest.java | 155 -------
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |   2 +-
 .../client/api/KeySharedSubscriptionTest.java      | 240 +++++-----
 .../NonPersistentKeySharedSubscriptionTest.java    | 482 ---------------------
 16 files changed, 451 insertions(+), 840 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index 6cb0d86..436b420 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -143,6 +143,14 @@ subscriptionExpiryCheckIntervalInMinutes=5
 # Enable Key_Shared subscription (default is enabled)
 subscriptionKeySharedEnable=true
 
+# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
+# consistent hashing to reassign keys to new consumers
+subscriptionKeySharedUseConsistentHashing=false
+
+# On KeyShared subscriptions, number of points in the consistent-hashing ring.
+# The higher the number, the more equal the assignment of keys to consumers
+subscriptionKeySharedConsistentHashingReplicaPoints=100
+
 # Set the default behavior for message deduplication in the broker
 # This can be overridden per-namespace. If enabled, broker will reject
 # messages that were already stored in the topic
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 0f58a87..319b510 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -98,6 +98,14 @@ subscriptionExpirationTimeMinutes=0
 # Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled)
 subscriptionRedeliveryTrackerEnabled=true
 
+# On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or
+# consistent hashing to reassign keys to new consumers
+subscriptionKeySharedUseConsistentHashing=false
+
+# On KeyShared subscriptions, number of points in the consistent-hashing ring.
+# The higher the number, the more equal the assignment of keys to consumers
+subscriptionKeySharedConsistentHashingReplicaPoints=100
+
 # How frequently to proactively check and purge expired subscription
 subscriptionExpiryCheckIntervalInMinutes=5
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b623afc..e9b9246 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -336,6 +336,17 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean subscriptionKeySharedEnable = true;
 
+    @FieldContext(category = CATEGORY_POLICIES,
+            doc = "On KeyShared subscriptions, with default AUTO_SPLIT mode, use splitting ranges or " +
+            "consistent hashing to reassign keys to new consumers")
+    private boolean subscriptionKeySharedUseConsistentHashing = false;
+
+    @FieldContext(
+        category = CATEGORY_POLICIES,
+        doc = "On KeyShared subscriptions, number of points in the consistent-hashing ring. "
+                + "The higher the number, the more equal the assignment of keys to consumers")
+    private int subscriptionKeySharedConsistentHashingReplicaPoints = 100;
+
     @FieldContext(
         category = CATEGORY_POLICIES,
         doc = "Set the default behavior for message deduplication in the broker.\n\n"
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
new file mode 100644
index 0000000..f27c7f9
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+/**
+ * This is a consumer selector based on consistent hashing.
+ *
+ * The implementation uses a consistent-hash ring to evenly split the
+ * number of keys assigned to each consumer.
+ */
+public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
+
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+    // Consistent-Hash ring
+    private final NavigableMap<Integer, Consumer> hashRing;
+
+    private final int numberOfPoints;
+
+    public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
+        this.hashRing = new TreeMap<>();
+        this.numberOfPoints = numberOfPoints;
+    }
+
+    @Override
+    public void addConsumer(Consumer consumer) throws ConsumerAssignException {
+        rwLock.writeLock().lock();
+        try {
+            // Insert multiple points on the hash ring for every consumer
+            // The points are deterministically added based on the hash of the consumer name
+            for (int i = 0; i < numberOfPoints; i++) {
+                String key = consumer.consumerName() + i;
+                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                hashRing.put(hash, consumer);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public void removeConsumer(Consumer consumer) {
+        rwLock.writeLock().lock();
+        try {
+            // Remove all the points that were added for this consumer
+            for (int i = 0; i < numberOfPoints; i++) {
+                String key = consumer.consumerName() + i;
+                int hash = Murmur3_32Hash.getInstance().makeHash(key.getBytes());
+                hashRing.remove(hash, consumer);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
+    public Consumer select(byte[] stickyKey) {
+        return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
+    }
+
+    @Override
+    public Consumer select(int hash) {
+        rwLock.readLock().lock();
+        try {
+            if (hashRing.isEmpty()) {
+                return null;
+            }
+
+            Map.Entry<Integer, Consumer> ceilingEntry = hashRing.ceilingEntry(hash);
+            if (ceilingEntry != null) {
+                return ceilingEntry.getValue();
+            } else {
+                return hashRing.firstEntry().getValue();
+            }
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    Map<Integer, Consumer> getRangeConsumer() {
+        return Collections.unmodifiableMap(hashRing);
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index a6e93c2..5c3c5b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -115,15 +115,6 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
         }
     }
 
-    @Override
-    public Consumer selectByIndex(int index) {
-        if (rangeMap.size() > 0) {
-            return rangeMap.ceilingEntry(index).getValue();
-        } else {
-            return null;
-        }
-    }
-
     private int findBiggestRange() {
         int slots = 0;
         int busiestRange = rangeSize;
@@ -159,11 +150,6 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
         return (num & num - 1) == 0;
     }
 
-    @Override
-    public int getRangeSize() {
-        return rangeSize;
-    }
-
     Map<Consumer, Integer> getConsumerRange() {
         return Collections.unmodifiableMap(consumerRange);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 8fb99ed..21e94ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -80,28 +80,6 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
         }
     }
 
-    @Override
-    public Consumer selectByIndex(int index) {
-        if (rangeMap.size() > 0) {
-            Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(index);
-            Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(index);
-            Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
-            Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
-            if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
-                return ceilingConsumer;
-            } else {
-                return null;
-            }
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public int getRangeSize() {
-        return rangeSize;
-    }
-
     private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
         if (consumer.getKeySharedMeta() == null) {
             throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 545e42d..88852b5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -50,13 +50,4 @@ public interface StickyKeyConsumerSelector {
      * @return
      */
     Consumer select(int keyHash);
-
-    /**
-     * Select a consumer by key hash range index.
-     * @param index index of the key hash range
-     * @return
-     */
-    Consumer selectByIndex(int index);
-
-    int getRangeSize();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 3fdad35..c5183cd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -67,7 +67,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
         if (entries.size() > 0) {
             final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
             for (Entry entry : entries) {
-                int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
+                int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
                 groupedEntries.putIfAbsent(key, new ArrayList<>());
                 groupedEntries.get(key).add(entry);
             }
@@ -75,7 +75,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             while (iterator.hasNext()) {
                 final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
                 //TODO: None key policy
-                Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
+                Consumer consumer = selector.select(entriesWithSameKey.getKey());
                 if (consumer != null) {
                     SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                     EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index d3a6197..fecfd09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,6 +28,7 @@ import com.google.common.base.MoreObjects;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -35,9 +36,12 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
@@ -122,24 +126,27 @@ public class NonPersistentSubscription implements Subscription {
             case Key_Shared:
                 if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
                     previousDispatcher = dispatcher;
-                    if (consumer.getKeySharedMeta() != null) {
-                        switch (consumer.getKeySharedMeta().getKeySharedMode()) {
-                            case STICKY:
-                                dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
-                                        new HashRangeExclusiveStickyKeyConsumerSelector());
-                                break;
-                            case AUTO_SPLIT:
-                                dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
-                                        new HashRangeAutoSplitStickyKeyConsumerSelector());
-                                break;
-                            default:
-                                dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
-                                        new HashRangeAutoSplitStickyKeyConsumerSelector());
-                                break;
-                        }
-                    } else {
-                        dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
-                                new HashRangeAutoSplitStickyKeyConsumerSelector());
+                    KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : KeySharedMeta.getDefaultInstance();
+
+                    switch (ksm.getKeySharedMode()) {
+                        case STICKY:
+                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this,
+                                    new HashRangeExclusiveStickyKeyConsumerSelector());
+                            break;
+
+                        case AUTO_SPLIT:
+                        default:
+                            StickyKeyConsumerSelector selector;
+                            ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
+                            if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+                                selector = new ConsistentHashingStickyKeyConsumerSelector(
+                                        conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+                            } else {
+                                selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+                            }
+
+                            dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this, selector);
+                            break;
                     }
                 }
                 break;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index ffc4e97..987ecfc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -73,7 +73,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         }
         final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
         for (Entry entry : entries) {
-            int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
+            int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
             groupedEntries.putIfAbsent(key, new ArrayList<>());
             groupedEntries.get(key).add(entry);
         }
@@ -82,7 +82,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
             final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
             //TODO: None key policy
-            Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
+            Consumer consumer = selector.select(entriesWithSameKey.getKey());
             if (consumer == null) {
                 // Do nothing, cursor will be rewind at reconnection
                 log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 4df24ee..24f02d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -56,11 +57,14 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInval
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
 import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -215,24 +219,27 @@ public class PersistentSubscription implements Subscription {
             case Key_Shared:
                 if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
                     previousDispatcher = dispatcher;
-                    if (consumer.getKeySharedMeta() != null) {
-                        switch (consumer.getKeySharedMeta().getKeySharedMode()) {
-                            case STICKY:
-                                dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
-                                        new HashRangeExclusiveStickyKeyConsumerSelector());
-                                break;
-                            case AUTO_SPLIT:
-                                dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
-                                        new HashRangeAutoSplitStickyKeyConsumerSelector());
-                                break;
-                            default:
-                                dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
-                                        new HashRangeAutoSplitStickyKeyConsumerSelector());
-                                break;
-                        }
-                    } else {
-                        dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
-                                new HashRangeAutoSplitStickyKeyConsumerSelector());
+                    KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : KeySharedMeta.getDefaultInstance();
+
+                    switch (ksm.getKeySharedMode()) {
+                        case STICKY:
+                            dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
+                                    new HashRangeExclusiveStickyKeyConsumerSelector());
+                            break;
+
+                        case AUTO_SPLIT:
+                        default:
+                            StickyKeyConsumerSelector selector;
+                            ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
+                            if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+                                selector = new ConsistentHashingStickyKeyConsumerSelector(
+                                        conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+                            } else {
+                                selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+                            }
+
+                            dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, selector);
+                            break;
                     }
                 }
                 break;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
new file mode 100644
index 0000000..53df067
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelectorTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ConsistentHashingStickyKeyConsumerSelectorTest {
+
+    @Test
+    public void testConsumerSelect() throws ConsumerAssignException {
+
+        ConsistentHashingStickyKeyConsumerSelector selector = new ConsistentHashingStickyKeyConsumerSelector(100);
+        String key1 = "anyKey";
+        Assert.assertNull(selector.select(key1.getBytes()));
+
+        Consumer consumer1 = mock(Consumer.class);
+        when(consumer1.consumerName()).thenReturn("c1");
+        selector.addConsumer(consumer1);
+        Assert.assertEquals(selector.select(key1.getBytes()), consumer1);
+
+        Consumer consumer2 = mock(Consumer.class);
+        when(consumer2.consumerName()).thenReturn("c2");
+        selector.addConsumer(consumer2);
+
+        final int N = 1000;
+        final double PERCENT_ERROR = 0.20; // 20 %
+
+        Map<String, Integer> selectionMap = new HashMap<>();
+        for (int i = 0; i < N; i++) {
+            String key = UUID.randomUUID().toString();
+            Consumer selectedConsumer = selector.select(key.getBytes());
+            int count = selectionMap.computeIfAbsent(selectedConsumer.consumerName(), c -> 0);
+            selectionMap.put(selectedConsumer.consumerName(), count + 1);
+        }
+
+        // Check that keys got assigned uniformly to consumers
+        Assert.assertEquals(selectionMap.get("c1"), N/2, N/2 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c2"), N/2, N/2 * PERCENT_ERROR);
+        selectionMap.clear();
+
+        Consumer consumer3 = mock(Consumer.class);
+        when(consumer3.consumerName()).thenReturn("c3");
+        selector.addConsumer(consumer3);
+
+        for (int i = 0; i < N; i++) {
+            String key = UUID.randomUUID().toString();
+            Consumer selectedConsumer = selector.select(key.getBytes());
+            int count = selectionMap.computeIfAbsent(selectedConsumer.consumerName(), c -> 0);
+            selectionMap.put(selectedConsumer.consumerName(), count + 1);
+        }
+
+        Assert.assertEquals(selectionMap.get("c1"), N/3, N/3 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c2"), N/3, N/3 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c3"), N/3, N/3 * PERCENT_ERROR);
+        selectionMap.clear();
+
+        Consumer consumer4 = mock(Consumer.class);
+        when(consumer4.consumerName()).thenReturn("c4");
+        selector.addConsumer(consumer4);
+
+        for (int i = 0; i < N; i++) {
+            String key = UUID.randomUUID().toString();
+            Consumer selectedConsumer = selector.select(key.getBytes());
+            int count = selectionMap.computeIfAbsent(selectedConsumer.consumerName(), c -> 0);
+            selectionMap.put(selectedConsumer.consumerName(), count + 1);
+        }
+
+        Assert.assertEquals(selectionMap.get("c1"), N/4, N/4 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c2"), N/4, N/4 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c3"), N/4, N/4 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c4"), N/4, N/4 * PERCENT_ERROR);
+        selectionMap.clear();
+
+        selector.removeConsumer(consumer1);
+
+        for (int i = 0; i < N; i++) {
+            String key = UUID.randomUUID().toString();
+            Consumer selectedConsumer = selector.select(key.getBytes());
+            int count = selectionMap.computeIfAbsent(selectedConsumer.consumerName(), c -> 0);
+            selectionMap.put(selectedConsumer.consumerName(), count + 1);
+        }
+
+        Assert.assertEquals(selectionMap.get("c2"), N/3, N/3 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c3"), N/3, N/3 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c4"), N/3, N/3 * PERCENT_ERROR);
+        selectionMap.clear();
+
+        selector.removeConsumer(consumer2);
+        for (int i = 0; i < N; i++) {
+            String key = UUID.randomUUID().toString();
+            Consumer selectedConsumer = selector.select(key.getBytes());
+            int count = selectionMap.computeIfAbsent(selectedConsumer.consumerName(), c -> 0);
+            selectionMap.put(selectedConsumer.consumerName(), count + 1);
+        }
+
+        System.err.println(selectionMap);
+        Assert.assertEquals(selectionMap.get("c3"), N/2, N/2 * PERCENT_ERROR);
+        Assert.assertEquals(selectionMap.get("c4"), N/2, N/2 * PERCENT_ERROR);
+        selectionMap.clear();
+
+        selector.removeConsumer(consumer3);
+        for (int i = 0; i < N; i++) {
+            String key = UUID.randomUUID().toString();
+            Consumer selectedConsumer = selector.select(key.getBytes());
+            int count = selectionMap.computeIfAbsent(selectedConsumer.consumerName(), c -> 0);
+            selectionMap.put(selectedConsumer.consumerName(), count + 1);
+        }
+
+        Assert.assertEquals(selectionMap.get("c4").intValue(), N);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
deleted file mode 100644
index d42328f..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelectorTest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import static org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-import static org.mockito.Mockito.mock;
-
-import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import java.util.UUID;
-
-public class HashRangeAutoSplitStickyKeyConsumerSelectorTest {
-
-    @Test
-    public void testConsumerSelect() throws ConsumerAssignException {
-
-        HashRangeAutoSplitStickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
-        String key1 = "anyKey";
-        Assert.assertNull(selector.select(key1.getBytes()));
-
-        Consumer consumer1 = mock(Consumer.class);
-        selector.addConsumer(consumer1);
-        int consumer1Slot = DEFAULT_RANGE_SIZE;
-        Assert.assertEquals(selector.select(key1.getBytes()), consumer1);
-        Assert.assertEquals(selector.getConsumerRange().size(), 1);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
-
-        Consumer consumer2 = mock(Consumer.class);
-        selector.addConsumer(consumer2);
-        Assert.assertEquals(selector.getConsumerRange().size(), 2);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
-        int consumer2Slot = consumer1Slot >> 1;
-
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % DEFAULT_RANGE_SIZE;
-            if (slot < consumer2Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer2);
-            } else {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer1);
-            }
-        }
-
-        Consumer consumer3 = mock(Consumer.class);
-        selector.addConsumer(consumer3);
-        Assert.assertEquals(selector.getConsumerRange().size(), 3);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 3);
-        int consumer3Slot = consumer2Slot >> 1;
-
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % DEFAULT_RANGE_SIZE;
-            if (slot < consumer3Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer3);
-            } else if (slot < consumer2Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer2);
-            } else {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer1);
-            }
-        }
-
-        Consumer consumer4 = mock(Consumer.class);
-        selector.addConsumer(consumer4);
-        Assert.assertEquals(selector.getConsumerRange().size(), 4);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 4);
-        int consumer4Slot = consumer1Slot - ((consumer1Slot - consumer2Slot) >> 1);
-
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % DEFAULT_RANGE_SIZE;
-            if (slot < consumer3Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer3);
-            } else if (slot < consumer2Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer2);
-            } else if (slot < consumer4Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer4);
-            } else {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer1);
-            }
-        }
-
-        selector.removeConsumer(consumer1);
-        Assert.assertEquals(selector.getConsumerRange().size(), 3);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 3);
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % DEFAULT_RANGE_SIZE;
-            if (slot < consumer3Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer3);
-            } else if (slot < consumer2Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer2);
-            } else {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer4);
-            }
-        }
-
-        selector.removeConsumer(consumer2);
-        Assert.assertEquals(selector.getConsumerRange().size(), 2);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 2);
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % DEFAULT_RANGE_SIZE;
-            if (slot < consumer3Slot) {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer3);
-            } else {
-                Assert.assertEquals(selector.select(key.getBytes()), consumer4);
-            }
-        }
-
-        selector.removeConsumer(consumer3);
-        Assert.assertEquals(selector.getConsumerRange().size(), 1);
-        Assert.assertEquals(selector.getRangeConsumer().size(), 1);
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            Assert.assertEquals(selector.select(key.getBytes()), consumer4);
-        }
-    }
-
-    @Test(expectedExceptions = ConsumerAssignException.class)
-    public void testSplitExceed() throws ConsumerAssignException {
-        StickyKeyConsumerSelector selector = new HashRangeAutoSplitStickyKeyConsumerSelector(16);
-        for (int i = 0; i <= 16; i++) {
-            selector.addConsumer(mock(Consumer.class));
-        }
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testRangeSizeLessThan2() {
-        new HashRangeAutoSplitStickyKeyConsumerSelector(1);
-    }
-
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testRangeSizePower2() {
-        new HashRangeAutoSplitStickyKeyConsumerSelector(6);
-    }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 7af5b79..598f39d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -116,7 +116,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         ).thenReturn(false);
 
         persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
-                topicMock, cursorMock, subscriptionMock, new HashRangeAutoSplitStickyKeyConsumerSelector());
+                topicMock, cursorMock, subscriptionMock, new ConsistentHashingStickyKeyConsumerSelector(100));
         persistentDispatcher.addConsumer(consumerMock);
         persistentDispatcher.consumerFlow(consumerMock, 1000);
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 8387ac7..84a1978 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -20,7 +20,8 @@ package org.apache.pulsar.client.api;
 
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+
+import com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
@@ -37,10 +38,12 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -57,11 +60,23 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         };
     }
 
+    @DataProvider(name = "data")
+    public Object[][] dataProvider() {
+        return new Object[][] {
+                // Topic-Type and "Batching"
+                { "persistent", false  },
+                { "persistent", true  },
+                { "non-persistent", false },
+                { "non-persistent", true },
+        };
+    }
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
         super.internalSetup();
         super.producerBaseSetup();
+        this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
     }
 
     @AfterMethod
@@ -70,10 +85,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         super.internalCleanup();
     }
 
-    @Test(dataProvider = "batch")
-    public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
+    private static final Random random = new Random(System.nanoTime());
+    private static final int NUMBER_OF_KEYS = 300;
+
+    @Test(dataProvider = "data")
+    public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType, boolean enableBatch)
+            throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();
+        String topic = topicType + "://public/default/key_shared-" + UUID.randomUUID();
 
         @Cleanup
         Consumer<Integer> consumer1 = createConsumer(topic);
@@ -87,38 +106,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         @Cleanup
         Producer<Integer> producer = createProducer(topic, enableBatch);
 
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                    % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-                if (slot < consumer3Slot) {
-                    consumer3ExpectMessages++;
-                } else if (slot < consumer2Slot) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer1ExpectMessages++;
-                }
-                producer.newMessage()
-                    .key(key)
+        for (int i = 0; i < 1000; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                     .value(i)
                     .send();
-            }
         }
 
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
     }
 
     @Test(dataProvider = "batch")
@@ -172,11 +167,12 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     }
 
-    @Test(dataProvider = "batch")
-    public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException, InterruptedException {
+    @Test(dataProvider = "data")
+    public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType,
+            boolean enableBatch) throws PulsarClientException, InterruptedException {
 
         this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "persistent://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
+        String topic = topicType + "://public/default/key_shared_consumer_crash-" + UUID.randomUUID();
 
         @Cleanup
         Consumer<Integer> consumer1 = createConsumer(topic);
@@ -190,38 +186,14 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         @Cleanup
         Producer<Integer> producer = createProducer(topic, enableBatch);
 
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                    % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-                if (slot < consumer3Slot) {
-                    consumer3ExpectMessages++;
-                } else if (slot < consumer2Slot) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer1ExpectMessages++;
-                }
-                producer.newMessage()
-                    .key(key)
+        for (int i = 0; i < 1000; i++) {
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                     .value(i)
                     .send();
-            }
         }
 
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
 
         // wait for consumer grouping acking send.
         Thread.sleep(1000);
@@ -230,24 +202,19 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         consumer2.close();
 
         for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                producer.newMessage()
-                    .key(key)
+            producer.newMessage()
+                    .key(String.valueOf(random.nextInt(NUMBER_OF_KEYS)))
                     .value(i)
                     .send();
-            }
         }
 
-        checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer3, 100));
-        receiveAndCheck(checkList);
+        receiveAndCheckDistribution(Lists.newArrayList(consumer3));
     }
 
-
-    @Test(dataProvider = "batch")
-    public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
+    @Test(dataProvider = "data")
+    public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector(String topicType, boolean enableBatch) throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "persistent://public/default/key_shared_none_key-" + UUID.randomUUID();
+        String topic = topicType + "://public/default/key_shared_none_key-" + UUID.randomUUID();
 
         @Cleanup
         Consumer<Integer> consumer1 = createConsumer(topic);
@@ -261,26 +228,13 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         @Cleanup
         Producer<Integer> producer = createProducer(topic, enableBatch);
 
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
         for (int i = 0; i < 100; i++) {
             producer.newMessage()
                     .value(i)
                     .send();
         }
-        int slot = Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
-            % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        if (slot < consumer3Slot) {
-            checkList.add(new KeyValue<>(consumer3, 100));
-        } else if (slot < consumer2Slot) {
-            checkList.add(new KeyValue<>(consumer2, 100));
-        } else {
-            checkList.add(new KeyValue<>(consumer1, 100));
-        }
-        receiveAndCheck(checkList);
+
+        receive(Lists.newArrayList(consumer1, consumer2, consumer3));
     }
 
     @Test(dataProvider = "batch")
@@ -338,39 +292,15 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         @Cleanup
         Producer<Integer> producer = createProducer(topic, enableBatch);
 
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                    % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-                if (slot < consumer3Slot) {
-                    consumer3ExpectMessages++;
-                } else if (slot < consumer2Slot) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer1ExpectMessages++;
-                }
-                producer.newMessage()
+        for (int i = 0; i < 1000; i++) {
+            producer.newMessage()
                     .key("any key")
-                    .orderingKey(key.getBytes())
+                    .orderingKey(String.valueOf(random.nextInt(NUMBER_OF_KEYS)).getBytes())
                     .value(i)
                     .send();
-            }
         }
 
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
+        receiveAndCheckDistribution(Lists.newArrayList(consumer1, consumer2, consumer3));
     }
 
     @Test(dataProvider = "batch")
@@ -495,6 +425,82 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         return builder.subscribe();
     }
 
+    private void receive(List<Consumer<?>> consumers) throws PulsarClientException {
+        // Record the first consumer seen for each key so that later deliveries can be checked against it
+        Map<String, Consumer<?>> keyToConsumer = new HashMap<>();
+
+        for (Consumer<?> c : consumers) {
+            while (true) {
+                Message<?> msg = c.receive(100, TimeUnit.MILLISECONDS);
+                if (msg == null) {
+                    // Go to next consumer
+                    break;
+                }
+
+                c.acknowledge(msg);
+
+                if (msg.hasKey()) {
+                    Consumer<?> assignedConsumer = keyToConsumer.get(msg.getKey());
+                    if (assignedConsumer == null) {
+                        // This is a new key
+                        keyToConsumer.put(msg.getKey(), c);
+                    } else {
+                        // The consumer should be the same
+                        assertEquals(c, assignedConsumer);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Check that every consumer receives a fair share of the messages and that each key is delivered to only one consumer.
+     */
+    private void receiveAndCheckDistribution(List<Consumer<?>> consumers) throws PulsarClientException {
+        // Record the first consumer seen for each key so that later deliveries can be checked against it
+        Map<String, Consumer<?>> keyToConsumer = new HashMap<>();
+        Map<Consumer<?>, Integer> messagesPerConsumer = new HashMap<>();
+
+        int totalMessages = 0;
+
+        for (Consumer<?> c : consumers) {
+            int messagesForThisConsumer = 0;
+            while (true) {
+                Message<?> msg = c.receive(100, TimeUnit.MILLISECONDS);
+                if (msg == null) {
+                    // Go to next consumer
+                    messagesPerConsumer.put(c, messagesForThisConsumer);
+                    break;
+                }
+
+                ++totalMessages;
+                ++messagesForThisConsumer;
+                c.acknowledge(msg);
+
+                if (msg.hasKey() || msg.hasOrderingKey()) {
+                    String key = msg.hasOrderingKey() ? new String(msg.getOrderingKey()) : msg.getKey();
+                    Consumer<?> assignedConsumer = keyToConsumer.get(key);
+                    if (assignedConsumer == null) {
+                        // This is a new key
+                        keyToConsumer.put(key, c);
+                    } else {
+                        // The consumer should be the same
+                        assertEquals(c, assignedConsumer);
+                    }
+                }
+            }
+        }
+
+        final double PERCENT_ERROR = 0.40; // 40 %
+
+        double expectedMessagesPerConsumer = (double) totalMessages / consumers.size();
+
+        System.err.println(messagesPerConsumer);
+        for (int count : messagesPerConsumer.values()) {
+            Assert.assertEquals(count, expectedMessagesPerConsumer, expectedMessagesPerConsumer * PERCENT_ERROR);
+        }
+    }
+
     private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
         Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
         for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java
deleted file mode 100644
index 9c03362..0000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.api;
-
-import com.google.common.collect.Sets;
-import lombok.Cleanup;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.testng.Assert.assertTrue;
-
-public class NonPersistentKeySharedSubscriptionTest extends ProducerConsumerBase {
-
-    private static final Logger log = LoggerFactory.getLogger(NonPersistentKeySharedSubscriptionTest.class);
-    private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
-
-
-    @BeforeMethod
-    @Override
-    protected void setup() throws Exception {
-        super.internalSetup();
-        super.producerBaseSetup();
-    }
-
-    @AfterMethod
-    @Override
-    protected void cleanup() throws Exception {
-        super.internalCleanup();
-    }
-
-    @Test
-    public void testSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "non-persistent://public/default/key_shared";
-
-        @Cleanup
-        Consumer<Integer> consumer1 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer2 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer3 = createConsumer(topic);
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic);
-
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                        % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-                if (slot < consumer3Slot) {
-                    consumer3ExpectMessages++;
-                } else if (slot < consumer2Slot) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer1ExpectMessages++;
-                }
-                producer.newMessage()
-                        .key(key)
-                        .value(i)
-                        .send();
-            }
-        }
-
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
-    }
-
-    @Test
-    public void testSendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "non-persistent://public/default/key_shared_exclusive";
-
-        @Cleanup
-        Consumer<Integer> consumer1 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(0, 20000)));
-
-        @Cleanup
-        Consumer<Integer> consumer2 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(20001, 40000)));
-
-        @Cleanup
-        Consumer<Integer> consumer3 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(40001, KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE - 1)));
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic);
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                        % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
-                if (slot <= 20000) {
-                    consumer1ExpectMessages++;
-                } else if (slot <= 40000) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer3ExpectMessages++;
-                }
-                producer.newMessage()
-                        .key(key)
-                        .value(i)
-                        .send();
-            }
-        }
-
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
-
-    }
-
-    @Test
-    public void testConsumerCrashSendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
-
-        this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "non-persistent://public/default/key_shared_consumer_crash";
-
-        @Cleanup
-        Consumer<Integer> consumer1 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer2 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer3 = createConsumer(topic);
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic);
-
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                        % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-                if (slot < consumer3Slot) {
-                    consumer3ExpectMessages++;
-                } else if (slot < consumer2Slot) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer1ExpectMessages++;
-                }
-                producer.newMessage()
-                        .key(key)
-                        .value(i)
-                        .send();
-            }
-        }
-
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
-
-        // wait for consumer grouping acking send.
-        Thread.sleep(1000);
-
-        consumer1.close();
-        consumer2.close();
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                producer.newMessage()
-                        .key(key)
-                        .value(i)
-                        .send();
-            }
-        }
-
-        checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer3, 100));
-        receiveAndCheck(checkList);
-    }
-
-    @Test
-    public void testNonKeySendAndReceiveWithHashRangeAutoSplitStickyKeyConsumerSelector() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "non-persistent://public/default/key_shared_none_key";
-
-        @Cleanup
-        Consumer<Integer> consumer1 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer2 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer3 = createConsumer(topic);
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic);
-
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        for (int i = 0; i < 100; i++) {
-            producer.newMessage()
-                    .value(i)
-                    .send();
-        }
-        int slot = Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
-                % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        if (slot < consumer3Slot) {
-            checkList.add(new KeyValue<>(consumer3, 100));
-        } else if (slot < consumer2Slot) {
-            checkList.add(new KeyValue<>(consumer2, 100));
-        } else {
-            checkList.add(new KeyValue<>(consumer1, 100));
-        }
-        receiveAndCheck(checkList);
-    }
-
-    @Test
-    public void testNonKeySendAndReceiveWithHashRangeExclusiveStickyKeyConsumerSelector() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "non-persistent://public/default/key_shared_exclusive_non_key";
-
-        @Cleanup
-        Consumer<Integer> consumer1 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(0, 20000)));
-
-        @Cleanup
-        Consumer<Integer> consumer2 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(20001, 40000)));
-
-        @Cleanup
-        Consumer<Integer> consumer3 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(40001, KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE - 1)));
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic);
-
-        for (int i = 0; i < 100; i++) {
-            producer.newMessage()
-                    .value(i)
-                    .send();
-        }
-        int slot = Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
-                % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        if (slot <= 20000) {
-            checkList.add(new KeyValue<>(consumer1, 100));
-        } else if (slot <= 40000) {
-            checkList.add(new KeyValue<>(consumer2, 100));
-        } else {
-            checkList.add(new KeyValue<>(consumer3, 100));
-        }
-        receiveAndCheck(checkList);
-    }
-
-    @Test
-    public void testOrderingKeyWithHashRangeAutoSplitStickyKeyConsumerSelector() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "non-persistent://public/default/key_shared_ordering_key";
-
-        @Cleanup
-        Consumer<Integer> consumer1 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer2 = createConsumer(topic);
-
-        @Cleanup
-        Consumer<Integer> consumer3 = createConsumer(topic);
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic);
-
-        int consumer1Slot = HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                        % HashRangeAutoSplitStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-                if (slot < consumer3Slot) {
-                    consumer3ExpectMessages++;
-                } else if (slot < consumer2Slot) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer1ExpectMessages++;
-                }
-                producer.newMessage()
-                        .key("any key")
-                        .orderingKey(key.getBytes())
-                        .value(i)
-                        .send();
-            }
-        }
-
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
-    }
-
-    @Test
-    public void testOrderingKeyWithHashRangeExclusiveStickyKeyConsumerSelector() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(true);
-        String topic = "non-persistent://public/default/key_shared_exclusive_ordering_key";
-
-        @Cleanup
-        Consumer<Integer> consumer1 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(0, 20000)));
-
-        @Cleanup
-        Consumer<Integer> consumer2 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(20001, 40000)));
-
-        @Cleanup
-        Consumer<Integer> consumer3 = createConsumer(topic, KeySharedPolicy.stickyHashRange()
-                .ranges(Range.of(40001, KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE)));
-
-        @Cleanup
-        Producer<Integer> producer = createProducer(topic);
-
-        int consumer1ExpectMessages = 0;
-        int consumer2ExpectMessages = 0;
-        int consumer3ExpectMessages = 0;
-
-        for (int i = 0; i < 10; i++) {
-            for (String key : keys) {
-                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                        % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
-                if (slot <= 20000) {
-                    consumer1ExpectMessages++;
-                } else if (slot <= 40000) {
-                    consumer2ExpectMessages++;
-                } else {
-                    consumer3ExpectMessages++;
-                }
-                producer.newMessage()
-                        .key("any key")
-                        .orderingKey(key.getBytes())
-                        .value(i)
-                        .send();
-            }
-        }
-
-        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
-        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
-        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
-        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
-
-        receiveAndCheck(checkList);
-    }
-
-    @Test(expectedExceptions = PulsarClientException.class)
-    public void testDisableKeySharedSubscription() throws PulsarClientException {
-        this.conf.setSubscriptionKeySharedEnable(false);
-        String topic = "persistent://public/default/key_shared_disabled";
-        pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
-                .subscribe();
-    }
-
-    private Producer<Integer> createProducer(String topic) throws PulsarClientException {
-        return pulsarClient.newProducer(Schema.INT32)
-                .topic(topic)
-                .enableBatching(false)
-                .create();
-    }
-
-    private Consumer<Integer> createConsumer(String topic) throws PulsarClientException {
-        return createConsumer(topic, null);
-    }
-
-    private Consumer<Integer> createConsumer(String topic, KeySharedPolicy keySharedPolicy) throws PulsarClientException {
-        ConsumerBuilder<Integer> builder = pulsarClient.newConsumer(Schema.INT32);
-        builder.topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(3, TimeUnit.SECONDS);
-        if (keySharedPolicy != null) {
-            builder.keySharedPolicy(keySharedPolicy);
-        }
-        return builder.subscribe();
-    }
-
-    private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
-        Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
-        for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
-            int received = 0;
-            Map<String, Message<Integer>> lastMessageForKey = new HashMap<>();
-            for (Integer i = 0; i < check.getValue(); i++) {
-                Message<Integer> message = check.getKey().receive();
-                check.getKey().acknowledge(message);
-                String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
-                log.info("[{}] Receive message key: {} value: {} messageId: {}",
-                        check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
-                // check messages is order by key
-                if (lastMessageForKey.get(key) == null) {
-                    Assert.assertNotNull(message);
-                } else {
-                    Assert.assertTrue(message.getValue()
-                            .compareTo(lastMessageForKey.get(key).getValue()) > 0);
-                }
-                lastMessageForKey.put(key, message);
-                consumerKeys.putIfAbsent(check.getKey(), Sets.newHashSet());
-                consumerKeys.get(check.getKey()).add(key);
-                received++;
-            }
-            Assert.assertEquals(check.getValue().intValue(), received);
-        }
-        Set<String> allKeys = Sets.newHashSet();
-        consumerKeys.forEach((k, v) -> v.forEach(key -> {
-            assertTrue(allKeys.add(key),
-                    "Key "+ key +  "is distributed to multiple consumers." );
-        }));
-    }
-}