You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/23 04:02:28 UTC

[pulsar] branch master updated: [broker] fix bug of can't dispatch messages to other consumers while a consume crash. (#4335)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e50c829  [broker] fix bug of can't dispatch messages to other consumers while a consume crash. (#4335)
e50c829 is described below

commit e50c829f7cfc8f631066c42292da649553cc10ce
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu May 23 12:02:22 2019 +0800

    [broker] fix bug of can't dispatch messages to other consumers while a consume crash. (#4335)
    
    Describe the bug
    Use key_shared subscription, while a consumer crash, message cannot be dispatched correctly.
    
    To Reproduce
    
    start consumer1
    start consumer2
    start consumer3
    send message
    close consumer1
    close consumer2
    Expected behavior
    Messages can also be received in full.
    
    Actual behavior
    Unable to receive part of messages that the range originally responsible for consumer2.
---
 .../HashRangeStickyKeyConsumerSelector.java        |  7 +-
 .../client/api/KeySharedSubscriptionTest.java      | 94 ++++++++++++++++++++++
 2 files changed, 98 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
index c0dfa79..18b3427 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
@@ -92,9 +92,10 @@ public class HashRangeStickyKeyConsumerSelector implements StickyKeyConsumerSele
         Integer removeRange = consumerRange.get(consumer);
         if (removeRange != null) {
             if (removeRange == rangeSize && rangeMap.size() > 1) {
-                Consumer lowerConsumer = rangeMap.lowerEntry(removeRange).getValue();
-                rangeMap.put(removeRange, lowerConsumer);
-                consumerRange.put(lowerConsumer, removeRange);
+                Map.Entry<Integer, Consumer> lowerEntry = rangeMap.lowerEntry(removeRange);
+                rangeMap.put(removeRange, lowerEntry.getValue());
+                rangeMap.remove(lowerEntry.getKey());
+                consumerRange.put(lowerEntry.getValue(), removeRange);
             } else {
                 rangeMap.remove(removeRange);
                 consumerRange.remove(consumer);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 14c9693..b2148a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -151,6 +151,100 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
     }
 
     @Test
+    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "persistent://public/default/key_shared_consumer_crash";
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+
+        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        int consumer1ExpectMessages = 0;
+        int consumer2ExpectMessages = 0;
+        int consumer3ExpectMessages = 0;
+
+        final int totalSend = 100;
+
+        for (int i = 0; i < totalSend; i++) {
+            String key = UUID.randomUUID().toString();
+            int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+                    % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+            if (slot < consumer3Slot) {
+                consumer3ExpectMessages++;
+            } else if (slot < consumer2Slot) {
+                consumer2ExpectMessages++;
+            } else {
+                consumer1ExpectMessages++;
+            }
+            producer.newMessage()
+                    .key(key)
+                    .value(key.getBytes())
+                    .send();
+        }
+
+        int consumer1Received = 0;
+        for (int i = 0; i < consumer1ExpectMessages; i++) {
+            consumer1.receive();
+            consumer1Received++;
+        }
+
+        int consumer2Received = 0;
+        for (int i = 0; i < consumer2ExpectMessages; i++) {
+            consumer2.receive();
+            consumer2Received++;
+        }
+
+        int consumer3Received = 0;
+        for (int i = 0; i < consumer3ExpectMessages; i++) {
+            consumer3.receive();
+            consumer3Received++;
+        }
+        Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
+        Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
+        Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+
+        consumer1.close();
+        consumer2.close();
+
+        int receivedAfterConsumerCrash = 0;
+        for (int i = 0; i < totalSend; i++) {
+            Message message = consumer3.receive();
+            consumer3.acknowledge(message);
+            receivedAfterConsumerCrash++;
+        }
+
+        Assert.assertEquals(receivedAfterConsumerCrash, totalSend);
+    }
+
+
+    @Test
     public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
         this.conf.setSubscriptionKeySharedEnable(true);
         String topic = "persistent://public/default/key_shared_none_key";