You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/23 04:02:28 UTC
[pulsar] branch master updated: [broker] fix bug of can't dispatch
messages to other consumers while a consume crash. (#4335)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e50c829 [broker] fix bug of can't dispatch messages to other consumers while a consume crash. (#4335)
e50c829 is described below
commit e50c829f7cfc8f631066c42292da649553cc10ce
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu May 23 12:02:22 2019 +0800
[broker] fix bug of can't dispatch messages to other consumers while a consume crash. (#4335)
Describe the bug
Use key_shared subscription, while a consumer crash, message cannot be dispatched correctly.
To Reproduce
start consumer1
start consumer2
start consumer3
send message
close consumer1
close consumer2
Expected behavior
Messages can also be received in full.
Actual behavior
Unable to receive part of messages that the range originally responsible for consumer2.
---
.../HashRangeStickyKeyConsumerSelector.java | 7 +-
.../client/api/KeySharedSubscriptionTest.java | 94 ++++++++++++++++++++++
2 files changed, 98 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
index c0dfa79..18b3427 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
@@ -92,9 +92,10 @@ public class HashRangeStickyKeyConsumerSelector implements StickyKeyConsumerSele
Integer removeRange = consumerRange.get(consumer);
if (removeRange != null) {
if (removeRange == rangeSize && rangeMap.size() > 1) {
- Consumer lowerConsumer = rangeMap.lowerEntry(removeRange).getValue();
- rangeMap.put(removeRange, lowerConsumer);
- consumerRange.put(lowerConsumer, removeRange);
+ Map.Entry<Integer, Consumer> lowerEntry = rangeMap.lowerEntry(removeRange);
+ rangeMap.put(removeRange, lowerEntry.getValue());
+ rangeMap.remove(lowerEntry.getKey());
+ consumerRange.put(lowerEntry.getValue(), removeRange);
} else {
rangeMap.remove(removeRange);
consumerRange.remove(consumer);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 14c9693..b2148a1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -151,6 +151,100 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
}
@Test
+ public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+
+ this.conf.setSubscriptionKeySharedEnable(true);
+ String topic = "persistent://public/default/key_shared_consumer_crash";
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(10, TimeUnit.SECONDS)
+ .subscribe();
+
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(10, TimeUnit.SECONDS)
+ .subscribe();
+
+ Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(10, TimeUnit.SECONDS)
+ .subscribe();
+
+ int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+ int consumer2Slot = consumer1Slot >> 1;
+ int consumer3Slot = consumer2Slot >> 1;
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ int consumer1ExpectMessages = 0;
+ int consumer2ExpectMessages = 0;
+ int consumer3ExpectMessages = 0;
+
+ final int totalSend = 100;
+
+ for (int i = 0; i < totalSend; i++) {
+ String key = UUID.randomUUID().toString();
+ int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+ % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+ if (slot < consumer3Slot) {
+ consumer3ExpectMessages++;
+ } else if (slot < consumer2Slot) {
+ consumer2ExpectMessages++;
+ } else {
+ consumer1ExpectMessages++;
+ }
+ producer.newMessage()
+ .key(key)
+ .value(key.getBytes())
+ .send();
+ }
+
+ int consumer1Received = 0;
+ for (int i = 0; i < consumer1ExpectMessages; i++) {
+ consumer1.receive();
+ consumer1Received++;
+ }
+
+ int consumer2Received = 0;
+ for (int i = 0; i < consumer2ExpectMessages; i++) {
+ consumer2.receive();
+ consumer2Received++;
+ }
+
+ int consumer3Received = 0;
+ for (int i = 0; i < consumer3ExpectMessages; i++) {
+ consumer3.receive();
+ consumer3Received++;
+ }
+ Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
+ Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
+ Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+
+ consumer1.close();
+ consumer2.close();
+
+ int receivedAfterConsumerCrash = 0;
+ for (int i = 0; i < totalSend; i++) {
+ Message message = consumer3.receive();
+ consumer3.acknowledge(message);
+ receivedAfterConsumerCrash++;
+ }
+
+ Assert.assertEquals(receivedAfterConsumerCrash, totalSend);
+ }
+
+
+ @Test
public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_none_key";