Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/22 12:56:03 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request, #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

codelipenghui opened a new pull request, #17804:
URL: https://github.com/apache/pulsar/pull/17804

   ### Motivation
   
   The current implementation of `MessageRedeliveryController` is inefficient:
   
   - Duplicated ledger IDs: https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSet.java#L50
   - High sorting overhead, even when only a few redeliveries are requested: https://github.com/apache/pulsar/compare/master...codelipenghui:incubator-pulsar:penghui/improve-relay-queue#diff-4c2d35368b21e5c33d65886196e8673c04a69f5c5cb888cdd04c29f8823502fdL112-L116
   
   <img width="1842" alt="image" src="https://user-images.githubusercontent.com/12592133/191750398-61caba7e-6056-48f5-9932-3ed59ecf9ee9.png">
   
   ```
   "BookKeeperClientWorker-OrderedExecutor-1-0" #40 prio=5 os_prio=0 cpu=27401742.31ms elapsed=788038.50s tid=0x00007f4cf95bc800 nid=0x56 runnable  [0x00007f4cd49a0000]
      java.lang.Thread.State: RUNNABLE
   	at java.util.TreeMap.put(java.base@11.0.15/TreeMap.java:566)
   	at java.util.TreeSet.add(java.base@11.0.15/TreeSet.java:255)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.lambda$items$4(ConcurrentSortedLongPairSet.java:152)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet$$Lambda$1150/0x00000008408d9040.accept(Unknown Source)
   	at org.apache.pulsar.common.util.collections.ConcurrentLongPairSet$Section.forEach(ConcurrentLongPairSet.java:563)
   	at org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.forEach(ConcurrentLongPairSet.java:242)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:151)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:143)
   	at org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet.items(ConcurrentSortedLongPairSet.java:128)
   	at org.apache.pulsar.broker.service.persistent.MessageRedeliveryController.getMessagesToReplayNow(MessageRedeliveryController.java:112)
   	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentDispatcherMultipleConsumers.java:850)
   	- eliminated <0x0000000757da7548> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
   	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.getMessagesToReplayNow(PersistentStickyKeyDispatcherMultipleConsumers.java:432)
   	- locked <0x0000000757da7548> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
   	at org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.java:175)
   	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readEntriesComplete(PersistentDispatcherMultipleConsumers.java:483)
   	- locked <0x0000000757da7548> (a org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers)
   	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$11.readEntryComplete(ManagedCursorImpl.java:1326)
   	- locked <0x00000007129d8210> (a org.apache.bookkeeper.mledger.impl.ManagedCursorImpl$11)
   	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$0(EntryCacheImpl.java:222)
   	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl$$Lambda$1158/0x00000008408db440.accept(Unknown Source)
   	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(java.base@11.0.15/CompletableFuture.java:714)
   	at java.util.concurrent.CompletableFuture$Completion.run(java.base@11.0.15/CompletableFuture.java:478)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.15/ThreadPoolExecutor.java:1128)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.15/ThreadPoolExecutor.java:628)
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
   	at java.lang.Thread.run(java.base@11.0.15/Thread.java:829)
   ```
   
   #15354 is a related fix, but it does not cover the case where the `MessageRedeliveryController` holds many messages awaiting redelivery.
   This PR makes the `MessageRedeliveryController` more efficient by introducing a sorted long-pair set backed by an ordered map and bitmaps.
   
   ### Modifications
   
   - Added the [RoaringBitmap](https://roaringbitmap.org/) dependency, a widely used compressed bitset. The dependency is added to the broker module rather than the common module, because the common module is also exposed to the Java client.
   - Added `BitmapSortedLongPairSet` for `MessageRedeliveryController`.
   - Updated `MessageRedeliveryController` to use `BitmapSortedLongPairSet` for `messagesToRedeliver`.
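   
   As a rough illustration of the data-structure idea (not the PR's actual code), the sketch below keeps one bitset per ledger ID in a `TreeMap`, so each ledger ID is stored once and the first few (ledgerId, entryId) pairs can be read in order without sorting the whole set. It uses `java.util.BitSet` as a stand-in for RoaringBitmap to stay dependency-free; the class name `SortedLongPairSketch` and its methods are hypothetical.
   
   ```java
   import java.util.ArrayList;
   import java.util.BitSet;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   // Hypothetical sketch: java.util.BitSet stands in for RoaringBitmap.
   class SortedLongPairSketch {
       // Each ledger ID appears once as a key; keys are kept in sorted order.
       private final TreeMap<Long, BitSet> map = new TreeMap<>();

       public void add(long ledgerId, int entryId) {
           map.computeIfAbsent(ledgerId, k -> new BitSet()).set(entryId);
       }

       public void remove(long ledgerId, int entryId) {
           BitSet bits = map.get(ledgerId);
           if (bits != null) {
               bits.clear(entryId);
               if (bits.isEmpty()) {
                   map.remove(ledgerId);
               }
           }
       }

       // Returns up to maxItems pairs in ascending order, stopping early.
       public List<long[]> firstItems(int maxItems) {
           List<long[]> out = new ArrayList<>();
           for (Map.Entry<Long, BitSet> e : map.entrySet()) {
               BitSet bits = e.getValue();
               for (int i = bits.nextSetBit(0); i >= 0 && out.size() < maxItems; i = bits.nextSetBit(i + 1)) {
                   out.add(new long[] {e.getKey(), i});
               }
               if (out.size() >= maxItems) {
                   break;
               }
           }
           return out;
       }
   }
   ```
   
   Because both the map keys and the bits within each bitset are already ordered, a caller asking for a handful of items only touches the first few entries instead of building a sorted copy of everything.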
   
   ### Verifying this change
   
   Added a new test, `BitmapSortedLongPairSetTest`.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs, and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   PR in forked repository: https://github.com/codelipenghui/incubator-pulsar/pull/14
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] coderzc commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978254074


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class ConcurrentBitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);

Review Comment:
   `RoaringBitmap` can't hold values greater than `0xffffffff`. Do we need to replace it with `Roaring64Bitmap`?
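   
   For context on the limit being discussed: a 32-bit bitmap can only index values in the unsigned 32-bit range, 0 through `0xffffffffL`. The sketch below is a hypothetical guard (the name `EntryIdRange.fitsIn32Bits` is made up for illustration and is not part of the PR) that checks whether an entry ID falls within that range.
   
   ```java
   // Hypothetical helper, not part of the PR: checks the unsigned 32-bit range.
   final class EntryIdRange {
       static final long MAX_UNSIGNED_32 = 0xFFFFFFFFL;

       private EntryIdRange() {
       }

       // True if the value can be used as an index in a 32-bit bitmap.
       static boolean fitsIn32Bits(long value) {
           return value >= 0 && value <= MAX_UNSIGNED_32;
       }
   }
   ```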





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978196266


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/BitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class BitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void remove(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            if (bitSet != null) {
+                bitSet.remove(item2, item2 + 1);
+                if (bitSet.isEmpty()) {
+                    map.remove(item1, bitSet);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean contains(long item1, long item2) {
+        lock.readLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            return bitSet != null && bitSet.contains(item2, item2 + 1);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public synchronized void removeUpTo(long item1, long item2) {

Review Comment:
   Question: why do we need `synchronized` here? 





[GitHub] [pulsar] coderzc commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978559582


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class ConcurrentBitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);

Review Comment:
   Ok, I see.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978202553


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/BitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class BitmapSortedLongPairSet {

Review Comment:
   Maybe we can indicate that this class is thread-safe by adding a comment or using a name like `Concurrent***`?





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978208045


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/BitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class BitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void remove(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            if (bitSet != null) {
+                bitSet.remove(item2, item2 + 1);
+                if (bitSet.isEmpty()) {
+                    map.remove(item1, bitSet);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean contains(long item1, long item2) {
+        lock.readLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            return bitSet != null && bitSet.contains(item2, item2 + 1);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public synchronized void removeUpTo(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            Map.Entry<Long, RoaringBitmap> firstEntry = map.firstEntry();
+            while (firstEntry != null && firstEntry.getKey() <= item1) {
+                if (firstEntry.getKey() < item1) {
+                    map.remove(firstEntry.getKey(), firstEntry.getValue());

Review Comment:
   Is it possible to avoid loop traversal by using `subMap`?





[GitHub] [pulsar] 315157973 commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
315157973 commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r977847331


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/BitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class BitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void remove(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            if (bitSet != null) {
+                bitSet.remove(item2, item2 + 1);
+                if (bitSet.isEmpty()) {
+                    map.remove(item1, bitSet);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean contains(long item1, long item2) {
+        lock.readLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            return bitSet != null && bitSet.contains(item2, item2 + 1);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public synchronized void removeUpTo(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            Map.Entry<Long, RoaringBitmap> firstEntry = map.firstEntry();
+            while (firstEntry != null && firstEntry.getKey() <= item1) {
+                if (firstEntry.getKey() < item1) {
+                    map.remove(firstEntry.getKey(), firstEntry.getValue());
+                } else {
+                    RoaringBitmap bitSet = firstEntry.getValue();
+                    if (bitSet != null) {
+                        bitSet.remove(0, item2);
+                        if (bitSet.isEmpty()) {
+                            map.remove(firstEntry.getKey(), bitSet);
+                        }
+                    }
+                    break;
+                }
+                firstEntry = map.firstEntry();
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+
+    public <T> Set<T> items(int numberOfItems, LongPairSet.LongPairFunction<T> longPairConverter) {
+        NavigableSet<T> items = new TreeSet<>();
+        lock.readLock().lock();
+        try {
+            for (Map.Entry<Long, RoaringBitmap> entry : map.entrySet()) {
+                Iterator<Integer> iterator = entry.getValue().stream().iterator();
+                while (iterator.hasNext() && items.size() < numberOfItems) {
+                    items.add(longPairConverter.apply(entry.getKey(), iterator.next()));
+                }
+                if (items.size() == numberOfItems) {
+                    break;
+                }
+            }

Review Comment:
   Can this be changed to use `map.subMap()`?





[GitHub] [pulsar] codelipenghui merged pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17804:
URL: https://github.com/apache/pulsar/pull/17804




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978266626


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class ConcurrentBitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);

Review Comment:
   @coderzc This doesn't actually happen in practice, and the individual acks of the `ManagedCursor` can't support 64-bit entry IDs either. We can consider supporting 64-bit entry IDs in the future if we really need them.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978242302


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/BitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class BitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void remove(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            if (bitSet != null) {
+                bitSet.remove(item2, item2 + 1);
+                if (bitSet.isEmpty()) {
+                    map.remove(item1, bitSet);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean contains(long item1, long item2) {
+        lock.readLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            return bitSet != null && bitSet.contains(item2, item2 + 1);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public synchronized void removeUpTo(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            Map.Entry<Long, RoaringBitmap> firstEntry = map.firstEntry();
+            while (firstEntry != null && firstEntry.getKey() <= item1) {
+                if (firstEntry.getKey() < item1) {
+                    map.remove(firstEntry.getKey(), firstEntry.getValue());

Review Comment:
   A sub map has fixed lower and upper bounds. If we switch to a sub map here, e.g. from 3 to 10, you will not be able to add 2 or 11 again.
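
   To illustrate the point about bounds, here is a minimal sketch using only `java.util.TreeMap`. The class name `SubMapBoundsDemo`, the helper names, and the string values are hypothetical and not part of the PR; the keys 3 to 10 stand in for ledger IDs. It shows that a view returned by `subMap` rejects keys outside its range with `IllegalArgumentException`, while the backing map still accepts them.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class SubMapBoundsDemo {

    /** Builds a backing map with keys 3..10 (hypothetical stand-ins for ledger IDs). */
    static NavigableMap<Long, String> sampleMap() {
        NavigableMap<Long, String> map = new TreeMap<>();
        for (long k = 3; k <= 10; k++) {
            map.put(k, "ledger-" + k);
        }
        return map;
    }

    /**
     * Tries to insert a key through the inclusive [3, 10] view of the backing map.
     * Returns true if the put succeeded, false if the view rejected the key.
     */
    static boolean putThroughView(NavigableMap<Long, String> backing, long key) {
        NavigableMap<Long, String> view = backing.subMap(3L, true, 10L, true);
        try {
            view.put(key, "new");
            return true;
        } catch (IllegalArgumentException outOfRange) {
            // TreeMap sub-map views refuse keys outside their bounds.
            return false;
        }
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> map = sampleMap();
        System.out.println("put 5 via view:  " + putThroughView(map, 5L));
        System.out.println("put 2 via view:  " + putThroughView(map, 2L));
        System.out.println("put 11 via view: " + putThroughView(map, 11L));
        // The backing map itself has no such restriction.
        map.put(2L, "ledger-2");
        System.out.println("backing map has 2: " + map.containsKey(2L));
    }
}
```

   So a sub map is fine as a temporary view for reading or bulk removal, but it cannot replace the backing `TreeMap` as the structure that later inserts go to.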



##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/BitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class BitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public void remove(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            if (bitSet != null) {
+                bitSet.remove(item2, item2 + 1);
+                if (bitSet.isEmpty()) {
+                    map.remove(item1, bitSet);
+                }
+            }
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    public boolean contains(long item1, long item2) {
+        lock.readLock().lock();
+        try {
+            RoaringBitmap bitSet = map.get(item1);
+            return bitSet != null && bitSet.contains(item2, item2 + 1);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    public synchronized void removeUpTo(long item1, long item2) {

Review Comment:
   😁, I copied the method signature from another class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] coderzc commented on a diff in pull request #17804: [improve][broker] Make MessageRedeliveryController work more efficiently

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #17804:
URL: https://github.com/apache/pulsar/pull/17804#discussion_r978254074


##########
pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java:
##########
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+import org.roaringbitmap.RoaringBitmap;
+
+public class ConcurrentBitmapSortedLongPairSet {
+
+    private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public void add(long item1, long item2) {
+        lock.writeLock().lock();
+        try {
+            RoaringBitmap bitSet = map.computeIfAbsent(item1, k -> new RoaringBitmap());
+            bitSet.add(item2, item2 + 1);

Review Comment:
   `RoaringBitmap` can't hold values greater than `0xffffffff`. Do we need to replace it with `Roaring64Bitmap`?
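
   To make the limit concrete: `RoaringBitmap` stores unsigned 32-bit integers, so a `long` only fits if it lies in `[0, 0xffffffff]`. The sketch below uses plain Java with no RoaringBitmap dependency. The class `EntryIdRangeDemo` and its helpers are hypothetical and not part of the PR; they only show what a range guard in front of a 32-bit bitmap could look like.

```java
class EntryIdRangeDemo {

    /** Largest value representable as an unsigned 32-bit integer. */
    static final long MAX_UNSIGNED_INT = 0xFFFFFFFFL;

    /** Returns true if the value can be stored in a 32-bit unsigned bitmap. */
    static boolean fitsInUnsignedInt(long value) {
        return value >= 0 && value <= MAX_UNSIGNED_INT;
    }

    /**
     * Hypothetical guard: narrows a long to the int bit pattern a 32-bit
     * bitmap would store, failing loudly instead of silently truncating.
     */
    static int toUnsignedIntBits(long value) {
        if (!fitsInUnsignedInt(value)) {
            throw new IllegalArgumentException(
                    "value " + value + " is outside [0, 0xffffffff]");
        }
        return (int) value;
    }

    public static void main(String[] args) {
        long small = 4_000_000_000L;   // above Integer.MAX_VALUE, still within 32 unsigned bits
        long large = 0x1_0000_0000L;   // needs 33 bits
        System.out.println(fitsInUnsignedInt(small));
        System.out.println(fitsInUnsignedInt(large));
        // Round trip: the unsigned interpretation of the int recovers the original value.
        System.out.println(Integer.toUnsignedLong(toUnsignedIntBits(small)) == small);
    }
}
```

   Whether such a guard is enough depends on whether the second element of the pair can ever exceed this range in practice. If it can, a 64-bit structure such as `Roaring64Bitmap` would be needed instead.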


