You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/23 14:57:28 UTC
[pulsar] branch branch-2.10 updated: [improve][broker] Make MessageRedeliveryController work more efficiently (#17804)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 19eb842177f [improve][broker] Make MessageRedeliveryController work more efficiently (#17804)
19eb842177f is described below
commit 19eb842177f9e770ab59c4556188e8fa7468eed5
Author: Penghui Li <pe...@apache.org>
AuthorDate: Fri Sep 23 22:45:08 2022 +0800
[improve][broker] Make MessageRedeliveryController work more efficiently (#17804)
(cherry picked from commit c60f895cae6aa3a51a69603c6215c92e358a0e47)
---
distribution/server/src/assemble/LICENSE.bin.txt | 2 +
pom.xml | 8 +
pulsar-broker/pom.xml | 5 +
.../persistent/MessageRedeliveryController.java | 41 ++--
.../utils/ConcurrentBitmapSortedLongPairSet.java | 143 ++++++++++++++
.../MessageRedeliveryControllerTest.java | 22 +--
.../ConcurrentBitmapSortedLongPairSetTest.java | 211 +++++++++++++++++++++
7 files changed, 391 insertions(+), 41 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index f5209320dd7..4783c17c83b 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -538,6 +538,8 @@ The Apache Software License, Version 2.0
- io.etcd-jetcd-core-0.5.11.jar
* IPAddress
- com.github.seancfoley-ipaddress-5.3.3.jar
+ * RoaringBitmap
+ - org.roaringbitmap-RoaringBitmap-0.9.15.jar
BSD 3-clause "New" or "Revised" License
* Google auth library
diff --git a/pom.xml b/pom.xml
index edf1c604a43..41d153c78b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,6 +263,7 @@ flexible messaging model and an intuitive client API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>7.1.0</dependency-check-maven.version>
+ <roaringbitmap.version>0.9.15</roaringbitmap.version>
<!-- Used to configure rename.netty.native. Libs -->
<rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
@@ -1276,6 +1277,13 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>reload4j</artifactId>
<version>${reload4j.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <version>${roaringbitmap.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index cd331ab4b56..6c41a545d80 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -347,6 +347,11 @@
<artifactId>hppc</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-api-examples</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
index 46fa1b2b050..7aaba9a4e10 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
@@ -22,46 +22,43 @@ import com.google.common.collect.ComparisonChain;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
-import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
-import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
public class MessageRedeliveryController {
- private final LongPairSet messagesToRedeliver;
+ private final ConcurrentBitmapSortedLongPairSet messagesToRedeliver;
private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
- this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2, true);
+ this.messagesToRedeliver = new ConcurrentBitmapSortedLongPairSet();
this.hashesToBeBlocked = allowOutOfOrderDelivery
? null
: ConcurrentLongLongPairHashMap
.newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
}
- public boolean add(long ledgerId, long entryId) {
- return messagesToRedeliver.add(ledgerId, entryId);
+ public void add(long ledgerId, long entryId) {
+ messagesToRedeliver.add(ledgerId, entryId);
}
- public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+ public void add(long ledgerId, long entryId, long stickyKeyHash) {
if (hashesToBeBlocked != null) {
hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
}
- return messagesToRedeliver.add(ledgerId, entryId);
+ messagesToRedeliver.add(ledgerId, entryId);
}
- public boolean remove(long ledgerId, long entryId) {
+ public void remove(long ledgerId, long entryId) {
if (hashesToBeBlocked != null) {
hashesToBeBlocked.remove(ledgerId, entryId);
}
- return messagesToRedeliver.remove(ledgerId, entryId);
+ messagesToRedeliver.remove(ledgerId, entryId);
}
- public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+ public void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
if (hashesToBeBlocked != null) {
List<LongPair> keysToRemove = new ArrayList<>();
hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> {
@@ -73,10 +70,7 @@ public class MessageRedeliveryController {
keysToRemove.forEach(longPair -> hashesToBeBlocked.remove(longPair.first, longPair.second));
keysToRemove.clear();
}
- return messagesToRedeliver.removeIf((ledgerId, entryId) -> {
- return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId)
- .result() <= 0;
- });
+ messagesToRedeliver.removeUpTo(markDeleteLedgerId, markDeleteEntryId + 1);
}
public boolean isEmpty() {
@@ -107,17 +101,6 @@ public class MessageRedeliveryController {
}
public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
- if (hashesToBeBlocked != null) {
- // allowOutOfOrderDelivery is false
- return messagesToRedeliver.items().stream()
- .sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first)
- .compare(l1.second, l2.second).result())
- .limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second))
- .collect(Collectors.toCollection(TreeSet::new));
- } else {
- // allowOutOfOrderDelivery is true
- return messagesToRedeliver.items(maxMessagesToRead,
- (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId));
- }
+ return messagesToRedeliver.items(maxMessagesToRead, PositionImpl::new);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
new file mode 100644
index 00000000000..c9f1c65daca
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class ConcurrentBitmapSortedLongPairSet {
+ // Set of (item1, item2) long pairs kept as a sorted map from item1 to a RoaringBitmap of its item2 values.
+ private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>(); // key: item1, value: its item2 values
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(); // write lock for mutators, read lock for queries
+ /** Adds the pair (item1, item2), creating the bitmap for item1 if it does not exist yet. */
+ public void add(long item1, long item2) {
+ lock.writeLock().lock();
+ try {
+ RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+ bitSet.add(item2, item2 + 1); // range overload: start item2, exclusive end item2 + 1, i.e. one value
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ /** Removes the pair (item1, item2) if present and drops item1's bitmap once it becomes empty. */
+ public void remove(long item1, long item2) {
+ lock.writeLock().lock();
+ try {
+ RoaringBitmap bitSet = map.get(item1);
+ if (bitSet != null) {
+ bitSet.remove(item2, item2 + 1);
+ if (bitSet.isEmpty()) {
+ map.remove(item1, bitSet); // keep the map free of empty bitmaps
+ }
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ /** Returns true when the pair (item1, item2) is currently stored. */
+ public boolean contains(long item1, long item2) {
+ lock.readLock().lock();
+ try {
+ RoaringBitmap bitSet = map.get(item1);
+ return bitSet != null && bitSet.contains(item2, item2 + 1);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ /** Removes all pairs with a smaller item1, plus pairs with the same item1 whose item2 is below the given item2. */
+ public void removeUpTo(long item1, long item2) {
+ lock.writeLock().lock();
+ try {
+ Map.Entry<Long, RoaringBitmap> firstEntry = map.firstEntry();
+ while (firstEntry != null && firstEntry.getKey() <= item1) { // always inspect the smallest remaining key
+ if (firstEntry.getKey() < item1) {
+ map.remove(firstEntry.getKey(), firstEntry.getValue()); // key below the bound: drop all of its pairs
+ } else {
+ RoaringBitmap bitSet = firstEntry.getValue();
+ if (bitSet != null) {
+ bitSet.remove(0, item2); // item2 itself is kept (exclusive bound); testRemoveUpTo relies on this
+ if (bitSet.isEmpty()) {
+ map.remove(firstEntry.getKey(), bitSet);
+ }
+ }
+ break; // key == item1 was the last one to trim; larger keys are left untouched
+ }
+ firstEntry = map.firstEntry();
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ // Returns at most numberOfItems pairs, converted by longPairConverter, visiting item1 keys in ascending order.
+ // The result is a TreeSet created without a comparator, so T has to be mutually comparable.
+ public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
+ NavigableSet<T> items = new TreeSet<>();
+ lock.readLock().lock();
+ try {
+ for (Map.Entry<Long, RoaringBitmap> entry : map.entrySet()) {
+ Iterator<Integer> iterator = entry.getValue().stream().iterator(); // NOTE(review): boxed Integer; confirm item2 never exceeds Integer.MAX_VALUE
+ while (iterator.hasNext() && items.size() < numberOfItems) {
+ items.add(longPairConverter.apply(entry.getKey(), iterator.next()));
+ }
+ if (items.size() == numberOfItems) {
+ break; // limit reached, skip the remaining keys
+ }
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ return items;
+ }
+ /** Returns true when no pair is stored. */
+ public boolean isEmpty() {
+ lock.readLock().lock();
+ try {
+ return map.isEmpty() || map.values().stream().allMatch(RoaringBitmap::isEmpty); // a map of only empty bitmaps counts as empty
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ /** Removes all pairs. */
+ public void clear() {
+ lock.writeLock().lock();
+ try {
+ map.clear();
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+ /** Returns the number of stored pairs, computed as the sum of the cardinalities of all bitmaps. */
+ public int size() {
+ lock.readLock().lock();
+ try {
+ return map.isEmpty() ? 0 : map.values().stream().mapToInt(RoaringBitmap::getCardinality).sum();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
index 478677a25e4..3cd6fc23de7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java
@@ -30,8 +30,8 @@ import java.lang.reflect.Field;
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.utils.ConcurrentBitmapSortedLongPairSet;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.pulsar.common.util.collections.LongPairSet;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -48,7 +48,8 @@ public class MessageRedeliveryControllerTest {
Field messagesToRedeliverField = MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver");
messagesToRedeliverField.setAccessible(true);
- LongPairSet messagesToRedeliver = (LongPairSet) messagesToRedeliverField.get(controller);
+ ConcurrentBitmapSortedLongPairSet messagesToRedeliver =
+ (ConcurrentBitmapSortedLongPairSet) messagesToRedeliverField.get(controller);
Field hashesToBeBlockedField = MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked");
hashesToBeBlockedField.setAccessible(true);
@@ -67,9 +68,8 @@ public class MessageRedeliveryControllerTest {
assertEquals(hashesToBeBlocked.size(), 0);
}
- assertTrue(controller.add(1, 1));
- assertTrue(controller.add(1, 2));
- assertFalse(controller.add(1, 1));
+ controller.add(1, 1);
+ controller.add(1, 2);
assertFalse(controller.isEmpty());
assertEquals(messagesToRedeliver.size(), 2);
@@ -81,9 +81,8 @@ public class MessageRedeliveryControllerTest {
assertFalse(hashesToBeBlocked.containsKey(1, 2));
}
- assertTrue(controller.remove(1, 1));
- assertTrue(controller.remove(1, 2));
- assertFalse(controller.remove(1, 1));
+ controller.remove(1, 1);
+ controller.remove(1, 2);
assertTrue(controller.isEmpty());
assertEquals(messagesToRedeliver.size(), 0);
@@ -93,10 +92,9 @@ public class MessageRedeliveryControllerTest {
assertEquals(hashesToBeBlocked.size(), 0);
}
- assertTrue(controller.add(2, 1, 100));
- assertTrue(controller.add(2, 2, 101));
- assertTrue(controller.add(2, 3, 101));
- assertFalse(controller.add(2, 1, 100));
+ controller.add(2, 1, 100);
+ controller.add(2, 2, 101);
+ controller.add(2, 3, 101);
assertFalse(controller.isEmpty());
assertEquals(messagesToRedeliver.size(), 3);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
new file mode 100644
index 00000000000..3c53fc159d0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import lombok.Cleanup;
+import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
+import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+@Test(groups = "utils")
+public class ConcurrentBitmapSortedLongPairSetTest {
+ // Checks size() after adding distinct pairs and that re-adding existing pairs does not change it.
+ @Test
+ public void testAdd() {
+ ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+ int items = 10;
+ for (int i = 0; i < items; i++) {
+ set.add(1, i);
+ }
+ assertEquals(set.size(), items);
+
+ for (int i = 0; i < items; i++) {
+ set.add(2, i);
+ }
+ assertEquals(set.size(), items * 2);
+
+ for (int i = 0; i < items; i++) {
+ set.add(2, i); // same pairs again: size must stay the same
+ }
+ assertEquals(set.size(), items * 2);
+ }
+ // Checks remove() for stored pairs and that removing an unknown item1 or item2 leaves the size unchanged.
+ @Test
+ public void testRemove() {
+ ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+ int items = 10;
+ for (int i = 0; i < items; i++) {
+ set.add(1, i);
+ }
+
+ for (int i = 0; i < items / 2; i++) {
+ set.remove(1, i);
+ }
+ assertEquals(set.size(), items / 2);
+
+ for (int i = 0; i < items / 2; i++) {
+ set.remove(2, i); // item1 == 2 was never added
+ }
+ assertEquals(set.size(), items / 2);
+
+ for (int i = 0; i < items / 2; i++) {
+ set.remove(1, i + 10000); // item2 values that were never added
+ }
+ assertEquals(set.size(), items / 2);
+
+ for (int i = 0; i < items / 2; i++) {
+ set.remove(1, i + items / 2);
+ }
+ assertEquals(set.size(), 0);
+ assertTrue(set.isEmpty());
+ }
+ // Checks contains() on an empty set, for stored pairs, and for a pair that was never added.
+ @Test
+ public void testContains() {
+ ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+ assertFalse(set.contains(1, 1));
+
+ int items = 10;
+ for (int i = 0; i < items; i++) {
+ set.add(1, i);
+ }
+
+ for (int i = 0; i < items; i++) {
+ assertTrue(set.contains(1, i));
+ }
+
+ assertFalse(set.contains(1, 10000));
+ }
+ // Checks removeUpTo() on an empty set, with an exclusive item2 bound, and with a larger item1 that clears the set.
+ @Test
+ public void testRemoveUpTo() {
+ ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+ set.removeUpTo(0, 1000);
+ set.removeUpTo(10, 10000);
+ assertTrue(set.isEmpty());
+
+ set.add(1, 0);
+
+ int items = 10;
+ for (int i = 0; i < items; i++) {
+ set.add(1, i);
+ }
+
+ set.removeUpTo(1, 5); // removes (1, 0) .. (1, 4); (1, 5) must survive
+ assertFalse(set.isEmpty());
+ assertEquals(set.size(), 5);
+
+ for (int i = 5; i < items; i++) {
+ assertTrue(set.contains(1, i));
+ }
+
+ set.removeUpTo(2, 0); // every stored item1 is below 2, so everything goes
+ assertTrue(set.isEmpty());
+ }
+ // Checks that items() honours the requested limit and returns pairs in ascending order.
+ @Test
+ public void testItems() {
+ ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+ Set<ConcurrentLongPairSet.LongPair> items = set.items(10, ConcurrentLongPairSet.LongPair::new);
+ assertEquals(items.size(), 0);
+ for (int i = 0; i < 100; i++) {
+ set.add(1, i);
+ set.add(2, i);
+ set.add(5, i);
+ }
+ for (int i = 0; i < 100; i++) {
+ set.add(1, i + 1000);
+ set.add(2, i + 1000);
+ set.add(5, i + 1000);
+ }
+
+ for (int i = 0; i < 100; i++) {
+ set.add(1, i + 500);
+ set.add(2, i + 500);
+ set.add(5, i + 500);
+ }
+ assertEquals(set.size(), 900);
+ assertFalse(set.isEmpty());
+ items = set.items(10, ConcurrentLongPairSet.LongPair::new);
+ assertEquals(items.size(), 10);
+ ConcurrentLongPairSet.LongPair last = null;
+ for (ConcurrentLongPairSet.LongPair item : items) {
+ if (last != null) {
+ assertTrue(item.compareTo(last) > 0); // strictly ascending
+ }
+ last = item;
+ }
+
+ items = set.items(900, ConcurrentLongPairSet.LongPair::new);
+ assertEquals(items.size(), 900);
+ last = null;
+ for (ConcurrentLongPairSet.LongPair item : items) {
+ if (last != null) {
+ assertTrue(item.compareTo(last) > 0);
+ }
+ last = item;
+ }
+
+ items = set.items(1000, ConcurrentLongPairSet.LongPair::new); // asking for more than is stored
+ assertEquals(items.size(), 900);
+ }
+ // Adds N random pairs from each of nThreads threads and expects size() to equal N * nThreads afterwards.
+ @Test
+ public void concurrentInsertions() throws Throwable {
+ ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
+
+ @Cleanup("shutdownNow")
+ ExecutorService executor = Executors.newCachedThreadPool();
+
+ final int nThreads = 8;
+ final int N = 1000;
+
+ List<Future<?>> futures = new ArrayList<>();
+ for (int i = 0; i < nThreads; i++) {
+ final int threadIdx = i;
+
+ futures.add(executor.submit(() -> {
+ Random random = new Random();
+
+ for (int j = 0; j < N; j++) {
+ int key = random.nextInt();
+ // Rounds key to a multiple of (threadIdx + 1), then takes abs; NOTE(review): this alone does not make keys unique
+ key -= key % (threadIdx + 1);
+ key = Math.abs(key);
+ set.add(key, key);
+ }
+ }));
+ }
+
+ for (Future<?> future : futures) {
+ future.get(); // rethrows any failure raised inside a worker
+ }
+
+ assertEquals(set.size(), N * nThreads);
+ }
+}