You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/05 16:07:07 UTC
[pulsar] branch master updated: Improve Key_Shared subscription
message dispatching performance. (#6647)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c555eb7 Improve Key_Shared subscription message dispatching performance. (#6647)
c555eb7 is described below
commit c555eb7c915dcceec4df5273eacf5b38b2261123
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Apr 6 00:06:54 2020 +0800
Improve Key_Shared subscription message dispatching performance. (#6647)
* Improve Key_Shared subscription message dispatching performance.
* Fix unit tests.
* Remove system.out.println
---
...ashRangeAutoSplitStickyKeyConsumerSelector.java | 14 ++++++++++++
...ashRangeExclusiveStickyKeyConsumerSelector.java | 22 ++++++++++++++++++
.../broker/service/StickyKeyConsumerSelector.java | 9 ++++++++
...istentStickyKeyDispatcherMultipleConsumers.java | 4 ++--
...istentStickyKeyDispatcherMultipleConsumers.java | 4 ++--
.../pulsar/broker/service/BatchMessageTest.java | 26 ++++++++++++----------
6 files changed, 63 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 5c3c5b5..a6e93c2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -115,6 +115,15 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
}
}
+ @Override
+ public Consumer selectByIndex(int index) {
+ if (rangeMap.size() > 0) {
+ return rangeMap.ceilingEntry(index).getValue();
+ } else {
+ return null;
+ }
+ }
+
private int findBiggestRange() {
int slots = 0;
int busiestRange = rangeSize;
@@ -150,6 +159,11 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
return (num & num - 1) == 0;
}
+ @Override
+ public int getRangeSize() {
+ return rangeSize;
+ }
+
Map<Consumer, Integer> getConsumerRange() {
return Collections.unmodifiableMap(consumerRange);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 21e94ba..8fb99ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -80,6 +80,28 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
}
}
+ @Override
+ public Consumer selectByIndex(int index) {
+ if (rangeMap.size() > 0) {
+ Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(index);
+ Map.Entry<Integer, Consumer> floorEntry = rangeMap.floorEntry(index);
+ Consumer ceilingConsumer = ceilingEntry != null ? ceilingEntry.getValue() : null;
+ Consumer floorConsumer = floorEntry != null ? floorEntry.getValue() : null;
+ if (floorConsumer != null && floorConsumer.equals(ceilingConsumer)) {
+ return ceilingConsumer;
+ } else {
+ return null;
+ }
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public int getRangeSize() {
+ return rangeSize;
+ }
+
private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
if (consumer.getKeySharedMeta() == null) {
throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 88852b5..545e42d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -50,4 +50,13 @@ public interface StickyKeyConsumerSelector {
* @return
*/
Consumer select(int keyHash);
+
+ /**
+ * Select a consumer by key hash range index.
+ * @param index index of the key hash range
+ * @return
+ */
+ Consumer selectByIndex(int index);
+
+ int getRangeSize();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index c5183cd..3fdad35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -67,7 +67,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
if (entries.size() > 0) {
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
- int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+ int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
@@ -75,7 +75,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
while (iterator.hasNext()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
- Consumer consumer = selector.select(entriesWithSameKey.getKey());
+ Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
if (consumer != null) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index abe0cf7..248f45a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -73,7 +73,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
for (Entry entry : entries) {
- int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+ int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
groupedEntries.putIfAbsent(key, new ArrayList<>());
groupedEntries.get(key).add(entry);
}
@@ -82,7 +82,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
//TODO: None key policy
- Consumer consumer = selector.select(entriesWithSameKey.getKey());
+ Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
if (consumer == null) {
// Do nothing, cursor will be rewind at reconnection
log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 3d5f956..1ddbd41 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -753,16 +753,17 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
+ String receivedKey = "";
+ int receivedMessageIndex = 0;
for (int i = 0; i < 30; i++) {
Message<byte[]> received = consumer.receive();
- if (i < 10) {
- assertEquals(received.getKey(), "key-1");
- } else if (i < 20) {
- assertEquals(received.getKey(), "key-2");
- } else {
- assertEquals(received.getKey(), "key-3");
+ if (!received.getKey().equals(receivedKey)) {
+ receivedKey = received.getKey();
+ receivedMessageIndex = 0;
}
+ assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10);
consumer.acknowledge(received);
+ receivedMessageIndex++;
}
for (int i = 0; i < 10; i++) {
@@ -777,16 +778,17 @@ public class BatchMessageTest extends BrokerTestBase {
}
FutureUtil.waitForAll(sendFutureList).get();
+ receivedKey = "";
+ receivedMessageIndex = 0;
for (int i = 0; i < 30; i++) {
Message<byte[]> received = consumer.receive();
- if (i < 10) {
- assertEquals(new String(received.getOrderingKey()), "key-1");
- } else if (i < 20) {
- assertEquals(new String(received.getOrderingKey()), "key-2");
- } else {
- assertEquals(new String(received.getOrderingKey()), "key-3");
+ if (!new String(received.getOrderingKey()).equals(receivedKey)) {
+ receivedKey = new String(received.getOrderingKey());
+ receivedMessageIndex = 0;
}
+ assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10);
consumer.acknowledge(received);
+ receivedMessageIndex++;
}
consumer.close();