You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/28 12:09:55 UTC

[pulsar] branch master updated: Fix bug of Key_Shared Ordering (#4372)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a21094  Fix bug of Key_Shared Ordering (#4372)
7a21094 is described below

commit 7a2109427c174207ec3f7f2060ce30bf7e0a2990
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue May 28 20:09:49 2019 +0800

    Fix bug of Key_Shared Ordering (#4372)
    
    Fix bug of Key_Shared ordering and add check for order by key, key distribution, message redelivery in Key_Shared subscription mode.
---
 .../HashRangeStickyKeyConsumerSelector.java        |   6 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |   2 +-
 .../broker/service/StickyKeyConsumerSelector.java  |   1 -
 .../PersistentDispatcherMultipleConsumers.java     |   1 -
 ...istentStickyKeyDispatcherMultipleConsumers.java |  38 +-
 .../client/api/KeySharedSubscriptionTest.java      | 455 ++++++++++-----------
 .../apache/pulsar/client/impl/ConsumerImpl.java    |   3 +-
 7 files changed, 252 insertions(+), 254 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
index 940798f..57ff777 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
@@ -105,8 +105,12 @@ public class HashRangeStickyKeyConsumerSelector implements StickyKeyConsumerSele
 
     @Override
     public Consumer select(byte[] stickyKey) {
+        return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
+    }
+
+    public Consumer select(int hash) {
         if (rangeMap.size() > 0) {
-            int slot = Murmur3_32Hash.getInstance().makeHash(stickyKey) % rangeSize;
+            int slot = hash % rangeSize;
             return rangeMap.ceilingEntry(slot).getValue();
         } else {
             return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4488613..ddb619f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1104,7 +1104,7 @@ public class ServerCnx extends PulsarHandler {
 
         if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
             Consumer consumer = consumerFuture.getNow(null);
-            if (redeliver.getMessageIdsCount() > 0 && consumer.subType() == SubType.Shared) {
+            if (redeliver.getMessageIdsCount() > 0 && (consumer.subType() == SubType.Shared || consumer.subType() == SubType.Key_Shared)) {
                 consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
             } else {
                 consumer.redeliverUnacknowledgedMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index ce8ef86..b18ffa8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -41,5 +41,4 @@ public interface StickyKeyConsumerSelector {
      * @return consumer
      */
     Consumer select(byte[] stickyKey);
-
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4b873d5..060b45d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -409,7 +409,6 @@ public class PersistentDispatcherMultipleConsumers  extends AbstractDispatcherMu
         }
 
         sendMessagesToConsumers(readType, entries);
-
         readMoreEntries();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 836ee74..a49b41e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -21,10 +21,11 @@ package org.apache.pulsar.broker.service.persistent;
 import io.netty.buffer.ByteBuf;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -39,6 +40,7 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,15 +73,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
         if (entries.size() > 0) {
-            final Map<byte[], List<Entry>> groupedEntries = entries
-                    .stream()
-                    .collect(Collectors.groupingBy(entry -> peekStickyKey(entry.getDataBuffer()), Collectors.toList()));
-            final Iterator<Map.Entry<byte[], List<Entry>>> iterator = groupedEntries.entrySet().iterator();
+            final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
+            for (Entry entry : entries) {
+                int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+                groupedEntries.putIfAbsent(key, new ArrayList<>());
+                groupedEntries.get(key).add(entry);
+            }
+            final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
+            AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
             while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
-                final Map.Entry<byte[], List<Entry>> entriesWithSameKey = iterator.next();
+                final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
                 //TODO: None key policy
-                final Consumer consumer = selector.select(entriesWithSameKey.getKey());
-
+                Consumer consumer = null;
+                if (selector instanceof HashRangeStickyKeyConsumerSelector) {
+                    consumer = ((HashRangeStickyKeyConsumerSelector)selector).select(entriesWithSameKey.getKey());
+                }
                 if (consumer == null) {
                     // Do nothing, cursor will be rewind at reconnection
                     log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
@@ -91,7 +99,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
                 int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
                 if (messagesForC > 0) {
-
                     // remove positions first from replay list first : sendMessages recycles entries
                     List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
                     if (readType == ReadType.Replay) {
@@ -103,7 +110,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                     filterEntriesForConsumer(subList, batchSizes, sendMessageInfo);
 
                     consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(),
-                            sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+                            sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
+                                if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
+                                    readMoreEntries();
+                                }
+                    });
                     entriesWithSameKey.getValue().removeAll(subList);
 
                     totalAvailablePermits -= sendMessageInfo.getTotalMessages();
@@ -140,7 +151,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                     log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
                             laterReplay);
                 }
-
+                readMoreEntries();
             }
         }
     }
@@ -155,12 +166,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
         metadataAndPayload.resetReaderIndex();
         String key = metadata.getPartitionKey();
-        metadata.recycle();
         if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
             return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
-        } else {
-            return NONE_KEY.getBytes();
         }
+        metadata.recycle();
+        return NONE_KEY.getBytes();
     }
 
     private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index b2148a1..c5f42bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.client.api;
 
+import com.google.common.collect.Sets;
+import lombok.Cleanup;
 import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,12 +31,21 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.UUID;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.testng.Assert.assertTrue;
+
 public class KeySharedSubscriptionTest extends ProducerConsumerBase {
 
     private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
+    private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+
 
     @BeforeMethod
     @Override
@@ -53,194 +65,163 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared";
 
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
-        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-        int consumer2Slot = consumer1Slot >> 1;
-        int consumer3Slot = consumer2Slot >> 1;
-
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                 .topic(topic)
                 .enableBatching(false)
                 .create();
 
+        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
         int consumer1ExpectMessages = 0;
         int consumer2ExpectMessages = 0;
         int consumer3ExpectMessages = 0;
 
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
-                % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-            if (slot < consumer3Slot) {
-                consumer3ExpectMessages++;
-            } else if (slot < consumer2Slot) {
-                consumer2ExpectMessages++;
-            } else {
-                consumer1ExpectMessages++;
-            }
-            producer.newMessage()
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+                    % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+                if (slot < consumer3Slot) {
+                    consumer3ExpectMessages++;
+                } else if (slot < consumer2Slot) {
+                    consumer2ExpectMessages++;
+                } else {
+                    consumer1ExpectMessages++;
+                }
+                producer.newMessage()
                     .key(key)
-                    .value(key.getBytes())
+                    .value(i)
                     .send();
+            }
         }
 
