You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/05/28 12:09:55 UTC
[pulsar] branch master updated: Fix bug of Key_Shared Ordering (#4372)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a21094 Fix bug of Key_Shared Ordering (#4372)
7a21094 is described below
commit 7a2109427c174207ec3f7f2060ce30bf7e0a2990
Author: lipenghui <pe...@apache.org>
AuthorDate: Tue May 28 20:09:49 2019 +0800
Fix bug of Key_Shared Ordering (#4372)
Fix the Key_Shared ordering bug, and add checks for per-key message ordering, key distribution across consumers, and message redelivery in Key_Shared subscription mode.
---
.../HashRangeStickyKeyConsumerSelector.java | 6 +-
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../broker/service/StickyKeyConsumerSelector.java | 1 -
.../PersistentDispatcherMultipleConsumers.java | 1 -
...istentStickyKeyDispatcherMultipleConsumers.java | 38 +-
.../client/api/KeySharedSubscriptionTest.java | 455 ++++++++++-----------
.../apache/pulsar/client/impl/ConsumerImpl.java | 3 +-
7 files changed, 252 insertions(+), 254 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
index 940798f..57ff777 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeStickyKeyConsumerSelector.java
@@ -105,8 +105,12 @@ public class HashRangeStickyKeyConsumerSelector implements StickyKeyConsumerSele
@Override
public Consumer select(byte[] stickyKey) {
+ return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
+ }
+
+ public Consumer select(int hash) {
if (rangeMap.size() > 0) {
- int slot = Murmur3_32Hash.getInstance().makeHash(stickyKey) % rangeSize;
+ int slot = hash % rangeSize;
return rangeMap.ceilingEntry(slot).getValue();
} else {
return null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 4488613..ddb619f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1104,7 +1104,7 @@ public class ServerCnx extends PulsarHandler {
if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
Consumer consumer = consumerFuture.getNow(null);
- if (redeliver.getMessageIdsCount() > 0 && consumer.subType() == SubType.Shared) {
+ if (redeliver.getMessageIdsCount() > 0 && (consumer.subType() == SubType.Shared || consumer.subType() == SubType.Key_Shared)) {
consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
} else {
consumer.redeliverUnacknowledgedMessages();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index ce8ef86..b18ffa8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -41,5 +41,4 @@ public interface StickyKeyConsumerSelector {
* @return consumer
*/
Consumer select(byte[] stickyKey);
-
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 4b873d5..060b45d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -409,7 +409,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu
}
sendMessagesToConsumers(readType, entries);
-
readMoreEntries();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 836ee74..a49b41e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -21,10 +21,11 @@ package org.apache.pulsar.broker.service.persistent;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -39,6 +40,7 @@ import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,15 +73,21 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
long totalMessagesSent = 0;
long totalBytesSent = 0;
if (entries.size() > 0) {
- final Map<byte[], List<Entry>> groupedEntries = entries
- .stream()
- .collect(Collectors.groupingBy(entry -> peekStickyKey(entry.getDataBuffer()), Collectors.toList()));
- final Iterator<Map.Entry<byte[], List<Entry>>> iterator = groupedEntries.entrySet().iterator();
+ final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
+ for (Entry entry : entries) {
+ int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+ groupedEntries.putIfAbsent(key, new ArrayList<>());
+ groupedEntries.get(key).add(entry);
+ }
+ final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
+ AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
- final Map.Entry<byte[], List<Entry>> entriesWithSameKey = iterator.next();
+ final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
- final Consumer consumer = selector.select(entriesWithSameKey.getKey());
-
+ Consumer consumer = null;
+ if (selector instanceof HashRangeStickyKeyConsumerSelector) {
+ consumer = ((HashRangeStickyKeyConsumerSelector)selector).select(entriesWithSameKey.getKey());
+ }
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
@@ -91,7 +99,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
int messagesForC = Math.min(entriesWithSameKey.getValue().size(), consumer.getAvailablePermits());
if (messagesForC > 0) {
-
// remove positions first from replay list first : sendMessages recycles entries
List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
if (readType == ReadType.Replay) {
@@ -103,7 +110,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
filterEntriesForConsumer(subList, batchSizes, sendMessageInfo);
consumer.sendMessages(subList, batchSizes, sendMessageInfo.getTotalMessages(),
- sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+ sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
+ if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
+ readMoreEntries();
+ }
+ });
entriesWithSameKey.getValue().removeAll(subList);
totalAvailablePermits -= sendMessageInfo.getTotalMessages();
@@ -140,7 +151,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
laterReplay);
}
-
+ readMoreEntries();
}
}
}
@@ -155,12 +166,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.resetReaderIndex();
String key = metadata.getPartitionKey();
- metadata.recycle();
if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
- } else {
- return NONE_KEY.getBytes();
}
+ metadata.recycle();
+ return NONE_KEY.getBytes();
}
private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index b2148a1..c5f42bb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.client.api;
+import com.google.common.collect.Sets;
+import lombok.Cleanup;
import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,12 +31,21 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.UUID;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static org.testng.Assert.assertTrue;
+
public class KeySharedSubscriptionTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
+ private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+
@BeforeMethod
@Override
@@ -53,194 +65,163 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared";
- Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
- Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
- Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
- int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
- int consumer2Slot = consumer1Slot >> 1;
- int consumer3Slot = consumer2Slot >> 1;
-
- Producer<byte[]> producer = pulsarClient.newProducer()
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
+ int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+ int consumer2Slot = consumer1Slot >> 1;
+ int consumer3Slot = consumer2Slot >> 1;
+
int consumer1ExpectMessages = 0;
int consumer2ExpectMessages = 0;
int consumer3ExpectMessages = 0;
- for (int i = 0; i < 100; i++) {
- String key = UUID.randomUUID().toString();
- int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
- % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
- if (slot < consumer3Slot) {
- consumer3ExpectMessages++;
- } else if (slot < consumer2Slot) {
- consumer2ExpectMessages++;
- } else {
- consumer1ExpectMessages++;
- }
- producer.newMessage()
+ for (int i = 0; i < 10; i++) {
+ for (String key : keys) {
+ int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+ % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+ if (slot < consumer3Slot) {
+ consumer3ExpectMessages++;
+ } else if (slot < consumer2Slot) {
+ consumer2ExpectMessages++;
+ } else {
+ consumer1ExpectMessages++;
+ }
+ producer.newMessage()
.key(key)
- .value(key.getBytes())
+ .value(i)
.send();
+ }
}
- int consumer1Received = 0;
- for (int i = 0; i < consumer1ExpectMessages; i++) {
- consumer1.receive();
- consumer1Received++;
- }
-
- int consumer2Received = 0;
- for (int i = 0; i < consumer2ExpectMessages; i++) {
- consumer2.receive();
- consumer2Received++;
- }
-
- int consumer3Received = 0;
- for (int i = 0; i < consumer3ExpectMessages; i++) {
- consumer3.receive();
- consumer3Received++;
- }
- Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
- Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
- Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
-
- // messages not acked, test redelivery
-
- for (int i = 0; i < consumer1ExpectMessages; i++) {
- Message message = consumer1.receive();
- consumer1.acknowledge(message);
- consumer1Received++;
- }
-
- for (int i = 0; i < consumer2ExpectMessages; i++) {
- Message message = consumer2.receive();
- consumer2.acknowledge(message);
- consumer2Received++;
- }
-
- for (int i = 0; i < consumer3ExpectMessages; i++) {
- Message message = consumer3.receive();
- consumer3.acknowledge(message);
- consumer3Received++;
- }
+ List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+ checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+ checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+ checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
- Assert.assertEquals(consumer1ExpectMessages * 2, consumer1Received);
- Assert.assertEquals(consumer2ExpectMessages * 2, consumer2Received);
- Assert.assertEquals(consumer3ExpectMessages * 2, consumer3Received);
+ receiveAndCheck(checkList);
}
@Test
- public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+ public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_consumer_crash";
- Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
- Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
- Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
int consumer3Slot = consumer2Slot >> 1;
- Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topic)
- .enableBatching(false)
- .create();
-
int consumer1ExpectMessages = 0;
int consumer2ExpectMessages = 0;
int consumer3ExpectMessages = 0;
- final int totalSend = 100;
-
- for (int i = 0; i < totalSend; i++) {
- String key = UUID.randomUUID().toString();
- int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+ for (int i = 0; i < 10; i++) {
+ for (String key : keys) {
+ int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
% HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
- if (slot < consumer3Slot) {
- consumer3ExpectMessages++;
- } else if (slot < consumer2Slot) {
- consumer2ExpectMessages++;
- } else {
- consumer1ExpectMessages++;
- }
- producer.newMessage()
+ if (slot < consumer3Slot) {
+ consumer3ExpectMessages++;
+ } else if (slot < consumer2Slot) {
+ consumer2ExpectMessages++;
+ } else {
+ consumer1ExpectMessages++;
+ }
+ producer.newMessage()
.key(key)
- .value(key.getBytes())
+ .value(i)
.send();
+ }
}
- int consumer1Received = 0;
- for (int i = 0; i < consumer1ExpectMessages; i++) {
- consumer1.receive();
- consumer1Received++;
- }
-
- int consumer2Received = 0;
- for (int i = 0; i < consumer2ExpectMessages; i++) {
- consumer2.receive();
- consumer2Received++;
- }
+ List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+ checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+ checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+ checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
- int consumer3Received = 0;
- for (int i = 0; i < consumer3ExpectMessages; i++) {
- consumer3.receive();
- consumer3Received++;
- }
- Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
- Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
- Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+ receiveAndCheck(checkList);
consumer1.close();
consumer2.close();
- int receivedAfterConsumerCrash = 0;
- for (int i = 0; i < totalSend; i++) {
- Message message = consumer3.receive();
- consumer3.acknowledge(message);
- receivedAfterConsumerCrash++;
+ // avoid message replay
+ Message<Integer> message;
+ do {
+ message = consumer3.receive(1000, TimeUnit.MILLISECONDS);
+ if (message != null) {
+ consumer3.acknowledge(message);
+ }
+ } while (message != null);
+
+ for (int i = 0; i < 10; i++) {
+ for (String key : keys) {
+ producer.newMessage()
+ .key(key)
+ .value(i)
+ .send();
+ }
}
- Assert.assertEquals(receivedAfterConsumerCrash, totalSend);
+ checkList = new ArrayList<>();
+ checkList.add(new KeyValue<>(consumer3, 100));
+ receiveAndCheck(checkList);
}
@@ -249,97 +230,56 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_none_key";
- Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
- Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
- Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
+ @Cleanup
+ Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
int consumer3Slot = consumer2Slot >> 1;
- Producer<byte[]> producer = pulsarClient.newProducer()
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
for (int i = 0; i < 100; i++) {
producer.newMessage()
- .value(("Message - " + i).getBytes())
+ .value(i)
.send();
}
-
- int expectMessages = 100;
- int receiveMessages = 0;
int slot = Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
% HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+ List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
if (slot < consumer3Slot) {
- for (int i = 0; i < expectMessages; i++) {
- Message message = consumer3.receive();
- consumer3.acknowledge(message);
- receiveMessages++;
- }
+ checkList.add(new KeyValue<>(consumer3, 100));
} else if (slot < consumer2Slot) {
- for (int i = 0; i < expectMessages; i++) {
- Message message = consumer2.receive();
- consumer2.acknowledge(message);
- receiveMessages++;
- }
+ checkList.add(new KeyValue<>(consumer2, 100));
} else {
- for (int i = 0; i < expectMessages; i++) {
- Message message = consumer1.receive();
- consumer1.acknowledge(message);
- receiveMessages++;
- }
- }
- Assert.assertEquals(expectMessages, receiveMessages);
-
- Producer<byte[]> batchingProducer = pulsarClient.newProducer()
- .topic(topic)
- .enableBatching(true)
- .create();
-
- for (int i = 0; i < 100; i++) {
- batchingProducer.newMessage()
- .value(("Message - " + i).getBytes())
- .send();
+ checkList.add(new KeyValue<>(consumer1, 100));
}
-
- if (slot < consumer3Slot) {
- for (int i = 0; i < expectMessages; i++) {
- Message message = consumer3.receive();
- consumer3.acknowledge(message);
- receiveMessages++;
- }
- } else if (slot < consumer2Slot) {
- for (int i = 0; i < expectMessages; i++) {
- Message message = consumer2.receive();
- consumer2.acknowledge(message);
- receiveMessages++;
- }
- } else {
- for (int i = 0; i < expectMessages; i++) {
- Message message = consumer1.receive();
- consumer1.acknowledge(message);
- receiveMessages++;
- }
- }
-
- Assert.assertEquals(expectMessages * 2, receiveMessages);
-
+ receiveAndCheck(checkList);
}
@Test
@@ -347,79 +287,69 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_ordering_key";
- Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
- .subscribe();
+ @Cleanup
+ Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(3, TimeUnit.SECONDS)
+ .subscribe();
- Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
- .subscribe();
+ @Cleanup
+ Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(3, TimeUnit.SECONDS)
+ .subscribe();
- Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName("key_shared")
- .subscriptionType(SubscriptionType.Key_Shared)
- .ackTimeout(10, TimeUnit.SECONDS)
- .subscribe();
+ @Cleanup
+ Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName("key_shared")
+ .subscriptionType(SubscriptionType.Key_Shared)
+ .ackTimeout(3, TimeUnit.SECONDS)
+ .subscribe();
+
+ @Cleanup
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
int consumer3Slot = consumer2Slot >> 1;
- Producer<byte[]> producer = pulsarClient.newProducer()
- .topic(topic)
- .enableBatching(false)
- .create();
-
int consumer1ExpectMessages = 0;
int consumer2ExpectMessages = 0;
int consumer3ExpectMessages = 0;
- for (int i = 0; i < 100; i++) {
- String key = UUID.randomUUID().toString();
- String orderingKey = UUID.randomUUID().toString();
- int slot = Murmur3_32Hash.getInstance().makeHash(orderingKey.getBytes())
- % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
- if (slot < consumer3Slot) {
- consumer3ExpectMessages++;
- } else if (slot < consumer2Slot) {
- consumer2ExpectMessages++;
- } else {
- consumer1ExpectMessages++;
- }
- producer.newMessage()
- .key(key)
- .orderingKey(orderingKey.getBytes())
- .value(key.getBytes())
+ for (int i = 0; i < 10; i++) {
+ for (String key : keys) {
+ int slot = Murmur3_32Hash.getInstance().makeHash(key.getBytes())
+ % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+ if (slot < consumer3Slot) {
+ consumer3ExpectMessages++;
+ } else if (slot < consumer2Slot) {
+ consumer2ExpectMessages++;
+ } else {
+ consumer1ExpectMessages++;
+ }
+ producer.newMessage()
+ .key("any key")
+ .orderingKey(key.getBytes())
+ .value(i)
.send();
+ }
}
- int consumer1Received = 0;
- for (int i = 0; i < consumer1ExpectMessages; i++) {
- consumer1.receive();
- consumer1Received++;
- }
-
- int consumer2Received = 0;
- for (int i = 0; i < consumer2ExpectMessages; i++) {
- consumer2.receive();
- consumer2Received++;
- }
+ List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+ checkList.add(new KeyValue<>(consumer1, consumer1ExpectMessages));
+ checkList.add(new KeyValue<>(consumer2, consumer2ExpectMessages));
+ checkList.add(new KeyValue<>(consumer3, consumer3ExpectMessages));
- int consumer3Received = 0;
- for (int i = 0; i < consumer3ExpectMessages; i++) {
- consumer3.receive();
- consumer3Received++;
- }
- Assert.assertEquals(consumer1ExpectMessages, consumer1Received);
- Assert.assertEquals(consumer2ExpectMessages, consumer2Received);
- Assert.assertEquals(consumer3ExpectMessages, consumer3Received);
+ receiveAndCheck(checkList);
}
@Test(expectedExceptions = PulsarClientException.class)
@@ -433,4 +363,59 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
.ackTimeout(10, TimeUnit.SECONDS)
.subscribe();
}
+
+ private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
+ Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
+ for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
+ if (check.getValue() % 2 != 0) {
+ throw new IllegalArgumentException();
+ }
+ int received = 0;
+ Map<String, Message<Integer>> lastMessageForKey = new HashMap<>();
+ for (Integer i = 0; i < check.getValue(); i++) {
+ Message<Integer> message = check.getKey().receive();
+ if (i % 2 == 0) {
+ check.getKey().acknowledge(message);
+ }
+ String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
+ log.info("[{}] Receive message key: {} value: {} messageId: {}",
+ check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
+ // check messages is order by key
+ if (lastMessageForKey.get(key) == null) {
+ Assert.assertNotNull(message);
+ } else {
+ Assert.assertTrue(message.getValue()
+ .compareTo(lastMessageForKey.get(key).getValue()) > 0);
+ }
+ lastMessageForKey.put(key, message);
+ consumerKeys.putIfAbsent(check.getKey(), Sets.newHashSet());
+ consumerKeys.get(check.getKey()).add(key);
+ received++;
+ }
+ Assert.assertEquals(check.getValue().intValue(), received);
+ int redeliveryCount = check.getValue() / 2;
+ log.info("[{}] Consumer wait for {} messages redelivery ...", redeliveryCount);
+ // messages not acked, test redelivery
+ for (int i = 0; i < redeliveryCount; i++) {
+ Message<Integer> message = check.getKey().receive();
+ received++;
+ check.getKey().acknowledge(message);
+ String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
+ log.info("[{}] Receive redeliver message key: {} value: {} messageId: {}",
+ check.getKey().getConsumerName(), key, message.getValue(), message.getMessageId());
+ }
+ Message noMessages = null;
+ try {
+ noMessages = check.getKey().receive(100, TimeUnit.MILLISECONDS);
+ } catch (PulsarClientException ignore) {
+ }
+ Assert.assertNull(noMessages, "redeliver too many messages.");
+ Assert.assertEquals((check.getValue() + redeliveryCount), received);
+ }
+ Set<String> allKeys = Sets.newHashSet();
+ consumerKeys.forEach((k, v) -> v.forEach(key -> {
+ assertTrue(allKeys.add(key),
+ "Key "+ key + "is distributed to multiple consumers." );
+ }));
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3426e22..02d1858 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1253,7 +1253,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
- if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+ if (conf.getSubscriptionType() != SubscriptionType.Shared
+ && conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
// We cannot redeliver single messages if subscription type is not Shared
redeliverUnacknowledgedMessages();
return;