You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/04/05 16:07:07 UTC

[pulsar] branch master updated: Improve Key_Shared subscription message dispatching performance. (#6647)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c555eb7  Improve Key_Shared subscription message dispatching performance. (#6647)
c555eb7 is described below

commit c555eb7c915dcceec4df5273eacf5b38b2261123
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Apr 6 00:06:54 2020 +0800

    Improve Key_Shared subscription message dispatching performance. (#6647)
    
    * Improve Key_Shared subscription message dispatching performance.
    
    * Fix unit tests.
    
    * Remove system.out.println
---
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java | 14 ++++++++++++
 ...ashRangeExclusiveStickyKeyConsumerSelector.java | 22 ++++++++++++++++++
 .../broker/service/StickyKeyConsumerSelector.java  |  9 ++++++++
 ...istentStickyKeyDispatcherMultipleConsumers.java |  4 ++--
 ...istentStickyKeyDispatcherMultipleConsumers.java |  4 ++--
 .../pulsar/broker/service/BatchMessageTest.java    | 26 ++++++++++++----------
 6 files changed, 63 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 5c3c5b5..a6e93c2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -115,6 +115,15 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
         }
     }
 
+    @Override
+    public Consumer selectByIndex(int index) {
+        // ceilingEntry() yields null both for an empty map and for an index above
+        // the highest range key, so one null check covers both cases and avoids
+        // the NullPointerException the unguarded getValue() call could throw.
+        Map.Entry<Integer, Consumer> ceiling = rangeMap.ceilingEntry(index);
+        return ceiling != null ? ceiling.getValue() : null;
+    }
+
     private int findBiggestRange() {
         int slots = 0;
         int busiestRange = rangeSize;
@@ -150,6 +159,11 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
         return (num & num - 1) == 0;
     }
 
+    @Override
+    public int getRangeSize() {
+        return rangeSize; // dispatchers take key hashes modulo this to get a range index
+    }
+
     Map<Consumer, Integer> getConsumerRange() {
         return Collections.unmodifiableMap(consumerRange);
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 21e94ba..8fb99ed 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -80,6 +80,28 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
         }
     }
 
+    @Override
+    public Consumer selectByIndex(int index) {
+        // An index is served only when the nearest range keys on both sides of it
+        // (inclusive) map to the same consumer; otherwise no consumer is returned.
+        if (rangeMap.size() == 0) {
+            return null;
+        }
+        Map.Entry<Integer, Consumer> upper = rangeMap.ceilingEntry(index);
+        Map.Entry<Integer, Consumer> lower = rangeMap.floorEntry(index);
+        Consumer upperOwner = upper == null ? null : upper.getValue();
+        Consumer lowerOwner = lower == null ? null : lower.getValue();
+        if (lowerOwner == null || !lowerOwner.equals(upperOwner)) {
+            return null;
+        }
+        return upperOwner;
+    }
+
+    @Override
+    public int getRangeSize() {
+        return rangeSize; // dispatchers take key hashes modulo this to get a range index
+    }
+
     private void validateKeySharedMeta(Consumer consumer) throws BrokerServiceException.ConsumerAssignException {
         if (consumer.getKeySharedMeta() == null) {
             throw new BrokerServiceException.ConsumerAssignException("Must specify key shared meta for consumer.");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 88852b5..545e42d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -50,4 +50,13 @@ public interface StickyKeyConsumerSelector {
      * @return
      */
     Consumer select(int keyHash);
+
+    /**
+     * Select a consumer by key hash range index.
+     * @param index index of the key hash range, i.e. the key hash modulo {@link #getRangeSize()}
+     * @return the consumer selected for the index, or null if no consumer is selected
+     */
+    Consumer selectByIndex(int index);
+    /** @return the range size; dispatchers reduce key hashes modulo this value to get an index. */
+    int getRangeSize();
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index c5183cd..3fdad35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -67,7 +67,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
         if (entries.size() > 0) {
             final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
             for (Entry entry : entries) {
-                int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+                int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
                 groupedEntries.putIfAbsent(key, new ArrayList<>());
                 groupedEntries.get(key).add(entry);
             }
@@ -75,7 +75,7 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
             while (iterator.hasNext()) {
                 final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
                 //TODO: None key policy
-                Consumer consumer = selector.select(entriesWithSameKey.getKey());
+                Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
                 if (consumer != null) {
                     SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
                     EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index abe0cf7..248f45a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -73,7 +73,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         }
         final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
         for (Entry entry : entries) {
-            int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
+            int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer())) % selector.getRangeSize();
             groupedEntries.putIfAbsent(key, new ArrayList<>());
             groupedEntries.get(key).add(entry);
         }
@@ -82,7 +82,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
             final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
             //TODO: None key policy
-            Consumer consumer = selector.select(entriesWithSameKey.getKey());
+            Consumer consumer = selector.selectByIndex(entriesWithSameKey.getKey());
             if (consumer == null) {
                 // Do nothing, cursor will be rewind at reconnection
                 log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
index 3d5f956..1ddbd41 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java
@@ -753,16 +753,17 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
+        String receivedKey = "";
+        int receivedMessageIndex = 0;
         for (int i = 0; i < 30; i++) {
             Message<byte[]> received = consumer.receive();
-            if (i < 10) {
-                assertEquals(received.getKey(), "key-1");
-            } else if (i < 20) {
-                assertEquals(received.getKey(), "key-2");
-            } else {
-                assertEquals(received.getKey(), "key-3");
+            if (!received.getKey().equals(receivedKey)) {
+                receivedKey = received.getKey();
+                receivedMessageIndex = 0;
             }
+            assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10);
             consumer.acknowledge(received);
+            receivedMessageIndex++;
         }
 
         for (int i = 0; i < 10; i++) {
@@ -777,16 +778,17 @@ public class BatchMessageTest extends BrokerTestBase {
         }
         FutureUtil.waitForAll(sendFutureList).get();
 
+        receivedKey = "";
+        receivedMessageIndex = 0;
         for (int i = 0; i < 30; i++) {
             Message<byte[]> received = consumer.receive();
-            if (i < 10) {
-                assertEquals(new String(received.getOrderingKey()), "key-1");
-            } else if (i < 20) {
-                assertEquals(new String(received.getOrderingKey()), "key-2");
-            } else {
-                assertEquals(new String(received.getOrderingKey()), "key-3");
+            if (!new String(received.getOrderingKey()).equals(receivedKey)) {
+                receivedKey = new String(received.getOrderingKey());
+                receivedMessageIndex = 0;
             }
+            assertEquals(new String(received.getValue()), "my-message-" + receivedMessageIndex % 10);
             consumer.acknowledge(received);
+            receivedMessageIndex++;
         }
 
         consumer.close();