You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/05/18 06:56:26 UTC

[pulsar] branch master updated: [pulsar-common] add open Concurrent LongPair RangeSet (#3818)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ce685dc  [pulsar-common] add open Concurrent LongPair RangeSet (#3818)
ce685dc is described below

commit ce685dc3b359674941f02f81185651645012e80a
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri May 17 23:56:22 2019 -0700

    [pulsar-common] add open Concurrent LongPair RangeSet (#3818)
    
    * [pulsar-common] add open Concurrent LongPair RangeSet
    
    * add open-range set methods
    
    * add forEach
    
    * add forEach with consumer
    
    * Fix stamp-lock usage
    
    * Move ConcurrentBitSet to separate class
---
 .../common/util/collections/ConcurrentBitSet.java  | 160 +++++++++
 .../ConcurrentOpenLongPairRangeSet.java            | 370 ++++++++++++++++++++
 .../common/util/collections/LongPairRangeSet.java  | 254 ++++++++++++++
 .../ConcurrentOpenLongPairRangeSetTest.java        | 384 +++++++++++++++++++++
 4 files changed, 1168 insertions(+)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
new file mode 100644
index 0000000..739ad1e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.BitSet;
+import java.util.concurrent.locks.StampedLock;
+
+public class ConcurrentBitSet extends BitSet {
+
+    private static final long serialVersionUID = 1L;
+    private final StampedLock rwLock = new StampedLock();
+
+    /**
+     * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range
+     * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
+     *
+     * @param nbits
+     *            the initial size of the bit set
+     * @throws NegativeArraySizeException
+     *             if the specified initial size is negative
+     */
+    public ConcurrentBitSet(int nbits) {
+        super(nbits);
+    }
+
+    @Override
+    public boolean get(int bitIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        boolean isSet = super.get(bitIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                isSet = super.get(bitIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return isSet;
+    }
+
+    @Override
+    public void set(int bitIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        super.set(bitIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                super.set(bitIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+    }
+
+    @Override
+    public void set(int fromIndex, int toIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        super.set(fromIndex, toIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                super.set(fromIndex, toIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+    }
+
+    @Override
+    public int nextSetBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int bit = super.nextSetBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                bit = super.nextSetBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return bit;
+    }
+
+    @Override
+    public int nextClearBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int bit = super.nextClearBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                bit = super.nextClearBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return bit;
+    }
+
+    @Override
+    public int previousSetBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int bit = super.previousSetBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                bit = super.previousSetBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return bit;
+    }
+
+    @Override
+    public int previousClearBit(int fromIndex) {
+        long stamp = rwLock.tryOptimisticRead();
+        int bit = super.previousClearBit(fromIndex);
+        if (!rwLock.validate(stamp)) {
+            stamp = rwLock.readLock();
+            try {
+                bit = super.previousClearBit(fromIndex);
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return bit;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        long stamp = rwLock.tryOptimisticRead();
+        boolean isEmpty = super.isEmpty();
+        if (!rwLock.validate(stamp)) {
+            // Fallback to read lock
+            stamp = rwLock.readLock();
+            try {
+                isEmpty = super.isEmpty();
+            } finally {
+                rwLock.unlockRead(stamp);
+            }
+        }
+        return isEmpty;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
new file mode 100644
index 0000000..a88b8bf
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+
+/**
+ * A concurrent set comprising zero or more ranges of type {@link LongPair}. It can be used as an alternative to
+ * {@link com.google.common.collect.RangeSet} when the range type is {@link LongPair}.
+ *
+ * <pre>
+ * Usage:
+ * a. This can be used if one doesn't want to create an object for every newly inserted {@code range}.
+ * b. It creates a {@link BitSet} for every unique first-key of the range,
+ *    so this range set is not suitable for a large number of unique keys.
+ * </pre>
+ *
+ * <p>
+ * Values are used as bit indexes, so they are narrowed to {@code int} before being stored.
+ */
+public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
+
+    protected final NavigableMap<Long, BitSet> rangeBitSetMap = new ConcurrentSkipListMap<>();
+    // when true, per-key bit sets are created as ConcurrentBitSet, otherwise as plain BitSet
+    private final boolean threadSafe;
+    private final int bitSetSize;
+    private final LongPairConsumer<T> consumer;
+
+    // caching place-holder for cpu-optimization to avoid calculating ranges again.
+    // size() and toString() each own a staleness flag: with a single shared flag, refreshing one cache would
+    // silently mark the other one as fresh and make it return outdated data.
+    private volatile int cachedSize = 0;
+    private volatile String cachedToString = "[]";
+    private volatile boolean updatedAfterCachedForSize = true;
+    private volatile boolean updatedAfterCachedForToString = true;
+
+    /**
+     * Creates a thread-safe range set whose per-key bit sets start with room for 1024 values.
+     *
+     * @param consumer
+     *            factory used to convert a (key, value) pair into a range endpoint of type {@code T}
+     */
+    public ConcurrentOpenLongPairRangeSet(LongPairConsumer<T> consumer) {
+        this(1024, true, consumer);
+    }
+
+    /**
+     * Creates a thread-safe range set.
+     *
+     * @param size
+     *            initial number of bits of every per-key bit set
+     * @param consumer
+     *            factory used to convert a (key, value) pair into a range endpoint of type {@code T}
+     */
+    public ConcurrentOpenLongPairRangeSet(int size, LongPairConsumer<T> consumer) {
+        this(size, true, consumer);
+    }
+
+    /**
+     * Creates a range set.
+     *
+     * @param size
+     *            initial number of bits of every per-key bit set
+     * @param threadSafe
+     *            whether per-key bit sets are {@link ConcurrentBitSet} ({@code true}) or plain {@link BitSet}
+     * @param consumer
+     *            factory used to convert a (key, value) pair into a range endpoint of type {@code T}
+     */
+    public ConcurrentOpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer<T> consumer) {
+        this.threadSafe = threadSafe;
+        this.bitSetSize = size;
+        this.consumer = consumer;
+    }
+
+    /**
+     * Adds the specified range to this {@code RangeSet} (optional operation). That is, for equal range sets a and b,
+     * the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which both
+     * {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
+     * <p>
+     * Note that {@code range} will merge given {@code range} with any ranges in the range set that are
+     * {@linkplain Range#isConnected(Range) connected} with it. Moreover, if {@code range} is empty, this is a no-op.
+     */
+    @Override
+    public void addOpenClosed(long lowerKey, long lowerValueOpen, long upperKey, long upperValue) {
+        long lowerValue = lowerValueOpen + 1;
+        if (lowerKey != upperKey) {
+            // (1) set lower to last in lowerRange.getKey()
+            if (isValid(lowerKey, lowerValue)) {
+                BitSet rangeBitSet = rangeBitSetMap.get(lowerKey);
+                // if lower and upper has different key/ledger then set ranges for lower-key only if
+                // a. bitSet already exist and given value is not the last value in the bitset.
+                // it will prevent setting up values which are not actually expected to set
+                // eg: (2:10..4:10] in this case , don't set any value for 2:10 and set [4:0..4:10]
+                if (rangeBitSet != null) {
+                    int lastValue = rangeBitSet.previousSetBit(rangeBitSet.size());
+                    if (lastValue > lowerValueOpen) {
+                        rangeBitSet.set((int) lowerValue, (int) Math.max(lastValue, lowerValue) + 1);
+                    }
+                }
+            }
+            // (2) set 0th-index to upper-index in upperRange.getKey()
+            if (isValid(upperKey, upperValue)) {
+                BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet());
+                rangeBitSet.set(0, (int) upperValue + 1);
+            }
+            // No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing
+            // to set
+        } else {
+            BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(lowerKey, (k) -> createNewBitSet());
+            rangeBitSet.set((int) lowerValue, (int) upperValue + 1);
+        }
+        markUpdated();
+    }
+
+    // a (key, value) is usable only if neither component matches the earliest/latest sentinel components
+    private boolean isValid(long key, long value) {
+        return key != LongPair.earliest.getKey() && value != LongPair.earliest.getValue()
+                && key != LongPair.latest.getKey() && value != LongPair.latest.getValue();
+    }
+
+    /**
+     * Determines whether the given (key, value) position is part of any range of this set. Negative values are never
+     * contained.
+     */
+    @Override
+    public boolean contains(long key, long value) {
+        if (value < 0) {
+            // negative values can't be stored in a bit set (BitSet#get would throw on a negative index)
+            return false;
+        }
+        BitSet rangeBitSet = rangeBitSetMap.get(key);
+        return rangeBitSet != null && rangeBitSet.get(getSafeEntry(value));
+    }
+
+    /**
+     * Returns the closed range, within the given key, that contains the given position, or {@code null} if the
+     * position is not part of any range.
+     */
+    @Override
+    public Range<T> rangeContaining(long key, long value) {
+        if (value < 0) {
+            // negative values can't be stored in a bit set (BitSet#get would throw on a negative index)
+            return null;
+        }
+        BitSet rangeBitSet = rangeBitSetMap.get(key);
+        if (rangeBitSet == null) {
+            return null;
+        }
+        int entry = getSafeEntry(value);
+        if (!rangeBitSet.get(entry)) {
+            // if position is not part of any range then return null
+            return null;
+        }
+        int lowerValue = rangeBitSet.previousClearBit(entry) + 1;
+        final T lower = consumer.apply(key, lowerValue);
+        final T upper = consumer.apply(key, Math.max(rangeBitSet.nextClearBit(entry) - 1, lowerValue));
+        return Range.closed(lower, upper);
+    }
+
+    @Override
+    public void removeAtMost(long key, long value) {
+        this.remove(Range.atMost(new LongPair(key, value)));
+    }
+
+    /**
+     * Returns {@code true} if no position is stored in this set, i.e. there is no bit set at all or every bit set is
+     * empty.
+     */
+    @Override
+    public boolean isEmpty() {
+        for (BitSet rangeBitSet : rangeBitSetMap.values()) {
+            if (!rangeBitSet.isEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void clear() {
+        rangeBitSetMap.clear();
+        markUpdated();
+    }
+
+    /**
+     * Returns the open-closed range spanning from the first stored position to the last stored position, or
+     * {@code null} if this set contains no position.
+     */
+    @Override
+    public Range<T> span() {
+        Entry<Long, BitSet> firstSet = firstNonEmptyEntry(rangeBitSetMap);
+        Entry<Long, BitSet> lastSet = firstNonEmptyEntry(rangeBitSetMap.descendingMap());
+        if (firstSet == null || lastSet == null) {
+            return null;
+        }
+        int first = firstSet.getValue().nextSetBit(0);
+        int last = lastSet.getValue().previousSetBit(lastSet.getValue().size());
+        if (first < 0 || last < 0) {
+            // bit set got emptied after it was selected
+            return null;
+        }
+        return Range.openClosed(consumer.apply(firstSet.getKey(), first - 1), consumer.apply(lastSet.getKey(), last));
+    }
+
+    @Override
+    public List<Range<T>> asRanges() {
+        List<Range<T>> ranges = new ArrayList<>();
+        forEach((range) -> {
+            ranges.add(range);
+            return true;
+        });
+        return ranges;
+    }
+
+    @Override
+    public void forEach(RangeProcessor<T> action) {
+        forEach(action, consumer);
+    }
+
+    /**
+     * Visits, in ascending key order, every run of consecutive set bits of every key as an open-closed range built
+     * with the given {@code consumer}. Iteration stops as soon as {@code action} returns {@code false}. Runs are
+     * reported per key: they are not merged across different keys.
+     */
+    @Override
+    public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+        AtomicBoolean completed = new AtomicBoolean(false);
+        rangeBitSetMap.forEach((key, set) -> {
+            if (completed.get()) {
+                return;
+            }
+            if (set.isEmpty()) {
+                return;
+            }
+            int first = set.nextSetBit(0);
+            int last = set.previousSetBit(set.size());
+            int currentClosedMark = first;
+            while (currentClosedMark != -1 && currentClosedMark <= last) {
+                int nextOpenMark = set.nextClearBit(currentClosedMark);
+                Range<T> range = Range.openClosed(consumer.apply(key, currentClosedMark - 1),
+                        consumer.apply(key, nextOpenMark - 1));
+                if (!action.process(range)) {
+                    completed.set(true);
+                    break;
+                }
+                currentClosedMark = set.nextSetBit(nextOpenMark);
+            }
+        });
+    }
+
+    /**
+     * Returns the first (smallest) open-closed range of this set, or {@code null} if this set contains no position.
+     */
+    @Override
+    public Range<T> firstRange() {
+        Entry<Long, BitSet> firstSet = firstNonEmptyEntry(rangeBitSetMap);
+        if (firstSet == null) {
+            return null;
+        }
+        int lower = firstSet.getValue().nextSetBit(0);
+        if (lower < 0) {
+            // bit set got emptied after it was selected
+            return null;
+        }
+        int upper = Math.max(lower, firstSet.getValue().nextClearBit(lower) - 1);
+        return Range.openClosed(consumer.apply(firstSet.getKey(), lower - 1), consumer.apply(firstSet.getKey(), upper));
+    }
+
+    /**
+     * Returns the number of ranges that {@link #forEach(RangeProcessor)} would visit. The result is cached until the
+     * set is modified through this class.
+     */
+    @Override
+    public int size() {
+        if (updatedAfterCachedForSize) {
+            // reset the flag before counting so that an update racing with the count triggers a recount later
+            updatedAfterCachedForSize = false;
+            AtomicInteger size = new AtomicInteger(0);
+            forEach((range) -> {
+                size.getAndIncrement();
+                return true;
+            });
+            cachedSize = size.get();
+        }
+        return cachedSize;
+    }
+
+    /**
+     * Returns the comma-separated list of ranges enclosed in square brackets. The result is cached until the set is
+     * modified through this class.
+     */
+    @Override
+    public String toString() {
+        if (updatedAfterCachedForToString) {
+            // reset the flag before rendering so that an update racing with the rendering triggers a refresh later
+            updatedAfterCachedForToString = false;
+            StringBuilder toString = new StringBuilder();
+            AtomicBoolean first = new AtomicBoolean(true);
+            toString.append("[");
+            forEach((range) -> {
+                if (!first.get()) {
+                    toString.append(",");
+                }
+                toString.append(range);
+                first.set(false);
+                return true;
+            });
+            toString.append("]");
+            cachedToString = toString.toString();
+        }
+        return cachedToString;
+    }
+
+    /**
+     * Adds the specified range to this {@code RangeSet} (optional operation). That is, for equal range sets a and b,
+     * the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which both
+     * {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
+     *
+     * <p>
+     * Note that {@code range} will merge given {@code range} with any ranges in the range set that are
+     * {@linkplain Range#isConnected(Range) connected} with it. Moreover, if {@code range} is empty/invalid, this is a
+     * no-op.
+     */
+    public void add(Range<LongPair> range) {
+        LongPair lowerEndpoint = range.hasLowerBound() ? range.lowerEndpoint() : LongPair.earliest;
+        LongPair upperEndpoint = range.hasUpperBound() ? range.upperEndpoint() : LongPair.latest;
+
+        long lowerValueOpen = (range.hasLowerBound() && range.lowerBoundType().equals(BoundType.CLOSED))
+                ? getSafeEntry(lowerEndpoint) - 1
+                : getSafeEntry(lowerEndpoint);
+        long upperValueClosed = (range.hasUpperBound() && range.upperBoundType().equals(BoundType.CLOSED))
+                ? getSafeEntry(upperEndpoint)
+                : getSafeEntry(upperEndpoint) + 1;
+
+        // #addOpenClosed doesn't create bitSet for lower-key because it avoids setting up values for non-exist items
+        // into the key-ledger. so, create bitSet and initialize so, it can't be ignored at #addOpenClosed
+        rangeBitSetMap.computeIfAbsent(lowerEndpoint.getKey(), (key) -> createNewBitSet())
+                .set((int) lowerValueOpen + 1);
+        this.addOpenClosed(lowerEndpoint.getKey(), lowerValueOpen, upperEndpoint.getKey(), upperValueClosed);
+    }
+
+    public boolean contains(LongPair position) {
+        checkNotNull(position, "argument can't be null");
+        return contains(position.getKey(), position.getValue());
+    }
+
+    /**
+     * Removes every position enclosed by the given range. An absent lower/upper bound is treated as
+     * {@link LongPair#earliest} / {@link LongPair#latest}. Bit sets left empty by the removal are dropped.
+     */
+    public void remove(Range<LongPair> range) {
+        LongPair lowerEndpoint = range.hasLowerBound() ? range.lowerEndpoint() : LongPair.earliest;
+        LongPair upperEndpoint = range.hasUpperBound() ? range.upperEndpoint() : LongPair.latest;
+
+        long lower = (range.hasLowerBound() && range.lowerBoundType().equals(BoundType.CLOSED))
+                ? getSafeEntry(lowerEndpoint)
+                : getSafeEntry(lowerEndpoint) + 1;
+        long upper = (range.hasUpperBound() && range.upperBoundType().equals(BoundType.CLOSED))
+                ? getSafeEntry(upperEndpoint)
+                : getSafeEntry(upperEndpoint) - 1;
+
+        final long lowerKey = lowerEndpoint.getKey();
+        final long upperKey = upperEndpoint.getKey();
+        // bit indexes to clear: [from, to). Clamped so that they are always valid BitSet#clear arguments
+        final int from = (int) Math.max(lower, 0);
+        final int to = (int) Math.min(Math.max(upper + 1, 0), Integer.MAX_VALUE);
+
+        // if lower-bound is not set then remove all the keys less than given upper-bound range
+        if (lowerEndpoint.equals(LongPair.earliest)) {
+            rangeBitSetMap.forEach((key, set) -> {
+                if (key < upperKey) {
+                    rangeBitSetMap.remove(key);
+                }
+            });
+        }
+
+        // if upper-bound is not set then remove all the keys greater than given lower-bound range
+        if (upperEndpoint.equals(LongPair.latest)) {
+            rangeBitSetMap.forEach((key, set) -> {
+                if (key > lowerKey) {
+                    rangeBitSetMap.remove(key);
+                }
+            });
+        }
+
+        // remove all the keys between two endpoint keys
+        rangeBitSetMap.forEach((key, set) -> {
+            if (lowerKey == upperKey) {
+                // both endpoints share one key: only that key's bit set is affected
+                if (key == lowerKey && from < to) {
+                    set.clear(from, to);
+                }
+            } else if (key == lowerKey) {
+                // eg: remove-range: [(3,5) - (5,5)] -> Delete all items from 3,6->3,N,4.*,5,0->5,5
+                // remove all entries from given position to last position (BitSet#clear end index is exclusive)
+                int last = set.previousSetBit(set.size());
+                if (last >= from) {
+                    set.clear(from, last + 1);
+                }
+            } else if (key == upperKey) {
+                // remove all entries from 0 to given position
+                set.clear(0, to);
+            } else if (key > lowerKey && key < upperKey) {
+                rangeBitSetMap.remove(key);
+            }
+            // remove bit-set if set is empty
+            if (set.isEmpty()) {
+                rangeBitSetMap.remove(key);
+            }
+        });
+
+        markUpdated();
+    }
+
+    // flags both caches (size and toString) as stale
+    private void markUpdated() {
+        updatedAfterCachedForSize = true;
+        updatedAfterCachedForToString = true;
+    }
+
+    // returns the first entry, in the iteration order of the given map, whose bit set has at least one bit set
+    private static Entry<Long, BitSet> firstNonEmptyEntry(NavigableMap<Long, BitSet> map) {
+        for (Entry<Long, BitSet> entry : map.entrySet()) {
+            if (!entry.getValue().isEmpty()) {
+                return entry;
+            }
+        }
+        return null;
+    }
+
+    // narrows the value of the position to int, mapping anything below -1 to -1
+    private int getSafeEntry(LongPair position) {
+        return (int) Math.max(position.getValue(), -1);
+    }
+
+    // narrows the value to int, mapping anything below -1 to -1
+    private int getSafeEntry(long value) {
+        return (int) Math.max(value, -1);
+    }
+
+    private BitSet createNewBitSet() {
+        return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize);
+    }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
new file mode 100644
index 0000000..daad0ad
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.Collection;
+import java.util.Set;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+
+/**
+ * A set comprising zero or more ranges type of key-value pair.
+ */
+public interface LongPairRangeSet<T extends Comparable<T>> {
+
+    /**
+     * Adds the specified range (range that contains all values strictly greater than {@code
+    * lower} and less than or equal to {@code upper}.) to this {@code RangeSet} (optional operation). That is, for equal
+     * range sets a and b, the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which
+     * both {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
+     * 
+     * <pre>
+     *  
+     * &#64;param lowerKey :  value for key of lowerEndpoint of Range
+     * &#64;param lowerValue: value for value of lowerEndpoint of Range
+     * &#64;param upperKey  : value for key of upperEndpoint of Range
+     * &#64;param upperValue: value for value of upperEndpoint of Range
+     * </pre>
+     */
+    void addOpenClosed(long lowerKey, long lowerValue, long upperKey, long upperValue);
+
+    /** Determines whether any of this range set's member ranges contains {@code value}. */
+    boolean contains(long key, long value);
+
+    /**
+     * Returns the unique range from this range set that {@linkplain Range#contains contains} {@code value}, or
+     * {@code null} if this range set does not contain {@code value}.
+     */
+    Range<T> rangeContaining(long key, long value);
+
+    /**
+     * Remove range that contains all values less than or equal to given key-value.
+     * 
+     * @param key
+     * @param value
+     */
+    void removeAtMost(long key, long value);
+
+    boolean isEmpty();
+
+    void clear();
+
+    /**
+     * Returns the minimal range which {@linkplain Range#encloses(Range) encloses} all ranges in this range set.
+     * 
+     * @return
+     */
+    Range<T> span();
+
+    /**
+     * Returns a view of the {@linkplain Range#isConnected disconnected} ranges that make up this range set.
+     * 
+     * @return
+     */
+    Collection<Range<T>> asRanges();
+    
+    /**
+     * Performs the given action for each entry in this map until all entries have been processed or action returns "false".
+     * Unless otherwise specified by the implementing class, actions are performed in the order of entry
+     * set iteration (if an iteration order is specified.)
+     * 
+     * @param action
+     */
+    void forEach(RangeProcessor<T> action);
+    
+    /**
+     * Performs the given action for each entry in this map until all entries have been processed or action returns "false".
+     * Unless otherwise specified by the implementing class, actions are performed in the order of entry
+     * set iteration (if an iteration order is specified.)
+     * 
+     * @param action
+     * @param consumer
+     */
+    void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer);
+    
+    /**
+     * Returns total number of ranges into the set.
+     * 
+     * @return
+     */
+    int size();
+
+    /**
+     * It returns very first smallest range in the rangeSet.
+     * 
+     * @return Range<T> first smallest range into the set
+     */
+    Range<T> firstRange();
+
+    public static interface LongPairConsumer<T> {
+        T apply(long key, long value);
+    }
+
+    public static interface RangeProcessor<T extends Comparable<T>> {
+        /**
+         * 
+         * @param range
+         * @return false if there is no further processing required
+         */
+        boolean process(Range<T> range);
+    }
+    
+    public static class LongPair implements Comparable<LongPair> {
+
+        public static final LongPair earliest = new LongPair(-1, -1);
+        public static final LongPair latest = new LongPair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+        private long key;
+        private long value;
+
+        public LongPair(long key, long value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public long getKey() {
+            return this.key;
+        }
+
+        public long getValue() {
+            return this.value;
+        }
+
+        @Override
+        public int compareTo(LongPair o) {
+            return ComparisonChain.start().compare(key, o.getKey()).compare(value, o.getValue()).result();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%d:%d", key, value);
+        }
+    }
+
+    public static class DefaultRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
+
+        RangeSet<T> set = TreeRangeSet.create();
+
+        private final LongPairConsumer<T> consumer;
+
+        public DefaultRangeSet(LongPairConsumer<T> consumer) {
+            this.consumer = consumer;
+        }
+
+        @Override
+        public void clear() {
+            set.clear();
+        }
+
+        @Override
+        public void addOpenClosed(long key1, long value1, long key2, long value2) {
+            set.add(Range.openClosed(consumer.apply(key1, value1), consumer.apply(key2, value2)));
+        }
+
+        public boolean contains(T position) {
+            return set.contains(position);
+        }
+
+        public Range<T> rangeContaining(T position) {
+            return set.rangeContaining(position);
+        }
+
+        @Override
+        public Range<T> rangeContaining(long key, long value) {
+            return this.rangeContaining(consumer.apply(key, value));
+        }
+
+        public void remove(Range<T> range) {
+            set.remove(range);
+        }
+
+        @Override
+        public void removeAtMost(long key, long value) {
+            set.remove(Range.atMost(consumer.apply(key, value)));
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return set.isEmpty();
+        }
+
+        @Override
+        public Range<T> span() {
+            return set.span();
+        }
+
+        @Override
+        public Set<Range<T>> asRanges() {
+            return set.asRanges();
+        }
+
+        @Override
+        public void forEach(RangeProcessor<T> action) {
+            forEach(action, consumer);
+        }
+
+        @Override
+        public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+            for (Range<T> range : asRanges()) {
+                if (!action.process(range)) {
+                    break;
+                }
+            }
+        }
+        
+        @Override
+        public boolean contains(long key, long value) {
+            return this.contains(consumer.apply(key, value));
+        }
+
+        @Override
+        public Range<T> firstRange() {
+            return set.asRanges().iterator().next();
+        }
+
+        @Override
+        public int size() {
+            return set.asRanges().size();
+        }
+
+        @Override
+        public String toString() {
+            return set.toString();
+        }
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
new file mode 100644
index 0000000..54bcecc
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPair;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.TreeRangeSet;
+
+public class ConcurrentOpenLongPairRangeSetTest {
+
+    // Builds a LongPair from a (key, value) pair; handed to every range set created in these tests.
+    static final LongPairConsumer<LongPair> consumer = (key, value) -> new LongPair(key, value);
+
+    /**
+     * Adds closed ranges that all live under key 0 and checks the ranges the set reports back.
+     */
+    @Test
+    public void testAddForSameKey() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        // [0:0..0:5]
+        set.add(Range.closed(new LongPair(0, 0), new LongPair(0, 5)));
+        // 0:8, 0:9 and 0:10, each added as its own single-entry range
+        for (int entry = 8; entry <= 10; entry++) {
+            set.add(Range.closed(new LongPair(0, entry), new LongPair(0, entry)));
+        }
+        // [0:98..0:99] and [0:102..0:106]
+        set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+        set.add(Range.closed(new LongPair(0, 102), new LongPair(0, 106)));
+
+        List<Range<LongPair>> expected = Lists.newArrayList();
+        expected.add(Range.openClosed(new LongPair(0, -1), new LongPair(0, 5)));
+        expected.add(Range.openClosed(new LongPair(0, 7), new LongPair(0, 10)));
+        expected.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
+        expected.add(Range.openClosed(new LongPair(0, 101), new LongPair(0, 106)));
+
+        List<Range<LongPair>> actual = set.asRanges();
+        for (int index = 0; index < expected.size(); index++) {
+            assertEquals(actual.get(index), expected.get(index));
+        }
+    }
+
+    /**
+     * Adds open-closed ranges, two of which cross from one key into the next, and checks the ranges
+     * the set reports back.
+     */
+    @Test
+    public void testAddForDifferentKey() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        // (0:98..0:99], (0:100..1:5], (1:10..1:15] and (1:20..2:10]
+        set.addOpenClosed(0, 98, 0, 99);
+        set.addOpenClosed(0, 100, 1, 5);
+        set.addOpenClosed(1, 10, 1, 15);
+        set.addOpenClosed(1, 20, 2, 10);
+
+        List<Range<LongPair>> expected = Lists.newArrayList();
+        expected.add(Range.openClosed(new LongPair(0, 98), new LongPair(0, 99)));
+        expected.add(Range.openClosed(new LongPair(1, -1), new LongPair(1, 5)));
+        expected.add(Range.openClosed(new LongPair(1, 10), new LongPair(1, 15)));
+        expected.add(Range.openClosed(new LongPair(2, -1), new LongPair(2, 10)));
+
+        List<Range<LongPair>> actual = set.asRanges();
+        for (int index = 0; index < expected.size(); index++) {
+            assertEquals(actual.get(index), expected.get(index));
+        }
+    }
+
+    /**
+     * Feeds the same open-closed ranges (all under key 0) to the set under test and to a Guava
+     * {@code TreeRangeSet}, then compares the ranges both report.
+     */
+    @Test
+    public void testAddCompareCompareWithGuava() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+
+        int totalInsert = 10_000;
+        // single entries: (i-1..i] for the selected i in [0, totalInsert)
+        for (int i = 0; i < totalInsert; i++) {
+            boolean selected = i % 3 == 0 || i % 6 == 0 || i % 8 == 0;
+            if (!selected) {
+                continue;
+            }
+            LongPair lower = new LongPair(0, i - 1);
+            LongPair upper = new LongPair(0, i);
+            set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+            gSet.add(Range.openClosed(lower, upper));
+        }
+        // batches: (i-4..i+3] for every multiple of 5 in [totalInsert, 2 * totalInsert)
+        for (int i = totalInsert; i < (totalInsert * 2); i++) {
+            if (i % 5 != 0) {
+                continue;
+            }
+            LongPair lower = new LongPair(0, i - 3 - 1);
+            LongPair upper = new LongPair(0, i + 3);
+            set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+            gSet.add(Range.openClosed(lower, upper));
+        }
+
+        List<Range<LongPair>> actual = set.asRanges();
+        List<Range<LongPair>> expected = getConnectedRange(gSet.asRanges());
+        assertEquals(expected.size(), actual.size());
+        for (int index = 0; index < expected.size(); index++) {
+            assertEquals(expected.get(index), actual.get(index));
+        }
+    }
+
+    /**
+     * Adds the same ranges to the set under test and to a Guava {@code TreeRangeSet}, removes the same
+     * selection of ranges from both, then compares the ranges both report.
+     */
+    @Test
+    public void testDeleteCompareWithGuava() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+
+        int totalInsert = 10_000;
+        List<Range<LongPair>> toRemove = Lists.newArrayList();
+
+        // single entries: (i-1..i] for every i in [0, totalInsert) not divisible by 3, 7 or 11
+        for (int i = 0; i < totalInsert; i++) {
+            boolean skipped = i % 3 == 0 || i % 7 == 0 || i % 11 == 0;
+            if (!skipped) {
+                LongPair lower = new LongPair(0, i - 1);
+                LongPair upper = new LongPair(0, i);
+                Range<LongPair> range = Range.openClosed(lower, upper);
+                set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+                gSet.add(range);
+                if (i % 4 == 0) {
+                    toRemove.add(range);
+                }
+            }
+        }
+        // batches: (i-4..i+3] for i in [totalInsert, 2 * totalInsert); multiples of 5 are not added,
+        // multiples of 4 are scheduled for removal whether or not they were added
+        for (int i = totalInsert; i < (totalInsert * 2); i++) {
+            LongPair lower = new LongPair(0, i - 3 - 1);
+            LongPair upper = new LongPair(0, i + 3);
+            Range<LongPair> range = Range.openClosed(lower, upper);
+            if (i % 5 != 0) {
+                set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+                gSet.add(range);
+            }
+            if (i % 4 == 0) {
+                toRemove.add(range);
+            }
+        }
+        // apply the same removals to both sets
+        for (int index = 0; index < toRemove.size(); index++) {
+            Range<LongPair> range = toRemove.get(index);
+            set.remove(range);
+            gSet.remove(range);
+        }
+
+        List<Range<LongPair>> actual = set.asRanges();
+        List<Range<LongPair>> expected = getConnectedRange(gSet.asRanges());
+        assertEquals(expected.size(), actual.size());
+        for (int index = 0; index < expected.size(); index++) {
+            assertEquals(expected.get(index), actual.get(index));
+        }
+    }
+
+    /**
+     * Checks that {@code span()} of the set under test matches Guava's {@code span()} after each of two
+     * rounds of identical additions.
+     */
+    @Test
+    public void testSpanWithGuava() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+
+        List<Range<LongPair>> firstRound = Lists.newArrayList();
+        firstRound.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
+        firstRound.add(Range.openClosed(new LongPair(0, 99), new LongPair(1, 5)));
+        for (Range<LongPair> range : firstRound) {
+            set.add(range);
+            gSet.add(range);
+        }
+        assertEquals(set.span(), gSet.span());
+        assertEquals(set.span(), Range.openClosed(new LongPair(0, 97), new LongPair(1, 5)));
+
+        List<Range<LongPair>> secondRound = Lists.newArrayList();
+        secondRound.add(Range.openClosed(new LongPair(1, 9), new LongPair(1, 15)));
+        secondRound.add(Range.openClosed(new LongPair(1, 19), new LongPair(2, 10)));
+        secondRound.add(Range.openClosed(new LongPair(2, 24), new LongPair(2, 28)));
+        secondRound.add(Range.openClosed(new LongPair(3, 11), new LongPair(3, 20)));
+        secondRound.add(Range.openClosed(new LongPair(4, 11), new LongPair(4, 20)));
+        for (Range<LongPair> range : secondRound) {
+            set.add(range);
+            gSet.add(range);
+        }
+        assertEquals(set.span(), gSet.span());
+        assertEquals(set.span(), Range.openClosed(new LongPair(0, 97), new LongPair(4, 20)));
+    }
+
+    /**
+     * Checks {@code firstRange()} and {@code size()} after adding a range, an overlapping range, and
+     * a separate range that sorts before both.
+     */
+    @Test
+    public void testFirstRange() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+
+        Range<LongPair> initial = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
+        set.add(initial);
+        assertEquals(set.firstRange(), initial);
+        assertEquals(set.size(), 1);
+
+        // overlaps the initial range, so the two are expected to be reported as one
+        Range<LongPair> overlapping = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105));
+        set.add(overlapping);
+        assertEquals(set.firstRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 105)));
+        assertEquals(set.size(), 1);
+
+        // disjoint and lower than everything added so far
+        Range<LongPair> earlier = Range.openClosed(new LongPair(0, 5), new LongPair(0, 75));
+        set.add(earlier);
+        assertEquals(set.firstRange(), earlier);
+        assertEquals(set.size(), 2);
+    }
+
+    /**
+     * Checks the {@code toString()} output after adding a range, an overlapping range, and a separate
+     * lower range.
+     */
+    @Test
+    public void testToString() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+
+        Range<LongPair> initial = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
+        set.add(initial);
+        assertEquals(set.toString(), "[(0:97..0:99]]");
+
+        Range<LongPair> overlapping = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105));
+        set.add(overlapping);
+        assertEquals(set.toString(), "[(0:97..0:105]]");
+
+        Range<LongPair> earlier = Range.openClosed(new LongPair(0, 5), new LongPair(0, 75));
+        set.add(earlier);
+        assertEquals(set.toString(), "[(0:5..0:75],(0:97..0:105]]");
+    }
+
+    /**
+     * Removes an open range within key 0 and a closed range spanning keys 2 to 4, then checks the
+     * ranges that remain.
+     */
+    @Test
+    public void testDeleteForDifferentKey() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        set.addOpenClosed(0, 97, 0, 99);
+        set.addOpenClosed(0, 99, 1, 5);
+        set.addOpenClosed(1, 9, 1, 15);
+        set.addOpenClosed(1, 19, 2, 10);
+        set.addOpenClosed(2, 24, 2, 28);
+        set.addOpenClosed(3, 11, 3, 20);
+        set.addOpenClosed(4, 11, 4, 20);
+
+        // remove the open range (0:99..0:105)
+        Range<LongPair> withinKeyZero = Range.open(new LongPair(0, 99), new LongPair(0, 105));
+        set.remove(withinKeyZero);
+
+        // remove the closed range [2:27..4:15], which crosses keys 2, 3 and 4
+        Range<LongPair> acrossKeys = Range.closed(new LongPair(2, 27), new LongPair(4, 15));
+        set.remove(acrossKeys);
+
+        List<Range<LongPair>> expected = Lists.newArrayList();
+        expected.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
+        expected.add(Range.openClosed(new LongPair(1, -1), new LongPair(1, 5)));
+        expected.add(Range.openClosed(new LongPair(1, 9), new LongPair(1, 15)));
+        expected.add(Range.openClosed(new LongPair(2, -1), new LongPair(2, 10)));
+        expected.add(Range.openClosed(new LongPair(2, 24), new LongPair(2, 26)));
+        expected.add(Range.openClosed(new LongPair(2, 27), new LongPair(2, 28)));
+        expected.add(Range.openClosed(new LongPair(4, 15), new LongPair(4, 20)));
+
+        List<Range<LongPair>> actual = set.asRanges();
+        for (int index = 0; index < expected.size(); index++) {
+            assertEquals(actual.get(index), expected.get(index));
+        }
+    }
+
+    /**
+     * Removes an open range within key 0 and then everything up to and including 2:27
+     * ({@code Range.atMost}), and checks the ranges that remain.
+     */
+    @Test
+    public void testDeleteWithAtMost() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+        set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
+        set.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15)));
+        set.add(Range.closed(new LongPair(1, 20), new LongPair(2, 10)));
+        set.add(Range.closed(new LongPair(2, 25), new LongPair(2, 28)));
+        set.add(Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+        set.add(Range.closed(new LongPair(4, 12), new LongPair(4, 20)));
+
+        // remove the open range (0:99..0:105)
+        Range<LongPair> withinKeyZero = Range.open(new LongPair(0, 99), new LongPair(0, 105));
+        set.remove(withinKeyZero);
+
+        // remove everything less than or equal to 2:27
+        Range<LongPair> upToPosition = Range.atMost(new LongPair(2, 27));
+        set.remove(upToPosition);
+
+        List<Range<LongPair>> expected = Lists.newArrayList();
+        expected.add(Range.openClosed(new LongPair(2, 27), new LongPair(2, 28)));
+        expected.add(Range.openClosed(new LongPair(3, 11), new LongPair(3, 20)));
+        expected.add(Range.openClosed(new LongPair(4, 11), new LongPair(4, 20)));
+
+        List<Range<LongPair>> actual = set.asRanges();
+        for (int index = 0; index < expected.size(); index++) {
+            assertEquals(actual.get(index), expected.get(index));
+        }
+    }
+
+    /**
+     * Removes an open range within key 0 and then everything from 2:27 onwards
+     * ({@code Range.atLeast}), and checks the ranges that remain.
+     */
+    @Test
+    public void testDeleteWithLeastMost() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+        set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
+        set.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15)));
+        set.add(Range.closed(new LongPair(1, 20), new LongPair(2, 10)));
+        set.add(Range.closed(new LongPair(2, 25), new LongPair(2, 28)));
+        set.add(Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+        set.add(Range.closed(new LongPair(4, 12), new LongPair(4, 20)));
+
+        // remove the open range (0:99..0:105)
+        Range<LongPair> withinKeyZero = Range.open(new LongPair(0, 99), new LongPair(0, 105));
+        set.remove(withinKeyZero);
+
+        // remove everything greater than or equal to 2:27
+        Range<LongPair> fromPosition = Range.atLeast(new LongPair(2, 27));
+        set.remove(fromPosition);
+
+        List<Range<LongPair>> expected = Lists.newArrayList();
+        expected.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
+        expected.add(Range.openClosed(new LongPair(1, -1), new LongPair(1, 5)));
+        expected.add(Range.openClosed(new LongPair(1, 9), new LongPair(1, 15)));
+        expected.add(Range.openClosed(new LongPair(1, 19), new LongPair(1, 20)));
+        expected.add(Range.openClosed(new LongPair(2, -1), new LongPair(2, 10)));
+        expected.add(Range.openClosed(new LongPair(2, 24), new LongPair(2, 26)));
+
+        List<Range<LongPair>> actual = set.asRanges();
+        for (int index = 0; index < expected.size(); index++) {
+            assertEquals(actual.get(index), expected.get(index));
+        }
+    }
+
+    /**
+     * Compares {@code rangeContaining} of the set under test with fixed expectations and with Guava's
+     * {@code rangeContaining}, for two positions inside a range and two positions outside any range.
+     */
+    @Test
+    public void testRangeContaining() {
+        ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+        com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+
+        // NOTE(review): the first two ranges deliberately differ between the two sets (split at
+        // 0:99/0:100 here, at 0:100/0:101 for Guava) - presumably to line Guava up with how the set
+        // under test reports the range; confirm against the implementation.
+        set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+        set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
+        gSet.add(Range.closed(new LongPair(0, 98), new LongPair(0, 100)));
+        gSet.add(Range.closed(new LongPair(0, 101), new LongPair(1, 5)));
+
+        // the remaining ranges are identical for both sets
+        List<Range<LongPair>> shared = Lists.newArrayList();
+        shared.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15)));
+        shared.add(Range.closed(new LongPair(1, 20), new LongPair(2, 10)));
+        shared.add(Range.closed(new LongPair(2, 25), new LongPair(2, 28)));
+        shared.add(Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+        shared.add(Range.closed(new LongPair(4, 12), new LongPair(4, 20)));
+        for (Range<LongPair> range : shared) {
+            set.add(range);
+            gSet.add(range);
+        }
+
+        // inside the first range
+        LongPair insideKeyZero = new LongPair(0, 99);
+        assertEquals(set.rangeContaining(insideKeyZero.getKey(), insideKeyZero.getValue()),
+                Range.closed(new LongPair(0, 98), new LongPair(0, 100)));
+        assertEquals(set.rangeContaining(insideKeyZero.getKey(), insideKeyZero.getValue()),
+                gSet.rangeContaining(insideKeyZero));
+
+        // past the last entry of key 2
+        LongPair outsideKeyTwo = new LongPair(2, 30);
+        assertEquals(set.rangeContaining(outsideKeyTwo.getKey(), outsideKeyTwo.getValue()), null);
+        assertEquals(set.rangeContaining(outsideKeyTwo.getKey(), outsideKeyTwo.getValue()),
+                gSet.rangeContaining(outsideKeyTwo));
+
+        // inside the range of key 3
+        LongPair insideKeyThree = new LongPair(3, 13);
+        assertEquals(set.rangeContaining(insideKeyThree.getKey(), insideKeyThree.getValue()),
+                Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+        assertEquals(set.rangeContaining(insideKeyThree.getKey(), insideKeyThree.getValue()),
+                gSet.rangeContaining(insideKeyThree));
+
+        // past the last entry of key 3
+        LongPair outsideKeyThree = new LongPair(3, 22);
+        assertEquals(set.rangeContaining(outsideKeyThree.getKey(), outsideKeyThree.getValue()), null);
+        assertEquals(set.rangeContaining(outsideKeyThree.getKey(), outsideKeyThree.getValue()),
+                gSet.rangeContaining(outsideKeyThree));
+    }
+
+    /**
+     * Copies the ranges reported by Guava into a list, merging a range into its predecessor when both
+     * share a key and the predecessor's last entry reaches the range's first entry.
+     *
+     * <p>Only the final range of the result is rewritten into open-closed form (a closed lower bound
+     * {@code v} becomes the open bound {@code v - 1}); every earlier range is emitted as it was
+     * tracked. All ranges are expected to be bounded on both sides.
+     *
+     * @param gRanges ranges in ascending order, as returned by {@code RangeSet.asRanges()}
+     * @return the processed ranges; empty when {@code gRanges} is empty
+     */
+    private List<Range<LongPair>> getConnectedRange(Set<Range<LongPair>> gRanges) {
+        List<Range<LongPair>> gRangeConnected = Lists.newArrayList();
+        // Nothing to connect: without this guard the trailing normalisation below would dereference
+        // a null lastRange.
+        if (gRanges.isEmpty()) {
+            return gRangeConnected;
+        }
+        Range<LongPair> lastRange = null;
+        for (Range<LongPair> range : gRanges) {
+            if (lastRange == null) {
+                lastRange = range;
+                continue;
+            }
+            LongPair previousUpper = lastRange.upperEndpoint();
+            LongPair currentLower = range.lowerEndpoint();
+            // Entry values are kept as long: narrowing them to int would silently truncate large values.
+            long previousUpperValue = lastRange.upperBoundType().equals(BoundType.CLOSED)
+                    ? previousUpper.getValue()
+                    : previousUpper.getValue() - 1;
+            long currentLowerValue = range.lowerBoundType().equals(BoundType.CLOSED)
+                    ? currentLower.getValue()
+                    : currentLower.getValue() + 1;
+            boolean connected = (previousUpper.getKey() == currentLower.getKey())
+                    && (previousUpperValue >= currentLowerValue);
+            if (connected) {
+                lastRange = Range.closed(lastRange.lowerEndpoint(), range.upperEndpoint());
+            } else {
+                gRangeConnected.add(lastRange);
+                lastRange = range;
+            }
+        }
+        // Express the last tracked range as open-closed.
+        long lowerOpenValue = lastRange.lowerBoundType().equals(BoundType.CLOSED)
+                ? (lastRange.lowerEndpoint().getValue() - 1)
+                : lastRange.lowerEndpoint().getValue();
+        lastRange = Range.openClosed(new LongPair(lastRange.lowerEndpoint().getKey(), lowerOpenValue),
+                lastRange.upperEndpoint());
+        gRangeConnected.add(lastRange);
+        return gRangeConnected;
+    }
+}