-        int consumer1Received = 0;
-        for (int i = 0; i < consumer1ExpectMessages; i++) {
-            consumer1.receive();
-            consumer1Received++;
-        }
-
-        int consumer2Received = 0;
-        for (int i = 0; i < consumer2ExpectMessages; i++) {
-            consumer2.receive();
-            consumer2Received++;
-        }
-
-        int consumer3Received = 0;
-        for (int i = 0; i < consumer3ExpectMessages; i++) {
-            consumer3.receive();
-            consumer3Received++;
-        }
-        Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
-        Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
-        Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
-
-        // messages not acked, test redelivery
-
-        for (int i = 0; i < consumer1ExpectMessages; i++) {
-            Message message = consumer1.receive();
-            consumer1.acknowledge(message);
-            consumer1Received++;
-        }
-
-        for (int i = 0; i < consumer2ExpectMessages; i++) {
-            Message message = consumer2.receive();
-            consumer2.acknowledge(message);
-            consumer2Received++;
-        }
-
-        for (int i = 0; i < consumer3ExpectMessages; i++) {
-            Message message = consumer3.receive();
-            consumer3.acknowledge(message);
-            consumer3Received++;
-        }
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
 
-        Assert.assertEquals(consumer1ExpectMessages * 2, consumer1Received);
-        Assert.assertEquals(consumer2ExpectMessages * 2, consumer2Received);
-        Assert.assertEquals(consumer3ExpectMessages * 2, consumer3Received);
+        receiveAndCheck(checkList);
     }
 
     @Test
-    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
 
         this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_consumer_crash";
 
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+            .topic(topic)
+            .enableBatching(false)
+            .create();
+
         int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
         int consumer2Slot = consumer1Slot >> 1;
         int consumer3Slot = consumer2Slot >> 1;
 
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(false)
-                .create();
-
         int consumer1ExpectMessages = 0;
         int consumer2ExpectMessages = 0;
         int consumer3ExpectMessages = 0;
 
-        final int totalSend = 100;
-
-        for (int i = 0; i < totalSend; i++) {
-            String key = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
                     % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-            if (slot < consumer3Slot) {
-                consumer3ExpectMessages++;
-            } else if (slot < consumer2Slot) {
-                consumer2ExpectMessages++;
-            } else {
-                consumer1ExpectMessages++;
-            }
-            producer.newMessage()
+                if (slot < consumer3Slot) {
+                    consumer3ExpectMessages++;
+                } else if (slot < consumer2Slot) {
+                    consumer2ExpectMessages++;
+                } else {
+                    consumer1ExpectMessages++;
+                }
+                producer.newMessage()
                     .key(key)
-                    .value(key.getBytes())
+                    .value(i)
                     .send();
+            }
         }
 
-        int consumer1Received = 0;
-        for (int i = 0; i < consumer1ExpectMessages; i++) {
-            consumer1.receive();
-            consumer1Received++;
-        }
-
-        int consumer2Received = 0;
-        for (int i = 0; i < consumer2ExpectMessages; i++) {
-            consumer2.receive();
-            consumer2Received++;
-        }
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
 
