You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/07 07:44:32 UTC
[pulsar] branch master updated: [improve][ml] Reduce object allocation when iterate on LongPairRangeSet (#18357)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 12b864ef143 [improve][ml] Reduce object allocation when iterate on LongPairRangeSet (#18357)
12b864ef143 is described below
commit 12b864ef14311fdba5d0b40a8a4f005b262e48c0
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Wed Dec 7 15:44:25 2022 +0800
[improve][ml] Reduce object allocation when iterate on LongPairRangeSet (#18357)
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 42 +++++++++++-----
.../bookkeeper/mledger/impl/RangeSetWrapper.java | 14 ++++--
.../mledger/impl/RangeSetWrapperTest.java | 37 ++++++++------
.../ConcurrentOpenLongPairRangeSet.java | 24 +++++++--
.../common/util/collections/LongPairRangeSet.java | 56 +++++++++++++++++++--
.../ConcurrentOpenLongPairRangeSetTest.java | 57 +++++++++++++++++++++-
.../util/collections/DefaultRangeSetTest.java | 4 +-
7 files changed, 192 insertions(+), 42 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5755f528c6a..e50e1245b8b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -99,6 +99,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,6 +182,10 @@ public class ManagedCursorImpl implements ManagedCursor {
private volatile ManagedCursorInfo managedCursorInfo;
private static final LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
+
+ private static final RangeBoundConsumer<PositionImpl> positionRangeReverseConverter =
+ (position) -> new LongPairRangeSet.LongPair(position.ledgerId, position.entryId);
+
private static final LongPairConsumer<PositionImplRecyclable> recyclePositionRangeConverter = (key, value) -> {
PositionImplRecyclable position = PositionImplRecyclable.create();
position.ledgerId = key;
@@ -294,7 +299,8 @@ public class ManagedCursorImpl implements ManagedCursor {
this.config = config;
this.ledger = ledger;
this.name = cursorName;
- this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter, this);
+ this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter,
+ positionRangeReverseConverter, this);
if (config.isDeletionAtBatchIndexLevelEnabled()) {
this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
} else {
@@ -2858,23 +2864,35 @@ public class ManagedCursorImpl implements ManagedCursor {
MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
.newBuilder();
- MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange.newBuilder();
+
+ MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange
+ .newBuilder();
+
AtomicInteger acksSerializedSize = new AtomicInteger(0);
List<MessageRange> rangeList = new ArrayList<>();
- individualDeletedMessages.forEach((positionRange) -> {
- PositionImpl p = positionRange.lowerEndpoint();
- nestedPositionBuilder.setLedgerId(p.getLedgerId());
- nestedPositionBuilder.setEntryId(p.getEntryId());
- messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build());
- p = positionRange.upperEndpoint();
- nestedPositionBuilder.setLedgerId(p.getLedgerId());
- nestedPositionBuilder.setEntryId(p.getEntryId());
- messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
- MessageRange messageRange = messageRangeBuilder.build();
+
+ individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+ MLDataFormats.NestedPositionInfo lowerPosition = nestedPositionBuilder
+ .setLedgerId(lowerKey)
+ .setEntryId(lowerValue)
+ .build();
+
+ MLDataFormats.NestedPositionInfo upperPosition = nestedPositionBuilder
+ .setLedgerId(upperKey)
+ .setEntryId(upperValue)
+ .build();
+
+ MessageRange messageRange = messageRangeBuilder
+ .setLowerEndpoint(lowerPosition)
+ .setUpperEndpoint(upperPosition)
+ .build();
+
acksSerializedSize.addAndGet(messageRange.getSerializedSize());
rangeList.add(messageRange);
+
return rangeList.size() <= config.getMaxUnackedRangesToPersist();
});
+
this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
individualDeletedMessages.resetDirtyKeys();
return rangeList;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index 274c625e745..02e43504482 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -45,15 +45,18 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
* Record which Ledger is dirty.
*/
private final DefaultRangeSet<Long> dirtyLedgers = new LongPairRangeSet.DefaultRangeSet<>(
- (LongPairConsumer<Long>) (key, value) -> key);
+ (LongPairConsumer<Long>) (key, value) -> key,
+ (RangeBoundConsumer<Long>) key -> new LongPair(key, 0));
- public RangeSetWrapper(LongPairConsumer<T> rangeConverter, ManagedCursorImpl managedCursor) {
+ public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
+ RangeBoundConsumer<T> rangeBoundConsumer,
+ ManagedCursorImpl managedCursor) {
requireNonNull(managedCursor);
this.config = managedCursor.getConfig();
this.rangeConverter = rangeConverter;
this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter)
- : new LongPairRangeSet.DefaultRangeSet<>(rangeConverter);
+ : new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
}
@@ -118,6 +121,11 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
rangeSet.forEach(action, consumer);
}
+ @Override
+ public void forEachRawRange(RawRangeProcessor action) {
+ rangeSet.forEachRawRange(action);
+ }
+
@Override
public int size() {
return rangeSet.size();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
index c374b034054..89fbc26d41a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPair;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
import org.testng.annotations.AfterMethod;
@@ -40,6 +41,8 @@ import org.testng.annotations.Test;
public class RangeSetWrapperTest {
static final LongPairConsumer<LongPair> consumer = (key, value) -> new LongPair(key, value);
+ static final RangeBoundConsumer<LongPair> reverseConvert = (pair) -> pair;
+
ManagedLedgerImpl managedLedger;
RangeSetWrapper<LongPair> set;
ManagedLedgerConfig managedLedgerConfig;
@@ -67,7 +70,9 @@ public class RangeSetWrapperTest {
@Test
public void testDirtyLedger() {
- RangeSetWrapper<LongPair> rangeSetWrapper = new RangeSetWrapper<>(consumer, managedCursor);
+ RangeSetWrapper<LongPair> rangeSetWrapper = new RangeSetWrapper<>(consumer,
+ reverseConvert,
+ managedCursor);
// Test add range
rangeSetWrapper.addOpenClosed(10, 0, 20, 0);
assertEquals(rangeSetWrapper.size(), 1);
@@ -93,7 +98,7 @@ public class RangeSetWrapperTest {
}
private void doTestAddForSameKey() {
- set = new RangeSetWrapper(consumer, managedCursor);
+ set = new RangeSetWrapper(consumer, reverseConvert, managedCursor);
// add 0 to 5
set.addOpenClosed(0, 0, 0, 5);
// add 8,9,10
@@ -113,7 +118,7 @@ public class RangeSetWrapperTest {
@Test
public void testAddForDifferentKey() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
// [98,100],[(1,5),(1,5)],[(1,10,1,15)],[(1,20),(1,20)],[(2,0),(2,10)]
set.addOpenClosed(0, 98, 0, 99);
set.addOpenClosed(0, 100, 1, 5);
@@ -131,7 +136,7 @@ public class RangeSetWrapperTest {
@Test
public void testAddForDifferentKey2() {
managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(false);
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
// [98,100],[(1,5),(1,5)],[(1,10,1,15)],[(1,20),(1,20)],[(2,0),(2,10)]
set.addOpenClosed(0, 98, 0, 99);
set.addOpenClosed(0, 100, 1, 5);
@@ -148,7 +153,7 @@ public class RangeSetWrapperTest {
@Test
public void testAddCompareCompareWithGuava() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
// add 10K values for key 0
@@ -187,7 +192,7 @@ public class RangeSetWrapperTest {
@Test
public void testDeleteCompareWithGuava() throws Exception {
- RangeSetWrapper<LongPair> set = new RangeSetWrapper<>(consumer, managedCursor);
+ RangeSetWrapper<LongPair> set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
// add 10K values for key 0
@@ -241,7 +246,7 @@ public class RangeSetWrapperTest {
@Test
public void testSpanWithGuava() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
set.addOpenClosed(0, 97, 0, 99);
gSet.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
@@ -266,7 +271,7 @@ public class RangeSetWrapperTest {
@Test
public void testFirstRange() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
assertNull(set.firstRange());
set.addOpenClosed(0, 97, 0, 99);
assertEquals(set.firstRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
@@ -281,7 +286,7 @@ public class RangeSetWrapperTest {
@Test
public void testLastRange() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
assertNull(set.lastRange());
Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
set.addOpenClosed(0, 97, 0, 99);
@@ -302,7 +307,7 @@ public class RangeSetWrapperTest {
@Test
public void testToString() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
set.addOpenClosed(0, 97, 0, 99);
assertEquals(set.toString(), "[(0:97..0:99]]");
set.addOpenClosed(0, 98, 0, 105);
@@ -313,7 +318,7 @@ public class RangeSetWrapperTest {
@Test
public void testDeleteForDifferentKey() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
set.addOpenClosed(0, 97, 0, 99);
set.addOpenClosed(0, 99, 1, 5);
set.addOpenClosed(1, 9, 1, 15);
@@ -344,7 +349,7 @@ public class RangeSetWrapperTest {
@Test
public void testDeleteWithAtMost() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
set.addOpenClosed(0, 98, 0, 99);
set.addOpenClosed(0, 100, 1, 5);
set.addOpenClosed(1, 10, 1, 15);
@@ -370,7 +375,7 @@ public class RangeSetWrapperTest {
@Test
public void testDeleteWithAtMost2() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
set.addOpenClosed(0, 98, 0, 99);
set.addOpenClosed(0, 100, 1, 5);
set.addOpenClosed(1, 10, 1, 15);
@@ -390,7 +395,7 @@ public class RangeSetWrapperTest {
assertEquals(ranges.get(count), (Range.openClosed(new LongPair(2, 25), new LongPair(2, 28))));
managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(false);
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
set.addOpenClosed(0, 98, 0, 99);
set.addOpenClosed(0, 100, 1, 5);
set.addOpenClosed(1, 10, 1, 15);
@@ -411,7 +416,7 @@ public class RangeSetWrapperTest {
@Test
public void testDeleteWithLeastMost() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
set.addOpenClosed(0, 98, 0, 99);
set.addOpenClosed(0, 100, 1, 5);
set.addOpenClosed(1, 10, 1, 15);
@@ -439,7 +444,7 @@ public class RangeSetWrapperTest {
@Test
public void testRangeContaining() {
- set = new RangeSetWrapper<>(consumer, managedCursor);
+ set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 6faa61d3b37..72215d7296c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -195,7 +195,18 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
}
@Override
- public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+ public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumerParam) {
+ forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+ Range<T> range = Range.openClosed(
+ consumerParam.apply(lowerKey, lowerValue),
+ consumerParam.apply(upperKey, upperValue)
+ );
+ return action.process(range);
+ });
+ }
+
+ @Override
+ public void forEachRawRange(RawRangeProcessor processor) {
AtomicBoolean completed = new AtomicBoolean(false);
rangeBitSetMap.forEach((key, set) -> {
if (completed.get()) {
@@ -209,9 +220,8 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
int currentClosedMark = first;
while (currentClosedMark != -1 && currentClosedMark <= last) {
int nextOpenMark = set.nextClearBit(currentClosedMark);
- Range<T> range = Range.openClosed(consumer.apply(key, currentClosedMark - 1),
- consumer.apply(key, nextOpenMark - 1));
- if (!action.process(range)) {
+ if (!processor.processRawRange(key, currentClosedMark - 1,
+ key, nextOpenMark - 1)) {
completed.set(true);
break;
}
@@ -220,6 +230,7 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
});
}
+
@Override
public Range<T> firstRange() {
if (rangeBitSetMap.isEmpty()) {
@@ -269,10 +280,13 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
public int size() {
if (updatedAfterCachedForSize) {
MutableInt size = new MutableInt(0);
- forEach((range) -> {
+
+ // ignore result because we just want to count
+ forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
size.increment();
return true;
});
+
cachedSize = size.intValue();
updatedAfterCachedForSize = false;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index 70a589cf551..8aad5587dfd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -104,6 +104,17 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
*/
void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer);
+ /**
+ * Performs the given action for each entry in this map until all entries have been processed
+ * or action returns "false". Unless otherwise specified by the implementing class,
+ * actions are performed in the order of entry set iteration (if an iteration order is specified.)
+ *
+ * This method is optimized to reduce intermediate object creation.
+ * @param action the processor invoked with the raw bounds of each range.
+ *
+ */
+ void forEachRawRange(RawRangeProcessor action);
+
/**
* Returns total number of ranges into the set.
*
@@ -135,15 +146,24 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
*
* @param <T> the type of the result.
*/
- public interface LongPairConsumer<T> {
+ interface LongPairConsumer<T> {
T apply(long key, long value);
}
+ /**
+ * Represents a function that accepts a range bound of type T and produces a LongPair.
+ * This is the reverse operation of {@link LongPairConsumer}.
+ * @param <T> the type of the range bound being converted.
+ */
+ interface RangeBoundConsumer<T> {
+ LongPair apply(T bound);
+ }
+
/**
* The interface exposing a method for processing of ranges.
* @param <T> - The incoming type of data in the range object.
*/
- public interface RangeProcessor<T extends Comparable<T>> {
+ interface RangeProcessor<T extends Comparable<T>> {
/**
*
* @param range
@@ -152,6 +172,19 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
boolean process(Range<T> range);
}
+ /**
+ * The interface exposing a method for processing the raw form of ranges.
+ * This method skips the conversion from (long, long) to `T`,
+ * so fewer objects are created during the iteration.
+ * The parameters have the same meaning as in {@linkplain RangeProcessor}:
+ * (lowerKey, lowerValue) is the open lower bound and
+ * (upperKey, upperValue) is the closed upper bound of the range.
+ */
+ interface RawRangeProcessor {
+ boolean processRawRange(long lowerKey, long lowerValue,
+ long upperKey, long upperValue);
+ }
+
/**
* This class is a simple key-value data structure.
*/
@@ -208,9 +241,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
RangeSet<T> set = TreeRangeSet.create();
private final LongPairConsumer<T> consumer;
+ private final RangeBoundConsumer<T> rangeEndPointConsumer;
- public DefaultRangeSet(LongPairConsumer<T> consumer) {
+ public DefaultRangeSet(LongPairConsumer<T> consumer, RangeBoundConsumer<T> reverseConsumer) {
this.consumer = consumer;
+ this.rangeEndPointConsumer = reverseConsumer;
}
@Override
@@ -270,7 +305,7 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
}
@Override
- public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+ public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> outerConsumer) {
for (Range<T> range : asRanges()) {
if (!action.process(range)) {
break;
@@ -278,6 +313,19 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
}
}
+ @Override
+ public void forEachRawRange(RawRangeProcessor action) {
+ for (Range<T> range : asRanges()) {
+ LongPair lowerEndpoint = this.rangeEndPointConsumer.apply(range.lowerEndpoint());
+ LongPair upperEndpoint = this.rangeEndPointConsumer.apply(range.upperEndpoint());
+ if (!action.processRawRange(lowerEndpoint.key, lowerEndpoint.value,
+ upperEndpoint.key, upperEndpoint.value)) {
+ break;
+ }
+ }
+ }
+
+
@Override
public boolean contains(long key, long value) {
return this.contains(consumer.apply(key, value));
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 3037a9deba3..40bb3379357 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -27,7 +27,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import org.apache.commons.lang.mutable.MutableInt;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPair;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
import org.testng.annotations.Test;
@@ -37,7 +39,8 @@ import com.google.common.collect.TreeRangeSet;
public class ConcurrentOpenLongPairRangeSetTest {
- static final LongPairConsumer<LongPair> consumer = (key, value) -> new LongPair(key, value);
+ static final LongPairConsumer<LongPair> consumer = LongPair::new;
+ static final RangeBoundConsumer<LongPair> reverseConsumer = pair -> pair;
@Test
public void testIsEmpty() {
@@ -480,4 +483,56 @@ public class ConcurrentOpenLongPairRangeSetTest {
v = set.cardinality(1, 0, 3, 30);
assertEquals(v, 80 + 31);
}
+
+ @Test
+ public void testForEachResultTheSameAsForEachWithRangeBoundMapper() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set =
+ new ConcurrentOpenLongPairRangeSet<>(consumer);
+
+ LongPairRangeSet.DefaultRangeSet<LongPair> defaultRangeSet =
+ new LongPairRangeSet.DefaultRangeSet<>(consumer, reverseConsumer);
+
+ set.addOpenClosed(1, 10, 1, 15);
+ set.addOpenClosed(2, 25, 2, 28);
+ set.addOpenClosed(3, 12, 3, 20);
+ set.addOpenClosed(4, 12, 4, 20);
+
+ defaultRangeSet.addOpenClosed(1, 10, 1, 15);
+ defaultRangeSet.addOpenClosed(2, 25, 2, 28);
+ defaultRangeSet.addOpenClosed(3, 12, 3, 20);
+ defaultRangeSet.addOpenClosed(4, 12, 4, 20);
+
+
+ MutableInt size = new MutableInt(0);
+
+ List<LongPair> forEachIterResult = new ArrayList<>();
+ set.forEach((range) -> {
+ forEachIterResult.add(range.lowerEndpoint());
+ forEachIterResult.add(range.upperEndpoint());
+
+ size.increment();
+ return true;
+ });
+
+ List<LongPair> defaultRangeSetResult = new ArrayList<>();
+ List<LongPair> forEachRawRangeResult = new ArrayList<>();
+
+ defaultRangeSet.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+ defaultRangeSetResult.add(new LongPair(lowerKey, lowerValue));
+ defaultRangeSetResult.add(new LongPair(upperKey, upperValue));
+ return true;
+ });
+
+ set.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+ forEachRawRangeResult.add(new LongPair(lowerKey, lowerValue));
+ forEachRawRangeResult.add(new LongPair(upperKey, upperValue));
+ return true;
+ });
+
+ assertEquals(forEachIterResult, forEachRawRangeResult);
+ assertEquals(forEachIterResult, defaultRangeSetResult);
+
+ assertEquals(size.intValue(), set.size());
+ assertEquals(size.intValue(), defaultRangeSet.size());
+ }
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java
index 57303013a11..f6103061a42 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java
@@ -27,11 +27,13 @@ import static org.testng.AssertJUnit.fail;
public class DefaultRangeSetTest {
static final LongPairRangeSet.LongPairConsumer<LongPairRangeSet.LongPair> consumer =
LongPairRangeSet.LongPair::new;
+ static final LongPairRangeSet.RangeBoundConsumer<LongPairRangeSet.LongPair> reverseConsumer =
+ pair -> pair;
@Test
public void testBehavior() {
LongPairRangeSet.DefaultRangeSet<LongPairRangeSet.LongPair> set =
- new LongPairRangeSet.DefaultRangeSet<>(consumer);
+ new LongPairRangeSet.DefaultRangeSet<>(consumer, reverseConsumer);
ConcurrentOpenLongPairRangeSet<LongPairRangeSet.LongPair> rangeSet =
new ConcurrentOpenLongPairRangeSet<>(consumer);