You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/06/06 09:05:41 UTC

[pulsar] branch master updated: Key_Shared subscription on non-persistent topic (#4462)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6309149  Key_Shared subscription on non-persistent topic (#4462)
6309149 is described below

commit 630914901477357af9a47d687a0b43a7a7b7fa37
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 6 17:05:37 2019 +0800

    Key_Shared subscription on non-persistent topic (#4462)
    
    * Support Key_Shared subscription on non-persistent topic.
    
    * fix review comments.
---
 .../AbstractDispatcherMultipleConsumers.java       |  26 ++
 .../NonPersistentDispatcherMultipleConsumers.java  |   4 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 102 +++++++
 .../nonpersistent/NonPersistentSubscription.java   |   5 +
 ...istentStickyKeyDispatcherMultipleConsumers.java |  22 --
 .../NonPersistentKeySharedSubscriptionTest.java    | 335 +++++++++++++++++++++
 6 files changed, 470 insertions(+), 24 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index d663752..c36c8ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -20,11 +20,18 @@ package org.apache.pulsar.broker.service;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.utils.CopyOnWriteArrayList;
 
 import com.carrotsearch.hppc.ObjectHashSet;
 import com.carrotsearch.hppc.ObjectSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
@@ -205,4 +212,23 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
         return -1;
     }
 
+    public static final String NONE_KEY = "NONE_KEY";
+    protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
+        metadataAndPayload.markReaderIndex();
+        PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+        metadataAndPayload.resetReaderIndex();
+        String key = metadata.getPartitionKey();
+        if (log.isDebugEnabled()) {
+            log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
+        }
+        if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
+            return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
+        }
+        metadata.recycle();
+        return NONE_KEY.getBytes();
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
+
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 1bb4042..78513c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -46,11 +46,11 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
         implements NonPersistentDispatcher {
 
     private final NonPersistentTopic topic;
-    private final Subscription subscription;
+    protected final Subscription subscription;
 
     private CompletableFuture<Void> closeFuture = null;
     private final String name;
-    private final Rate msgDrop;
+    protected final Rate msgDrop;
     protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
             .newUpdater(NonPersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
     @SuppressWarnings("unused")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
new file mode 100644
index 0000000..3f0d903
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.nonpersistent;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
+
+    private final StickyKeyConsumerSelector selector;
+
+    public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
+        super(topic, subscription);
+        //TODO: Consumer selector Pluggable
+        selector = new HashRangeStickyKeyConsumerSelector();
+    }
+
+    @Override
+    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
+        super.addConsumer(consumer);
+        selector.addConsumer(consumer);
+    }
+
+    @Override
+    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+        super.removeConsumer(consumer);
+        selector.removeConsumer(consumer);
+    }
+
+    @Override
+    public SubType getType() {
+        return SubType.Key_Shared;
+    }
+
+    @Override
+    public void sendMessages(List<Entry> entries) {
+        if (entries.size() > 0) {
+            final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
+            for (Entry entry : entries) {
+                int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+                groupedEntries.putIfAbsent(key, new ArrayList<>());
+                groupedEntries.get(key).add(entry);
+            }
+            final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
+            while (iterator.hasNext()) {
+                final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
+                //TODO: None key policy
+                Consumer consumer = null;
+                if (selector instanceof HashRangeStickyKeyConsumerSelector) {
+                    consumer = ((HashRangeStickyKeyConsumerSelector)selector).select(entriesWithSameKey.getKey());
+                }
+                if (consumer != null) {
+                    SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+                    EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
+                    filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo);
+                    consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo.getTotalMessages(),
+                            sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+                    TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
+                } else {
+                    entries.forEach(entry -> {
+                        int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
+                        if (totalMsgs > 0) {
+                            msgDrop.recordEvent(totalMsgs);
+                        }
+                        entry.release();
+                    });
+                }
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 5c060ba..beba94b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -111,6 +111,11 @@ public class NonPersistentSubscription implements Subscription {
                             topic, this);
                 }
                 break;
+            case Key_Shared:
+                if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+                    dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this);
+                }
+                break;
             default:
                 throw new ServerMetadataException("Unsupported subscription type");
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 0cbe354..26d599e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
-import io.netty.buffer.ByteBuf;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -39,17 +36,13 @@ import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
 
-    public static final String NONE_KEY = "NONE_KEY";
-
     private final StickyKeyConsumerSelector selector;
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
@@ -171,21 +164,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
     }
 
-    private byte[] peekStickyKey(ByteBuf metadataAndPayload) {
-        metadataAndPayload.markReaderIndex();
-        MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
-        metadataAndPayload.resetReaderIndex();
-        String key = metadata.getPartitionKey();
-        if (log.isDebugEnabled()) {
-            log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
-        }
-        if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
-            return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
-        }
-        metadata.recycle();
-        return NONE_KEY.getBytes();
-    }
-
     private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java