-        int consumer3Received = 0;
-        for (int i = 0; i < consumer3ExpectMessages; i++) {
-            consumer3.receive();
-            consumer3Received++;
-        }
-        Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
-        Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
-        Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+        receiveAndCheck(checkList);
 
         consumer1.close();
         consumer2.close();
 
-        int receivedAfterConsumerCrash = 0;
-        for (int i = 0; i < totalSend; i++) {
-            Message message = consumer3.receive();
-            consumer3.acknowledge(message);
-            receivedAfterConsumerCrash++;
+        // avoid message replay
+        Message<Integer> message;
+        do {
+            message = consumer3.receive(1000, TimeUnit.MILLISECONDS);
+            if (message != null) {
+                consumer3.acknowledge(message);
+            }
+        } while (message != null);
+
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                producer.newMessage()
+                    .key(key)
+                    .value(i)
+                    .send();
+            }
         }
 
-        Assert.assertEquals(receivedAfterConsumerCrash, totalSend);
+        checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer3, 100));
+        receiveAndCheck(checkList);
     }
 
 
@@ -249,97 +230,56 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_none_key";
 
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+        @Cleanup
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
                 .topic(topic)
                 .subscriptionName("key_shared")
                 .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
                 .subscribe();
 
         int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
         int consumer2Slot = consumer1Slot >> 1;
         int consumer3Slot = consumer2Slot >> 1;
 
-        Producer<byte[]> producer = pulsarClient.newProducer()
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                 .topic(topic)
                 .enableBatching(false)
                 .create();
 
         for (int i = 0; i < 100; i++) {
             producer.newMessage()
-                    .value(("Message - " + i).getBytes())
+                    .value(i)
                     .send();
         }
-
-        int expectMessages = 100;
-        int receiveMessages = 0;
         int slot = Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
             % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
         if (slot < consumer3Slot) {
-            for (int i = 0; i < expectMessages; i++) {
-                Message message = consumer3.receive();
-                consumer3.acknowledge(message);
-                receiveMessages++;
-            }
+            checkList.add(new KeyValue<>(consumer3, 100));
         } else if (slot < consumer2Slot) {
-            for (int i = 0; i < expectMessages; i++) {
-                Message message = consumer2.receive();
-                consumer2.acknowledge(message);
-                receiveMessages++;
-            }
+            checkList.add(new KeyValue<>(consumer2, 100));
         } else {
-            for (int i = 0; i < expectMessages; i++) {
-                Message message = consumer1.receive();
-                consumer1.acknowledge(message);
-                receiveMessages++;
-            }
-        }
-        Assert.assertEquals(expectMessages, receiveMessages);
-
-        Producer<byte[]> batchingProducer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(true)
-                .create();
-
-        for (int i = 0; i < 100; i++) {
-            batchingProducer.newMessage()
-                    .value(("Message - " + i).getBytes())
-                    .send();
+            checkList.add(new KeyValue<>(consumer1, 100));
         }
-
-        if (slot < consumer3Slot) {
-            for (int i = 0; i < expectMessages; i++) {
-                Message message = consumer3.receive();
-                consumer3.acknowledge(message);
-                receiveMessages++;
-            }
-        } else if (slot < consumer2Slot) {
-            for (int i = 0; i < expectMessages; i++) {
-                Message message = consumer2.receive();
-                consumer2.acknowledge(message);
-                receiveMessages++;
-            }
-        } else {
-            for (int i = 0; i < expectMessages; i++) {
-                Message message = consumer1.receive();
-                consumer1.acknowledge(message);
-                receiveMessages++;
-            }
-        }
-
-        Assert.assertEquals(expectMessages * 2, receiveMessages);
-
+        receiveAndCheck(checkList);
     }
 
     @Test
@@ -347,79 +287,69 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
         this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_ordering_key";
 
-        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
-                .subscribe();
+        @Cleanup
+        Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("key_shared")
+            .subscriptionType(SubscriptionType.Key_Shared)
+            .ackTimeout(3, TimeUnit.SECONDS)
+            .subscribe();
 
-        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
-                .subscribe();
+        @Cleanup
+        Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("key_shared")
+            .subscriptionType(SubscriptionType.Key_Shared)
+            .ackTimeout(3, TimeUnit.SECONDS)
+            .subscribe();
 
-        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName("key_shared")
-                .subscriptionType(SubscriptionType.Key_Shared)
-                .ackTimeout(10, TimeUnit.SECONDS)
-                .subscribe();
+        @Cleanup
+        Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+            .topic(topic)
+            .subscriptionName("key_shared")
+            .subscriptionType(SubscriptionType.Key_Shared)
+            .ackTimeout(3, TimeUnit.SECONDS)
+            .subscribe();
+
+        @Cleanup
+        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+            .topic(topic)
+            .enableBatching(false)
+            .create();
 
         int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
         int consumer2Slot = consumer1Slot >> 1;
         int consumer3Slot = consumer2Slot >> 1;
 
