You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/09 00:59:45 UTC
[pulsar] branch branch-2.10 updated: [improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new a88ac659a69 [improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465)
a88ac659a69 is described below
commit a88ac659a69763770dc3c3da272c03d6bfa8a897
Author: Penghui Li <pe...@apache.org>
AuthorDate: Wed Sep 7 15:27:32 2022 +0800
[improve][broker] Improve cursor.getNumberOfEntries if isUnackedRangesOpenCacheSetEnabled=true (#17465)
(cherry picked from commit 09edcceab419ef28a9311ac480c0335c8f9dd87e)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 41 +++++++++++++---------
.../ConcurrentOpenLongPairRangeSet.java | 24 +++++++++++++
.../common/util/collections/LongPairRangeSet.java | 10 ++++++
.../ConcurrentOpenLongPairRangeSetTest.java | 20 +++++++++++
4 files changed, 78 insertions(+), 17 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index feeaffd1dd8..8db4d12f2fb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -1431,25 +1431,32 @@ public class ManagedCursorImpl implements ManagedCursor {
lock.readLock().lock();
try {
- individualDeletedMessages.forEach((r) -> {
- try {
- if (r.isConnected(range)) {
- Range<PositionImpl> commonEntries = r.intersection(range);
- long commonCount = ledger.getNumberOfEntries(commonEntries);
- if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] Discounting {} entries for already deleted range {}", ledger.getName(),
- name, commonCount, commonEntries);
+ if (config.isUnackedRangesOpenCacheSetEnabled()) {
+ int cardinality = individualDeletedMessages.cardinality(
+ range.lowerEndpoint().ledgerId, range.lowerEndpoint().entryId,
+ range.upperEndpoint().ledgerId, range.upperEndpoint().entryId);
+ deletedEntries.addAndGet(cardinality);
+ } else {
+ individualDeletedMessages.forEach((r) -> {
+ try {
+ if (r.isConnected(range)) {
+ Range<PositionImpl> commonEntries = r.intersection(range);
+ long commonCount = ledger.getNumberOfEntries(commonEntries);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Discounting {} entries for already deleted range {}",
+ ledger.getName(), name, commonCount, commonEntries);
+ }
+ deletedEntries.addAndGet(commonCount);
+ }
+ return true;
+ } finally {
+ if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
+ ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
+ ((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
- deletedEntries.addAndGet(commonCount);
- }
- return true;
- } finally {
- if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
- ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
- ((PositionImplRecyclable) r.upperEndpoint()).recycle();
}
- }
- }, recyclePositionRangeConverter);
+ }, recyclePositionRangeConverter);
+ }
} finally {
lock.readLock().unlock();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 6e647966938..9bdfc93bd11 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -29,6 +29,7 @@ import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.mutable.MutableInt;
/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
@@ -242,6 +243,29 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
}
+ @Override
+ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+ NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true);
+ MutableInt v = new MutableInt(0);
+ subMap.forEach((key, bitset) -> {
+ if (key == lowerKey || key == upperKey) {
+ BitSet temp = (BitSet) bitset.clone();
+ // Clear the bits whose index is < lowerValue
+ if (key == lowerKey) {
+ temp.clear(0, (int) Math.max(0, lowerValue));
+ }
+ // Clear the bits whose index is > upperValue
+ if (key == upperKey) {
+ temp.clear((int) Math.min(upperValue + 1, temp.length()), temp.length());
+ }
+ v.add(temp.cardinality());
+ } else {
+ v.add(bitset.cardinality());
+ }
+ });
+ return v.intValue();
+ }
+
@Override
public int size() {
if (updatedAfterCachedForSize) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index a6da065c673..739bd6e7df6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -126,6 +126,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
*/
Range<T> lastRange();
+ /**
+ * Return the number of bits set to true from lower (inclusive) to upper (inclusive).
+ */
+ int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue);
+
/**
* Represents a function that accepts two long arguments and produces a result.
*
@@ -289,6 +294,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
return list.get(list.size() - 1);
}
+ @Override
+ public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public int size() {
return set.asRanges().size();
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 2c0b8d3552c..5d9af2e0227 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -460,4 +460,24 @@ public class ConcurrentOpenLongPairRangeSetTest {
gRangeConnected.add(lastRange);
return gRangeConnected;
}
+
+ @Test
+ public void testCardinality() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ int v = set.cardinality(0, 0, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ assertEquals(v, 0 );
+ set.addOpenClosed(1, 0, 1, 20);
+ set.addOpenClosed(1, 30, 1, 90);
+ set.addOpenClosed(2, 0, 3, 30);
+ v = set.cardinality(1, 0, 1, 100);
+ assertEquals(v, 80);
+ v = set.cardinality(1, 11, 1, 100);
+ assertEquals(v, 70);
+ v = set.cardinality(1, 0, 1, 90);
+ assertEquals(v, 80);
+ v = set.cardinality(1, 0, 1, 80);
+ assertEquals(v, 70);
+ v = set.cardinality(1, 0, 3, 30);
+ assertEquals(v, 80 + 31);
+ }
}