You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/08 02:10:30 UTC

[pulsar] 02/02: [improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea2aa4eb8042feef467a0e2de8f4b0e0638c8de3
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Sep 7 15:27:32 2022 +0800

    [improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 41 +++++++++++++---------
 .../bookkeeper/mledger/impl/RangeSetWrapper.java   |  5 +++
 .../ConcurrentOpenLongPairRangeSet.java            | 23 ++++++++++++
 .../common/util/collections/LongPairRangeSet.java  | 10 ++++++
 .../ConcurrentOpenLongPairRangeSetTest.java        | 20 +++++++++++
 5 files changed, 82 insertions(+), 17 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8dee026f57c..25f1d8760b9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1453,25 +1453,32 @@ public class ManagedCursorImpl implements ManagedCursor {
 
         lock.readLock().lock();
         try {
-            individualDeletedMessages.forEach((r) -> {
-                try {
-                    if (r.isConnected(range)) {
-                        Range<PositionImpl> commonEntries = r.intersection(range);
-                        long commonCount = ledger.getNumberOfEntries(commonEntries);
-                        if (log.isDebugEnabled()) {
-                            log.debug("[{}] [{}] Discounting {} entries for already deleted range {}", ledger.getName(),
-                                    name, commonCount, commonEntries);
+            if (config.isUnackedRangesOpenCacheSetEnabled()) {
+                int cardinality = individualDeletedMessages.cardinality(
+                        range.lowerEndpoint().ledgerId, range.lowerEndpoint().entryId,
+                        range.upperEndpoint().ledgerId, range.upperEndpoint().entryId);
+                deletedEntries.addAndGet(cardinality);
+            } else {
+                individualDeletedMessages.forEach((r) -> {
+                    try {
+                        if (r.isConnected(range)) {
+                            Range<PositionImpl> commonEntries = r.intersection(range);
+                            long commonCount = ledger.getNumberOfEntries(commonEntries);
+                            if (log.isDebugEnabled()) {
+                                log.debug("[{}] [{}] Discounting {} entries for already deleted range {}",
+                                        ledger.getName(), name, commonCount, commonEntries);
+                            }
+                            deletedEntries.addAndGet(commonCount);
+                        }
+                        return true;
+                    } finally {
+                        if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
+                            ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
+                            ((PositionImplRecyclable) r.upperEndpoint()).recycle();
                         }
-                        deletedEntries.addAndGet(commonCount);
-                    }
-                    return true;
-                } finally {
-                    if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
-                        ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
-                        ((PositionImplRecyclable) r.upperEndpoint()).recycle();
                     }
-                }
-            }, recyclePositionRangeConverter);
+                }, recyclePositionRangeConverter);
+            }
         } finally {
             lock.readLock().unlock();
         }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index b0314f4e775..02d7967f9fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -133,6 +133,11 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
         return rangeSet.lastRange();
     }
 
+    @Override
+    public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+        return rangeSet.cardinality(lowerKey, lowerValue, upperKey, upperValue);
+    }
+
     @VisibleForTesting
     void add(Range<LongPair> range) {
         if (!(rangeSet instanceof ConcurrentOpenLongPairRangeSet)) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 05d16c4b054..a71c5ceb8de 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -242,6 +242,29 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
         return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
     }
 
+    @Override
+    public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+        NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true);
+        MutableInt v = new MutableInt(0);
+        subMap.forEach((key, bitset) -> {
+            if (key == lowerKey || key == upperKey) {
+                BitSet temp = (BitSet) bitset.clone();
+                // Clear the bits whose index is < lowerValue
+                if (key == lowerKey) {
+                    temp.clear(0, (int) Math.max(0, lowerValue));
+                }
+                // Clear the bits whose index is > upperValue
+                if (key == upperKey) {
+                    temp.clear((int) Math.min(upperValue + 1, temp.length()), temp.length());
+                }
+                v.add(temp.cardinality());
+            } else {
+                v.add(bitset.cardinality());
+            }
+        });
+        return v.intValue();
+    }
+
     @Override
     public int size() {
         if (updatedAfterCachedForSize) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index ba77ff4b839..d804900ed42 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -125,6 +125,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
      */
     Range<T> lastRange();
 
+    /**
+     * Return the number of bits set to true from lower (inclusive) to upper (inclusive).
+     */
+    int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue);
+
     /**
      * Represents a function that accepts two long arguments and produces a result.
      *
@@ -296,6 +301,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
             return list.get(list.size() - 1);
         }
 
+        @Override
+        public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public int size() {
             return set.asRanges().size();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 2c0b8d3552c..5d9af2e0227 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -460,4 +460,24 @@ public class ConcurrentOpenLongPairRangeSetTest {
         gRangeConnected.add(lastRange);
         return gRangeConnected;
     }
+
+    @Test
+    public void testCardinality() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        int v = set.cardinality(0, 0, Integer.MAX_VALUE, Integer.MAX_VALUE);
+        assertEquals(v, 0 );
+        set.addOpenClosed(1, 0, 1, 20);
+        set.addOpenClosed(1, 30, 1, 90);
+        set.addOpenClosed(2, 0, 3, 30);
+        v = set.cardinality(1, 0, 1, 100);
+        assertEquals(v, 80);
+        v = set.cardinality(1, 11, 1, 100);
+        assertEquals(v, 70);
+        v = set.cardinality(1, 0, 1, 90);
+        assertEquals(v, 80);
+        v = set.cardinality(1, 0, 1, 80);
+        assertEquals(v, 70);
+        v = set.cardinality(1, 0, 3, 30);
+        assertEquals(v, 80 + 31);
+    }
 }