You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/08 02:10:30 UTC
[pulsar] 02/02: [improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ea2aa4eb8042feef467a0e2de8f4b0e0638c8de3
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Sep 7 15:27:32 2022 +0800
[improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 41 +++++++++++++---------
.../bookkeeper/mledger/impl/RangeSetWrapper.java | 5 +++
.../ConcurrentOpenLongPairRangeSet.java | 23 ++++++++++++
.../common/util/collections/LongPairRangeSet.java | 10 ++++++
.../ConcurrentOpenLongPairRangeSetTest.java | 20 +++++++++++
5 files changed, 82 insertions(+), 17 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 8dee026f57c..25f1d8760b9 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1453,25 +1453,32 @@ public class ManagedCursorImpl implements ManagedCursor {
lock.readLock().lock();
try {
- individualDeletedMessages.forEach((r) -> {
- try {
- if (r.isConnected(range)) {
- Range<PositionImpl> commonEntries = r.intersection(range);
- long commonCount = ledger.getNumberOfEntries(commonEntries);
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Discounting {} entries for already deleted range {}", ledger.getName(),
- name, commonCount, commonEntries);
+ if (config.isUnackedRangesOpenCacheSetEnabled()) {
+ int cardinality = individualDeletedMessages.cardinality(
+ range.lowerEndpoint().ledgerId, range.lowerEndpoint().entryId,
+ range.upperEndpoint().ledgerId, range.upperEndpoint().entryId);
+ deletedEntries.addAndGet(cardinality);
+ } else {
+ individualDeletedMessages.forEach((r) -> {
+ try {
+ if (r.isConnected(range)) {
+ Range<PositionImpl> commonEntries = r.intersection(range);
+ long commonCount = ledger.getNumberOfEntries(commonEntries);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Discounting {} entries for already deleted range {}",
+ ledger.getName(), name, commonCount, commonEntries);
+ }
+ deletedEntries.addAndGet(commonCount);
+ }
+ return true;
+ } finally {
+ if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
+ ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
+ ((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
- deletedEntries.addAndGet(commonCount);
- }
- return true;
- } finally {
- if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
- ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
- ((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
- }
- }, recyclePositionRangeConverter);
+ }, recyclePositionRangeConverter);
+ }
} finally {
lock.readLock().unlock();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index b0314f4e775..02d7967f9fc 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -133,6 +133,11 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
return rangeSet.lastRange();
}
+ @Override
+ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+ return rangeSet.cardinality(lowerKey, lowerValue, upperKey, upperValue);
+ }
+
@VisibleForTesting
void add(Range<LongPair> range) {
if (!(rangeSet instanceof ConcurrentOpenLongPairRangeSet)) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 05d16c4b054..a71c5ceb8de 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -242,6 +242,29 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
}
+ @Override
+ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+ NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true);
+ MutableInt v = new MutableInt(0);
+ subMap.forEach((key, bitset) -> {
+ if (key == lowerKey || key == upperKey) {
+ BitSet temp = (BitSet) bitset.clone();
+ // Trim the bitset index which < lowerValue
+ if (key == lowerKey) {
+ temp.clear(0, (int) Math.max(0, lowerValue));
+ }
+ // Trim the bitset index which > upperValue
+ if (key == upperKey) {
+ temp.clear((int) Math.min(upperValue + 1, temp.length()), temp.length());
+ }
+ v.add(temp.cardinality());
+ } else {
+ v.add(bitset.cardinality());
+ }
+ });
+ return v.intValue();
+ }
+
@Override
public int size() {
if (updatedAfterCachedForSize) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index ba77ff4b839..d804900ed42 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -125,6 +125,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
*/
Range<T> lastRange();
+ /**
+ * Return the number bit sets to true from lower (inclusive) to upper (inclusive).
+ */
+ int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue);
+
/**
* Represents a function that accepts two long arguments and produces a result.
*
@@ -296,6 +301,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
return list.get(list.size() - 1);
}
+ @Override
+ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public int size() {
return set.asRanges().size();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 2c0b8d3552c..5d9af2e0227 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -460,4 +460,24 @@ public class ConcurrentOpenLongPairRangeSetTest {
gRangeConnected.add(lastRange);
return gRangeConnected;
}
+
+ @Test
+ public void testCardinality() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ int v = set.cardinality(0, 0, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ assertEquals(v, 0 );
+ set.addOpenClosed(1, 0, 1, 20);
+ set.addOpenClosed(1, 30, 1, 90);
+ set.addOpenClosed(2, 0, 3, 30);
+ v = set.cardinality(1, 0, 1, 100);
+ assertEquals(v, 80);
+ v = set.cardinality(1, 11, 1, 100);
+ assertEquals(v, 70);
+ v = set.cardinality(1, 0, 1, 90);
+ assertEquals(v, 80);
+ v = set.cardinality(1, 0, 1, 80);
+ assertEquals(v, 70);
+ v = set.cardinality(1, 0, 3, 30);
+ assertEquals(v, 80 + 31);
+ }
}