-        Producer<byte[]> producer = pulsarClient.newProducer()
-                .topic(topic)
-                .enableBatching(false)
-                .create();
-
         int consumer1ExpectMessages = 0;
         int consumer2ExpectMessages = 0;
         int consumer3ExpectMessages = 0;
 
-        for (int i = 0; i < 100; i++) {
-            String key = UUID.randomUUID().toString();
-            String orderingKey = UUID.randomUUID().toString();
-            int slot = Murmur3_32Hash.getInstance().makeHash(orderingKey.getBytes())
-                % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
-            if (slot < consumer3Slot) {
-                consumer3ExpectMessages++;
-            } else if (slot < consumer2Slot) {
-                consumer2ExpectMessages++;
-            } else {
-                consumer1ExpectMessages++;
-            }
-            producer.newMessage()
-                    .key(key)
-                    .orderingKey(orderingKey.getBytes())
-                    .value(key.getBytes())
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+                    % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+                if (slot < consumer3Slot) {
+                    consumer3ExpectMessages++;
+                } else if (slot < consumer2Slot) {
+                    consumer2ExpectMessages++;
+                } else {
+                    consumer1ExpectMessages++;
+                }
+                producer.newMessage()
+                    .key("any key")
+                    .orderingKey(key.getBytes())
+                    .value(i)
                     .send();
+            }
         }
 
-        int consumer1Received = 0;
-        for (int i = 0; i < consumer1ExpectMessages; i++) {
-            consumer1.receive();
-            consumer1Received++;
-        }
-
-        int consumer2Received = 0;
-        for (int i = 0; i < consumer2ExpectMessages; i++) {
-            consumer2.receive();
-            consumer2Received++;
-        }
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+        checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+        checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
 
-        int consumer3Received = 0;
-        for (int i = 0; i < consumer3ExpectMessages; i++) {
-            consumer3.receive();
-            consumer3Received++;
-        }
-        Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
-        Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
-        Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+        receiveAndCheck(checkList);
     }
 
     @Test(expectedExceptions = PulsarClientException.class)
@@ -433,4 +363,59 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
             .ackTimeout(10, TimeUnit.SECONDS)
             .subscribe();
     }
+
+    private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
+        Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
+        for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
+            if (check.getValue() % 2 != 0) {
+                throw new IllegalArgumentException();
+            }
+            int received = 0;
+            Map<String, Message<Integer>> lastMessageForKey = new HashMap<>();
+            for (Integer i = 0; i < check.getValue(); i++) {
+                Message<Integer> message = check.getKey().receive();
+                if (i % 2 == 0) {
+                    check.getKey().acknowledge(message);
+                }
+                String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
+                log.info("[{}] Receive message key: {} value: {} messageId: {}",
+                    check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
+                // check messages is order by key
+                if (lastMessageForKey.get(key) == null) {
+                    Assert.assertNotNull(message);
+                } else {
+                    Assert.assertTrue(message.getValue()
+                        .compareTo(lastMessageForKey.get(key).getValue()) > 0);
+                }
+                lastMessageForKey.put(key, message);
+                consumerKeys.putIfAbsent(check.getKey(), Sets.newHashSet());
+                consumerKeys.get(check.getKey()).add(key);
+                received++;
+            }
+            Assert.assertEquals(check.getValue().intValue(), received);
+            int redeliveryCount = check.getValue() / 2;
+            log.info("[{}] Consumer wait for {} messages redelivery ...", redeliveryCount);
+            // messages not acked, test redelivery
+            for (int i = 0; i < redeliveryCount; i++) {
+                Message<Integer> message = check.getKey().receive();
+                received++;
+                check.getKey().acknowledge(message);
+                String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
+                log.info("[{}] Receive redeliver message key: {} value: {} messageId: {}",
+                        check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
+            }
+            Message noMessages = null;
+            try {
+                noMessages = check.getKey().receive(100, TimeUnit.MILLISECONDS);
+            } catch (PulsarClientException ignore) {
+            }
+            Assert.assertNull(noMessages, "redeliver too many messages.");
+            Assert.assertEquals((check.getValue() + redeliveryCount), received);
+        }
+        Set<String> allKeys = Sets.newHashSet();
+        consumerKeys.forEach((k, v) -> v.forEach(key -> {
+            assertTrue(allKeys.add(key),
+                "Key "+ key +  "is distributed to multiple consumers." );
+        }));
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3426e22..02d1858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1253,7 +1253,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
 
-        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+        if (conf.getSubscriptionType() != SubscriptionType.Shared
+                && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
             // We cannot redeliver single messages if subscription type is not Shared
             redeliverUnacknowledgedMessages();
             return;