new file mode 100644
index 0000000..f03460d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Sets;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertTrue;
+
+public class NonPersistentKeySharedSubscriptionTest extends ProducerConsumerBase {
+
+    private static final Logger log = LoggerFactory.getLogger(NonPersistentKeySharedSubscriptionTest.class);
+    private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        int consumer1ExpectMessages = 0;
+        int consumer2ExpectMessages = 0;
+        int consumer3ExpectMessages = 0;
+
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+                        % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+                if (slot < consumer3Slot) {
+                    consumer3ExpectMessages++;
+                } else if (slot < consumer2Slot) {
+                    consumer2ExpectMessages++;
+                } else {
+                    consumer1ExpectMessages++;
+                }
+                producer.newMessage()
+                        .key(key)
+                        .value(i)
+                        .send();
+            }
+        }
+
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
+
+        receiveAndCheck(checkList);
+    }
+
+    @Test
+    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
+
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared_consumer_crash";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        int consumer1ExpectMessages = 0;
+        int consumer2ExpectMessages = 0;
+        int consumer3ExpectMessages = 0;
+
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+                        % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+                if (slot < consumer3Slot) {
+                    consumer3ExpectMessages++;
+                } else if (slot < consumer2Slot) {
+                    consumer2ExpectMessages++;
+                } else {
+                    consumer1ExpectMessages++;
+                }
+                producer.newMessage()
+                        .key(key)
+                        .value(i)
+                        .send();
+            }
+        }
+
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
+
+        receiveAndCheck(checkList);
+
+        // wait for consumer grouping acking send.
+        Thread.sleep(1000);
+
+        consumer1.close();
+        consumer2.close();
+
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                producer.newMessage()
+                        .key(key)
+                        .value(i)
+                        .send();
+            }
+        }
+
+        checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer3, 100));
+        receiveAndCheck(checkList);
+    }
+
+    @Test
+    public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared_none_key";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage()
+                    .value(i)
+                    .send();
+        }
+        int slot = Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
+                % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        if (slot < consumer3Slot) {
+            checkList.add(new KeyValue<>(consumer3, 100));
+        } else if (slot < consumer2Slot) {
+            checkList.add(new KeyValue<>(consumer2, 100));
+        } else {
+            checkList.add(new KeyValue<>(consumer1, 100));
+        }
+        receiveAndCheck(checkList);
+    }
+
+    @Test
+    public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared_ordering_key";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        int consumer1ExpectMessages = 0;
+        int consumer2ExpectMessages = 0;
+        int consumer3ExpectMessages = 0;
+
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+                        % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+                if (slot < consumer3Slot) {
+                    consumer3ExpectMessages++;
+                } else if (slot < consumer2Slot) {
+                    consumer2ExpectMessages++;
+                } else {
+                    consumer1ExpectMessages++;
+                }
+                producer.newMessage()
+                        .key("any key")
+                        .orderingKey(key.getBytes())
+                        .value(i)
+                        .send();
+            }
+        }
+
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
+
+        receiveAndCheck(checkList);
+    }
+
+    @Test(expectedExceptions = PulsarClientException.class)
+    public void testDisableKeySharedSubscription() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(false);
+        String topic = "persistent://public/default/key_shared_disabled";
+        pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+    }
+
+    private Producer<Integer> createProducer(String topic) throws PulsarClientException {
+        return pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+    }
+
+    private Consumer<Integer> createConsumer(String topic) throws PulsarClientException {
+        return pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .subscribe();
+    }
+
+    private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
+        Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
+        for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
+            int received = 0;
+            Map<String, Message<Integer>> lastMessageForKey = new HashMap<>();
+            for (Integer i = 0; i < check.getValue(); i++) {
+                Message<Integer> message = check.getKey().receive();
+                check.getKey().acknowledge(message);
+                String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
+                log.info("[{}] Receive message key: {} value: {} messageId: {}",
+                        check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
+                // check messages is order by key
+                if (lastMessageForKey.get(key) == null) {
+                    Assert.assertNotNull(message);
+                } else {
+                    Assert.assertTrue(message.getValue()
+                            .compareTo(lastMessageForKey.get(key).getValue()) > 0);
+                }
+                lastMessageForKey.put(key, message);
+                consumerKeys.putIfAbsent(check.getKey(), Sets.newHashSet());
+                consumerKeys.get(check.getKey()).add(key);
+                received++;
+            }
+            Assert.assertEquals(check.getValue().intValue(), received);
+        }
+        Set<String> allKeys = Sets.newHashSet();
+        consumerKeys.forEach((k, v) -> v.forEach(key -> {
+            assertTrue(allKeys.add(key),
+                    "Key "+ key +  "is distributed to multiple consumers." );
+        }));
+    }
+}