You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/12/07 07:44:32 UTC

[pulsar] branch master updated: [improve][ml] Reduce object allocation when iterate on LongPairRangeSet (#18357)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 12b864ef143 [improve][ml] Reduce object allocation when iterate on LongPairRangeSet (#18357)
12b864ef143 is described below

commit 12b864ef14311fdba5d0b40a8a4f005b262e48c0
Author: lifepuzzlefun <wj...@163.com>
AuthorDate: Wed Dec 7 15:44:25 2022 +0800

    [improve][ml] Reduce object allocation when iterate on LongPairRangeSet (#18357)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 42 +++++++++++-----
 .../bookkeeper/mledger/impl/RangeSetWrapper.java   | 14 ++++--
 .../mledger/impl/RangeSetWrapperTest.java          | 37 ++++++++------
 .../ConcurrentOpenLongPairRangeSet.java            | 24 +++++++--
 .../common/util/collections/LongPairRangeSet.java  | 56 +++++++++++++++++++--
 .../ConcurrentOpenLongPairRangeSetTest.java        | 57 +++++++++++++++++++++-
 .../util/collections/DefaultRangeSetTest.java      |  4 +-
 7 files changed, 192 insertions(+), 42 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 5755f528c6a..e50e1245b8b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -99,6 +99,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
 import org.apache.pulsar.metadata.api.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -181,6 +182,10 @@ public class ManagedCursorImpl implements ManagedCursor {
     private volatile ManagedCursorInfo managedCursorInfo;
 
     private static final LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
+
+    private static final RangeBoundConsumer<PositionImpl> positionRangeReverseConverter =
+            (position) -> new LongPairRangeSet.LongPair(position.ledgerId, position.entryId);
+
     private static final LongPairConsumer<PositionImplRecyclable> recyclePositionRangeConverter = (key, value) -> {
         PositionImplRecyclable position = PositionImplRecyclable.create();
         position.ledgerId = key;
@@ -294,7 +299,8 @@ public class ManagedCursorImpl implements ManagedCursor {
         this.config = config;
         this.ledger = ledger;
         this.name = cursorName;
-        this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter, this);
+        this.individualDeletedMessages = new RangeSetWrapper<>(positionRangeConverter,
+                positionRangeReverseConverter, this);
         if (config.isDeletionAtBatchIndexLevelEnabled()) {
             this.batchDeletedIndexes = new ConcurrentSkipListMap<>();
         } else {
@@ -2858,23 +2864,35 @@ public class ManagedCursorImpl implements ManagedCursor {
 
             MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo
                     .newBuilder();
-            MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange.newBuilder();
+
+            MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange
+                    .newBuilder();
+
             AtomicInteger acksSerializedSize = new AtomicInteger(0);
             List<MessageRange> rangeList = new ArrayList<>();
-            individualDeletedMessages.forEach((positionRange) -> {
-                PositionImpl p = positionRange.lowerEndpoint();
-                nestedPositionBuilder.setLedgerId(p.getLedgerId());
-                nestedPositionBuilder.setEntryId(p.getEntryId());
-                messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build());
-                p = positionRange.upperEndpoint();
-                nestedPositionBuilder.setLedgerId(p.getLedgerId());
-                nestedPositionBuilder.setEntryId(p.getEntryId());
-                messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
-                MessageRange messageRange = messageRangeBuilder.build();
+
+            individualDeletedMessages.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+                MLDataFormats.NestedPositionInfo lowerPosition = nestedPositionBuilder
+                        .setLedgerId(lowerKey)
+                        .setEntryId(lowerValue)
+                        .build();
+
+                MLDataFormats.NestedPositionInfo upperPosition = nestedPositionBuilder
+                        .setLedgerId(upperKey)
+                        .setEntryId(upperValue)
+                        .build();
+
+                MessageRange messageRange = messageRangeBuilder
+                        .setLowerEndpoint(lowerPosition)
+                        .setUpperEndpoint(upperPosition)
+                        .build();
+
                 acksSerializedSize.addAndGet(messageRange.getSerializedSize());
                 rangeList.add(messageRange);
+
                 return rangeList.size() <= config.getMaxUnackedRangesToPersist();
             });
+
             this.individualDeletedMessagesSerializedSize = acksSerializedSize.get();
             individualDeletedMessages.resetDirtyKeys();
             return rangeList;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
index 274c625e745..02e43504482 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java
@@ -45,15 +45,18 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
      * Record which Ledger is dirty.
      */
     private final DefaultRangeSet<Long> dirtyLedgers = new LongPairRangeSet.DefaultRangeSet<>(
-            (LongPairConsumer<Long>) (key, value) -> key);
+            (LongPairConsumer<Long>) (key, value) -> key,
+            (RangeBoundConsumer<Long>) key -> new LongPair(key, 0));
 
-    public RangeSetWrapper(LongPairConsumer<T> rangeConverter, ManagedCursorImpl managedCursor) {
+    public RangeSetWrapper(LongPairConsumer<T> rangeConverter,
+                           RangeBoundConsumer<T> rangeBoundConsumer,
+                           ManagedCursorImpl managedCursor) {
         requireNonNull(managedCursor);
         this.config = managedCursor.getConfig();
         this.rangeConverter = rangeConverter;
         this.rangeSet = config.isUnackedRangesOpenCacheSetEnabled()
                 ? new ConcurrentOpenLongPairRangeSet<>(4096, rangeConverter)
-                : new LongPairRangeSet.DefaultRangeSet<>(rangeConverter);
+                : new LongPairRangeSet.DefaultRangeSet<>(rangeConverter, rangeBoundConsumer);
         this.enableMultiEntry = config.isPersistentUnackedRangesWithMultipleEntriesEnabled();
     }
 
@@ -118,6 +121,11 @@ public class RangeSetWrapper<T extends Comparable<T>> implements LongPairRangeSe
         rangeSet.forEach(action, consumer);
     }
 
+    @Override
+    public void forEachRawRange(RawRangeProcessor action) {
+        rangeSet.forEachRawRange(action);
+    }
+
     @Override
     public int size() {
         return rangeSet.size();
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
index c374b034054..89fbc26d41a 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapperTest.java
@@ -31,6 +31,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPair;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
 import org.testng.annotations.AfterMethod;
@@ -40,6 +41,8 @@ import org.testng.annotations.Test;
 public class RangeSetWrapperTest {
 
     static final LongPairConsumer<LongPair> consumer = (key, value) -> new LongPair(key, value);
+    static final RangeBoundConsumer<LongPair> reverseConvert = (pair) -> pair;
+    
     ManagedLedgerImpl managedLedger;
     RangeSetWrapper<LongPair> set;
     ManagedLedgerConfig managedLedgerConfig;
@@ -67,7 +70,9 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testDirtyLedger() {
-        RangeSetWrapper<LongPair> rangeSetWrapper = new RangeSetWrapper<>(consumer, managedCursor);
+        RangeSetWrapper<LongPair> rangeSetWrapper = new RangeSetWrapper<>(consumer,
+                reverseConvert,
+                managedCursor);
         // Test add range
         rangeSetWrapper.addOpenClosed(10, 0, 20, 0);
         assertEquals(rangeSetWrapper.size(), 1);
@@ -93,7 +98,7 @@ public class RangeSetWrapperTest {
     }
 
     private void doTestAddForSameKey() {
-        set = new RangeSetWrapper(consumer, managedCursor);
+        set = new RangeSetWrapper(consumer, reverseConvert, managedCursor);
         // add 0 to 5
         set.addOpenClosed(0, 0, 0, 5);
         // add 8,9,10
@@ -113,7 +118,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testAddForDifferentKey() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         // [98,100],[(1,5),(1,5)],[(1,10,1,15)],[(1,20),(1,20)],[(2,0),(2,10)]
         set.addOpenClosed(0, 98, 0, 99);
         set.addOpenClosed(0, 100, 1, 5);
@@ -131,7 +136,7 @@ public class RangeSetWrapperTest {
     @Test
     public void testAddForDifferentKey2() {
         managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(false);
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         // [98,100],[(1,5),(1,5)],[(1,10,1,15)],[(1,20),(1,20)],[(2,0),(2,10)]
         set.addOpenClosed(0, 98, 0, 99);
         set.addOpenClosed(0, 100, 1, 5);
@@ -148,7 +153,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testAddCompareCompareWithGuava() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
 
         // add 10K values for key 0
@@ -187,7 +192,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testDeleteCompareWithGuava() throws Exception {
-        RangeSetWrapper<LongPair> set = new RangeSetWrapper<>(consumer, managedCursor);
+        RangeSetWrapper<LongPair> set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
 
         // add 10K values for key 0
@@ -241,7 +246,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testSpanWithGuava() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
         set.addOpenClosed(0, 97, 0, 99);
         gSet.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
@@ -266,7 +271,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testFirstRange() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         assertNull(set.firstRange());
         set.addOpenClosed(0, 97, 0, 99);
         assertEquals(set.firstRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
@@ -281,7 +286,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testLastRange() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         assertNull(set.lastRange());
         Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
         set.addOpenClosed(0, 97, 0, 99);
@@ -302,7 +307,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testToString() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         set.addOpenClosed(0, 97, 0, 99);
         assertEquals(set.toString(), "[(0:97..0:99]]");
         set.addOpenClosed(0, 98, 0, 105);
@@ -313,7 +318,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testDeleteForDifferentKey() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         set.addOpenClosed(0, 97, 0, 99);
         set.addOpenClosed(0, 99, 1, 5);
         set.addOpenClosed(1, 9, 1, 15);
@@ -344,7 +349,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testDeleteWithAtMost() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         set.addOpenClosed(0, 98, 0, 99);
         set.addOpenClosed(0, 100, 1, 5);
         set.addOpenClosed(1, 10, 1, 15);
@@ -370,7 +375,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testDeleteWithAtMost2() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         set.addOpenClosed(0, 98, 0, 99);
         set.addOpenClosed(0, 100, 1, 5);
         set.addOpenClosed(1, 10, 1, 15);
@@ -390,7 +395,7 @@ public class RangeSetWrapperTest {
         assertEquals(ranges.get(count), (Range.openClosed(new LongPair(2, 25), new LongPair(2, 28))));
 
         managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(false);
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         set.addOpenClosed(0, 98, 0, 99);
         set.addOpenClosed(0, 100, 1, 5);
         set.addOpenClosed(1, 10, 1, 15);
@@ -411,7 +416,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testDeleteWithLeastMost() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         set.addOpenClosed(0, 98, 0, 99);
         set.addOpenClosed(0, 100, 1, 5);
         set.addOpenClosed(1, 10, 1, 15);
@@ -439,7 +444,7 @@ public class RangeSetWrapperTest {
 
     @Test
     public void testRangeContaining() {
-        set = new RangeSetWrapper<>(consumer, managedCursor);
+        set = new RangeSetWrapper<>(consumer, reverseConvert, managedCursor);
         set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
         set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
         com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index 6faa61d3b37..72215d7296c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -195,7 +195,18 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
     }
 
     @Override
-    public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+    public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumerParam) {
+        forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+            Range<T> range = Range.openClosed(
+                    consumerParam.apply(lowerKey, lowerValue),
+                    consumerParam.apply(upperKey, upperValue)
+            );
+            return action.process(range);
+        });
+    }
+
+    @Override
+    public void forEachRawRange(RawRangeProcessor processor) {
         AtomicBoolean completed = new AtomicBoolean(false);
         rangeBitSetMap.forEach((key, set) -> {
             if (completed.get()) {
@@ -209,9 +220,8 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
             int currentClosedMark = first;
             while (currentClosedMark != -1 && currentClosedMark <= last) {
                 int nextOpenMark = set.nextClearBit(currentClosedMark);
-                Range<T> range = Range.openClosed(consumer.apply(key, currentClosedMark - 1),
-                        consumer.apply(key, nextOpenMark - 1));
-                if (!action.process(range)) {
+                if (!processor.processRawRange(key, currentClosedMark - 1,
+                        key, nextOpenMark - 1)) {
                     completed.set(true);
                     break;
                 }
@@ -220,6 +230,7 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
         });
     }
 
+
     @Override
     public Range<T> firstRange() {
         if (rangeBitSetMap.isEmpty()) {
@@ -269,10 +280,13 @@ public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements
     public int size() {
         if (updatedAfterCachedForSize) {
             MutableInt size = new MutableInt(0);
-            forEach((range) -> {
+
+            // ignore result because we just want to count
+            forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
                 size.increment();
                 return true;
             });
+
             cachedSize = size.intValue();
             updatedAfterCachedForSize = false;
         }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
index 70a589cf551..8aad5587dfd 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -104,6 +104,17 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
      */
     void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer);
 
+    /**
+     * Performs the given action for each entry in this map until all entries have been processed
+     * or action returns "false". Unless otherwise specified by the implementing class,
+     * actions are performed in the order of entry set iteration (if an iteration order is specified.)
+     *
+     * This method is optimized on reducing intermediate object creation.
+     * {@param action} to do iteration jobs.
+     *
+     */
+    void forEachRawRange(RawRangeProcessor action);
+
     /**
      * Returns total number of ranges into the set.
      *
@@ -135,15 +146,24 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
      *
      * @param <T> the type of the result.
      */
-    public interface LongPairConsumer<T> {
+    interface LongPairConsumer<T> {
         T apply(long key, long value);
     }
 
+    /**
+     * Represents a function that accepts result and produces a LongPair.
+     * Reverse ops of `LongPairConsumer<T>`
+     * @param <T> the type of the result.
+     */
+    interface RangeBoundConsumer<T> {
+        LongPair apply(T bound);
+    }
+
     /**
      * The interface exposing a method for processing of ranges.
      * @param <T> - The incoming type of data in the range object.
      */
-    public interface RangeProcessor<T extends Comparable<T>> {
+    interface RangeProcessor<T extends Comparable<T>> {
         /**
          *
          * @param range
@@ -152,6 +172,19 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
         boolean process(Range<T> range);
     }
 
+    /**
+     * The interface exposing a method for processing raw form of ranges.
+     * This method will omit the process to convert (long, long) to `T`
+     * create less object during the iteration.
+     * the parameter is the same as {@linkplain RangeProcessor} which
+     * means (lowerKey,lowerValue) in open bound
+     * (upperKey, upperValue) in close bound in Range
+     */
+    interface RawRangeProcessor {
+        boolean processRawRange(long lowerKey, long lowerValue,
+                                long upperKey, long upperValue);
+    }
+
     /**
      * This class is a simple key-value data structure.
      */
@@ -208,9 +241,11 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
         RangeSet<T> set = TreeRangeSet.create();
 
         private final LongPairConsumer<T> consumer;
+        private final RangeBoundConsumer<T> rangeEndPointConsumer;
 
-        public DefaultRangeSet(LongPairConsumer<T> consumer) {
+        public DefaultRangeSet(LongPairConsumer<T> consumer, RangeBoundConsumer<T> reverseConsumer) {
             this.consumer = consumer;
+            this.rangeEndPointConsumer = reverseConsumer;
         }
 
         @Override
@@ -270,7 +305,7 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
         }
 
         @Override
-        public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+        public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> outerConsumer) {
             for (Range<T> range : asRanges()) {
                 if (!action.process(range)) {
                     break;
@@ -278,6 +313,19 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
             }
         }
 
+        @Override
+        public void forEachRawRange(RawRangeProcessor action) {
+            for (Range<T> range : asRanges()) {
+                LongPair lowerEndpoint = this.rangeEndPointConsumer.apply(range.lowerEndpoint());
+                LongPair upperEndpoint = this.rangeEndPointConsumer.apply(range.upperEndpoint());
+                if (!action.processRawRange(lowerEndpoint.key, lowerEndpoint.value,
+                        upperEndpoint.key, upperEndpoint.value)) {
+                    break;
+                }
+            }
+        }
+
+
         @Override
         public boolean contains(long key, long value) {
             return this.contains(consumer.apply(key, value));
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
index 3037a9deba3..40bb3379357 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -27,7 +27,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPair;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.RangeBoundConsumer;
 import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
 import org.testng.annotations.Test;
 
@@ -37,7 +39,8 @@ import com.google.common.collect.TreeRangeSet;
 
 public class ConcurrentOpenLongPairRangeSetTest {
 
-    static final LongPairConsumer<LongPair> consumer = (key, value) -> new LongPair(key, value);
+    static final LongPairConsumer<LongPair> consumer = LongPair::new;
+    static final RangeBoundConsumer<LongPair> reverseConsumer = pair -> pair;
 
     @Test
     public void testIsEmpty() {
@@ -480,4 +483,56 @@ public class ConcurrentOpenLongPairRangeSetTest {
         v = set.cardinality(1, 0, 3, 30);
         assertEquals(v, 80 + 31);
     }
+
+    @Test
+    public void testForEachResultTheSameAsForEachWithRangeBoundMapper() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set =
+                new ConcurrentOpenLongPairRangeSet<>(consumer);
+
+        LongPairRangeSet.DefaultRangeSet<LongPair> defaultRangeSet =
+                new LongPairRangeSet.DefaultRangeSet<>(consumer, reverseConsumer);
+
+        set.addOpenClosed(1, 10, 1, 15);
+        set.addOpenClosed(2, 25, 2, 28);
+        set.addOpenClosed(3, 12, 3, 20);
+        set.addOpenClosed(4, 12, 4, 20);
+
+        defaultRangeSet.addOpenClosed(1, 10, 1, 15);
+        defaultRangeSet.addOpenClosed(2, 25, 2, 28);
+        defaultRangeSet.addOpenClosed(3, 12, 3, 20);
+        defaultRangeSet.addOpenClosed(4, 12, 4, 20);
+
+
+        MutableInt size = new MutableInt(0);
+
+        List<LongPair> forEachIterResult = new ArrayList<>();
+        set.forEach((range) -> {
+            forEachIterResult.add(range.lowerEndpoint());
+            forEachIterResult.add(range.upperEndpoint());
+
+            size.increment();
+            return true;
+        });
+
+        List<LongPair> defaultRangeSetResult = new ArrayList<>();
+        List<LongPair> forEachRawRangeResult = new ArrayList<>();
+
+        defaultRangeSet.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+            defaultRangeSetResult.add(new LongPair(lowerKey, lowerValue));
+            defaultRangeSetResult.add(new LongPair(upperKey, upperValue));
+            return true;
+        });
+        
+        set.forEachRawRange((lowerKey, lowerValue, upperKey, upperValue) -> {
+            forEachRawRangeResult.add(new LongPair(lowerKey, lowerValue));
+            forEachRawRangeResult.add(new LongPair(upperKey, upperValue));
+            return true;
+        });
+
+        assertEquals(forEachIterResult, forEachRawRangeResult);
+        assertEquals(forEachIterResult, defaultRangeSetResult);
+
+        assertEquals(size.intValue(), set.size());
+        assertEquals(size.intValue(), defaultRangeSet.size());
+    }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java
index 57303013a11..f6103061a42 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/DefaultRangeSetTest.java
@@ -27,11 +27,13 @@ import static org.testng.AssertJUnit.fail;
 public class DefaultRangeSetTest {
     static final LongPairRangeSet.LongPairConsumer<LongPairRangeSet.LongPair> consumer =
             LongPairRangeSet.LongPair::new;
+    static final LongPairRangeSet.RangeBoundConsumer<LongPairRangeSet.LongPair> reverseConsumer =
+            pair -> pair;
 
     @Test
     public void testBehavior() {
         LongPairRangeSet.DefaultRangeSet<LongPairRangeSet.LongPair> set =
-                new LongPairRangeSet.DefaultRangeSet<>(consumer);
+                new LongPairRangeSet.DefaultRangeSet<>(consumer, reverseConsumer);
         ConcurrentOpenLongPairRangeSet<LongPairRangeSet.LongPair> rangeSet =
                 new ConcurrentOpenLongPairRangeSet<>(consumer);