You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/06/06 09:05:41 UTC
[pulsar] branch master updated: Key_Shared subscription on
non-persistent topic (#4462)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6309149 Key_Shared subscription on non-persistent topic (#4462)
6309149 is described below
commit 630914901477357af9a47d687a0b43a7a7b7fa37
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Jun 6 17:05:37 2019 +0800
Key_Shared subscription on non-persistent topic (#4462)
* Support Key_Shared subscription on non-persistent topic.
* fix review comments.
---
.../AbstractDispatcherMultipleConsumers.java | 26 ++
.../NonPersistentDispatcherMultipleConsumers.java | 4 +-
...istentStickyKeyDispatcherMultipleConsumers.java | 102 +++++++
.../nonpersistent/NonPersistentSubscription.java | 5 +
...istentStickyKeyDispatcherMultipleConsumers.java | 22 --
.../NonPersistentKeySharedSubscriptionTest.java | 335 +++++++++++++++++++++
6 files changed, 470 insertions(+), 24 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
index d663752..c36c8ab 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java
@@ -20,11 +20,18 @@ package org.apache.pulsar.broker.service;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*/
@@ -205,4 +212,23 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
return -1;
}
+    /** Sticky key assigned to messages that carry neither an ordering key nor a non-blank partition key. */
+    public static final String NONE_KEY = "NONE_KEY";
+
+    /**
+     * Reads the sticky key of a message without consuming the buffer.
+     *
+     * <p>The reader index is marked before the metadata is parsed and reset afterwards, so the
+     * caller sees {@code metadataAndPayload} at the same position as before the call.
+     *
+     * <p>Key precedence: the ordering key if present, otherwise the partition key if it is not
+     * blank, otherwise {@link #NONE_KEY}. String keys are encoded as UTF-8 so the resulting hash
+     * does not depend on the platform default charset of the broker.
+     *
+     * @param metadataAndPayload buffer positioned at the start of the message metadata
+     * @return the bytes of the sticky key, never {@code null}
+     */
+    protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
+        metadataAndPayload.markReaderIndex();
+        PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+        metadataAndPayload.resetReaderIndex();
+        try {
+            String key = metadata.getPartitionKey();
+            if (log.isDebugEnabled()) {
+                log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
+            }
+            if (metadata.hasOrderingKey()) {
+                // toByteArray() copies, so the result stays valid after the metadata is recycled.
+                return metadata.getOrderingKey().toByteArray();
+            }
+            if (StringUtils.isNotBlank(key)) {
+                return key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+            }
+            return NONE_KEY.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        } finally {
+            // Recycle on every path; previously only the NONE_KEY path returned the metadata.
+            metadata.recycle();
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(AbstractDispatcherMultipleConsumers.class);
+
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 1bb4042..78513c5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -46,11 +46,11 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
implements NonPersistentDispatcher {
private final NonPersistentTopic topic;
- private final Subscription subscription;
+ protected final Subscription subscription;
private CompletableFuture<Void> closeFuture = null;
private final String name;
- private final Rate msgDrop;
+ protected final Rate msgDrop;
protected static final AtomicIntegerFieldUpdater<NonPersistentDispatcherMultipleConsumers> TOTAL_AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(NonPersistentDispatcherMultipleConsumers.class, "totalAvailablePermits");
@SuppressWarnings("unused")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
new file mode 100644
index 0000000..3f0d903
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.nonpersistent;
+
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.SendMessageInfo;
+import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Key_Shared dispatcher for non-persistent topics.
+ *
+ * <p>Incoming entries are grouped by the Murmur3 hash of their sticky key (see
+ * {@code peekStickyKey}) and each group is handed to the consumer chosen by the
+ * {@link StickyKeyConsumerSelector}. A group for which no consumer is selected is dropped:
+ * its messages are counted on {@code msgDrop} and its entries are released.
+ */
+public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
+
+    private final StickyKeyConsumerSelector selector;
+
+    public NonPersistentStickyKeyDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
+        super(topic, subscription);
+        //TODO: Consumer selector Pluggable
+        selector = new HashRangeStickyKeyConsumerSelector();
+    }
+
+    /** Registers the consumer with the base dispatcher first, then with the key selector. */
+    @Override
+    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
+        super.addConsumer(consumer);
+        selector.addConsumer(consumer);
+    }
+
+    /** Removes the consumer from the base dispatcher first, then from the key selector. */
+    @Override
+    public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
+        super.removeConsumer(consumer);
+        selector.removeConsumer(consumer);
+    }
+
+    @Override
+    public SubType getType() {
+        return SubType.Key_Shared;
+    }
+
+    /**
+     * Dispatches {@code entries} grouped by sticky-key hash.
+     *
+     * @param entries entries to dispatch; an empty list is a no-op
+     */
+    @Override
+    public void sendMessages(List<Entry> entries) {
+        if (entries.isEmpty()) {
+            return;
+        }
+        final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
+        for (Entry entry : entries) {
+            int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+            groupedEntries.computeIfAbsent(key, k -> new ArrayList<>()).add(entry);
+        }
+        for (Map.Entry<Integer, List<Entry>> entriesWithSameKey : groupedEntries.entrySet()) {
+            final List<Entry> group = entriesWithSameKey.getValue();
+            //TODO: None key policy
+            Consumer consumer = null;
+            if (selector instanceof HashRangeStickyKeyConsumerSelector) {
+                consumer = ((HashRangeStickyKeyConsumerSelector) selector).select(entriesWithSameKey.getKey());
+            }
+            if (consumer != null) {
+                SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+                EntryBatchSizes batchSizes = EntryBatchSizes.get(group.size());
+                filterEntriesForConsumer(group, batchSizes, sendMessageInfo);
+                consumer.sendMessages(group, batchSizes, sendMessageInfo.getTotalMessages(),
+                        sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
+                TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
+            } else {
+                // Drop only this group's entries. Iterating the whole batch here would release
+                // entries that belong to other groups (possibly already handed to a consumer)
+                // and would release them again for every further group without a consumer.
+                group.forEach(entry -> {
+                    int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
+                    if (totalMsgs > 0) {
+                        msgDrop.recordEvent(totalMsgs);
+                    }
+                    entry.release();
+                });
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 5c060ba..beba94b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -111,6 +111,11 @@ public class NonPersistentSubscription implements Subscription {
topic, this);
}
break;
+ case Key_Shared:
+ if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
+ dispatcher = new NonPersistentStickyKeyDispatcherMultipleConsumers(topic, this);
+ }
+ break;
default:
throw new ServerMetadataException("Unsupported subscription type");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 0cbe354..26d599e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.service.persistent;
-import io.netty.buffer.ByteBuf;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -31,7 +29,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
@@ -39,17 +36,13 @@ import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
- public static final String NONE_KEY = "NONE_KEY";
-
private final StickyKeyConsumerSelector selector;
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) {
@@ -171,21 +164,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
}
- private byte[] peekStickyKey(ByteBuf metadataAndPayload) {
- metadataAndPayload.markReaderIndex();
- MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
- metadataAndPayload.resetReaderIndex();
- String key = metadata.getPartitionKey();
- if (log.isDebugEnabled()) {
- log.debug("Parse message metadata, partition key is {}, ordering key is {}", key, metadata.getOrderingKey());
- }
- if (StringUtils.isNotBlank(key) || metadata.hasOrderingKey()) {
- return metadata.hasOrderingKey() ? metadata.getOrderingKey().toByteArray() : key.getBytes();
- }
- metadata.recycle();
- return NONE_KEY.getBytes();
- }
-
private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java
new file mode 100644
index 0000000..f03460d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentKeySharedSubscriptionTest.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import com.google.common.collect.Sets;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.service.HashRangeStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.testng.Assert.assertTrue;
+
+/**
+ * End-to-end tests for Key_Shared subscriptions on non-persistent topics.
+ *
+ * <p>Each test subscribes three consumers to the same subscription and derives the expected
+ * owner of a key from its Murmur3 hash modulo
+ * {@link HashRangeStickyKeyConsumerSelector#DEFAULT_RANGE_SIZE}: the lowest quarter of the slots
+ * is expected on the third consumer, the next quarter on the second and the upper half on the
+ * first.
+ */
+public class NonPersistentKeySharedSubscriptionTest extends ProducerConsumerBase {
+
+    private static final Logger log = LoggerFactory.getLogger(NonPersistentKeySharedSubscriptionTest.class);
+    private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");
+
+    /** Upper bound for one receive call, so a dropped message fails the test instead of hanging it. */
+    private static final int RECEIVE_TIMEOUT_SECONDS = 10;
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        int[] expected = sendTenRoundsPerKey(producer, false);
+        receiveAndCheck(expectations(expected, consumer1, consumer2, consumer3));
+    }
+
+    @Test
+    public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
+
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared_consumer_crash";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        int[] expected = sendTenRoundsPerKey(producer, false);
+        receiveAndCheck(expectations(expected, consumer1, consumer2, consumer3));
+
+        // Give the consumers time to send their grouped acknowledgements before closing them.
+        Thread.sleep(1000);
+
+        consumer1.close();
+        consumer2.close();
+
+        // With the other two consumers gone, every key is expected on the remaining consumer.
+        sendTenRoundsPerKey(producer, false);
+
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer3, 100));
+        receiveAndCheck(checkList);
+    }
+
+    @Test
+    public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared_none_key";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage()
+                    .value(i)
+                    .send();
+        }
+
+        // Key-less messages are all hashed under NONE_KEY, so exactly one consumer receives them.
+        int owner = expectedOwner(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes());
+        Consumer<Integer> target = owner == 2 ? consumer3 : (owner == 1 ? consumer2 : consumer1);
+
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(target, 100));
+        receiveAndCheck(checkList);
+    }
+
+    @Test
+    public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(true);
+        String topic = "non-persistent://public/default/key_shared_ordering_key";
+
+        @Cleanup
+        Consumer<Integer> consumer1 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer2 = createConsumer(topic);
+
+        @Cleanup
+        Consumer<Integer> consumer3 = createConsumer(topic);
+
+        @Cleanup
+        Producer<Integer> producer = createProducer(topic);
+
+        int[] expected = sendTenRoundsPerKey(producer, true);
+        receiveAndCheck(expectations(expected, consumer1, consumer2, consumer3));
+    }
+
+    // NOTE(review): this test subscribes to a persistent:// topic although the class targets
+    // non-persistent topics; confirm whether a non-persistent topic is also rejected when
+    // Key_Shared is disabled before switching the topic name.
+    @Test(expectedExceptions = PulsarClientException.class)
+    public void testDisableKeySharedSubscription() throws PulsarClientException {
+        this.conf.setSubscriptionKeySharedEnable(false);
+        String topic = "persistent://public/default/key_shared_disabled";
+        pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(10, TimeUnit.SECONDS)
+                .subscribe();
+    }
+
+    private Producer<Integer> createProducer(String topic) throws PulsarClientException {
+        return pulsarClient.newProducer(Schema.INT32)
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+    }
+
+    private Consumer<Integer> createConsumer(String topic) throws PulsarClientException {
+        return pulsarClient.newConsumer(Schema.INT32)
+                .topic(topic)
+                .subscriptionName("key_shared")
+                .subscriptionType(SubscriptionType.Key_Shared)
+                .ackTimeout(3, TimeUnit.SECONDS)
+                .subscribe();
+    }
+
+    /**
+     * Returns which consumer is expected to own {@code stickyKey}: 0 for the first subscribed
+     * consumer, 1 for the second, 2 for the third.
+     */
+    private static int expectedOwner(byte[] stickyKey) {
+        int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        int consumer2Slot = consumer1Slot >> 1;
+        int consumer3Slot = consumer2Slot >> 1;
+        int slot = Murmur3_32Hash.getInstance().makeHash(stickyKey)
+                % HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
+        if (slot < consumer3Slot) {
+            return 2;
+        }
+        if (slot < consumer2Slot) {
+            return 1;
+        }
+        return 0;
+    }
+
+    /**
+     * Sends ten rounds of one message per entry of {@code keys}, with the round number as value.
+     *
+     * @param asOrderingKey when true the key is set as ordering key and the partition key is the
+     *                      fixed string "any key"; otherwise the key is set as partition key
+     * @return expected message count per consumer, indexed as returned by {@code expectedOwner}
+     */
+    private int[] sendTenRoundsPerKey(Producer<Integer> producer, boolean asOrderingKey) throws PulsarClientException {
+        int[] expectedByOwner = new int[3];
+        for (int i = 0; i < 10; i++) {
+            for (String key : keys) {
+                expectedByOwner[expectedOwner(key.getBytes())]++;
+                TypedMessageBuilder<Integer> builder = producer.newMessage();
+                if (asOrderingKey) {
+                    builder = builder.key("any key").orderingKey(key.getBytes());
+                } else {
+                    builder = builder.key(key);
+                }
+                builder.value(i).send();
+            }
+        }
+        return expectedByOwner;
+    }
+
+    /** Pairs each consumer with its expected message count, in subscribe order. */
+    private static List<KeyValue<Consumer<Integer>, Integer>> expectations(int[] expectedByOwner,
+            Consumer<Integer> consumer1, Consumer<Integer> consumer2, Consumer<Integer> consumer3) {
+        List<KeyValue<Consumer<Integer>, Integer>> checkList = new ArrayList<>();
+        checkList.add(new KeyValue<>(consumer1, expectedByOwner[0]));
+        checkList.add(new KeyValue<>(consumer2, expectedByOwner[1]));
+        checkList.add(new KeyValue<>(consumer3, expectedByOwner[2]));
+        return checkList;
+    }
+
+    /**
+     * Receives the expected number of messages on every listed consumer and verifies that
+     * values are strictly increasing per key and that no key is seen by two consumers.
+     */
+    private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
+        Map<Consumer<Integer>, Set<String>> consumerKeys = new HashMap<>();
+        for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
+            Consumer<Integer> consumer = check.getKey();
+            int expected = check.getValue();
+            int received = 0;
+            Map<String, Message<Integer>> lastMessageForKey = new HashMap<>();
+            for (int i = 0; i < expected; i++) {
+                // Bounded receive: a null result means the message never arrived.
+                Message<Integer> message = consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+                Assert.assertNotNull(message,
+                        "Timed out after receiving " + received + " of " + expected + " expected messages.");
+                consumer.acknowledge(message);
+                String key = message.hasOrderingKey() ? new String(message.getOrderingKey()) : message.getKey();
+                log.info("[{}] Receive message key: {} value: {} messageId: {}",
+                        consumer.getConsumerName(), key, message.getValue(), message.getMessageId());
+                // check that messages are ordered by key
+                Message<Integer> previous = lastMessageForKey.put(key, message);
+                if (previous != null) {
+                    Assert.assertTrue(message.getValue().compareTo(previous.getValue()) > 0);
+                }
+                consumerKeys.computeIfAbsent(consumer, c -> Sets.newHashSet()).add(key);
+                received++;
+            }
+            Assert.assertEquals(received, expected);
+        }
+        Set<String> allKeys = Sets.newHashSet();
+        consumerKeys.forEach((k, v) -> v.forEach(key -> {
+            assertTrue(allKeys.add(key),
+                    "Key " + key + " is distributed to multiple consumers.");
+        }));
+    }
+}