You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/05/18 06:56:26 UTC
[pulsar] branch master updated: [pulsar-common] add open Concurrent
LongPair RangeSet (#3818)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ce685dc [pulsar-common] add open Concurrent LongPair RangeSet (#3818)
ce685dc is described below
commit ce685dc3b359674941f02f81185651645012e80a
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri May 17 23:56:22 2019 -0700
[pulsar-common] add open Concurrent LongPair RangeSet (#3818)
* [pulsar-common] add open Concurrent LongPair RangeSet
* add open-range set methods
* add forEach
* add forEach with consumer
* Fix stamp-lock usage
* Move ConcurrentBitSet to separate class
---
.../common/util/collections/ConcurrentBitSet.java | 160 +++++++++
.../ConcurrentOpenLongPairRangeSet.java | 370 ++++++++++++++++++++
.../common/util/collections/LongPairRangeSet.java | 254 ++++++++++++++
.../ConcurrentOpenLongPairRangeSetTest.java | 384 +++++++++++++++++++++
4 files changed, 1168 insertions(+)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
new file mode 100644
index 0000000..739ad1e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentBitSet.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.BitSet;
+import java.util.concurrent.locks.StampedLock;
+
+public class ConcurrentBitSet extends BitSet {
+
+ private static final long serialVersionUID = 1L;
+ private final StampedLock rwLock = new StampedLock();
+
+ /**
+ * Creates a bit set whose initial size is large enough to explicitly represent bits with indices in the range
+ * {@code 0} through {@code nbits-1}. All bits are initially {@code false}.
+ *
+ * @param nbits
+ * the initial size of the bit set
+ * @throws NegativeArraySizeException
+ * if the specified initial size is negative
+ */
+ public ConcurrentBitSet(int nbits) {
+ super(nbits);
+ }
+
+ @Override
+ public boolean get(int bitIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ boolean isSet = super.get(bitIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ isSet = super.get(bitIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return isSet;
+ }
+
+ @Override
+ public void set(int bitIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ super.set(bitIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ super.set(bitIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ }
+
+ @Override
+ public void set(int fromIndex, int toIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ super.set(fromIndex, toIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ super.set(fromIndex, toIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ }
+
+ @Override
+ public int nextSetBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int bit = super.nextSetBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ bit = super.nextSetBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return bit;
+ }
+
+ @Override
+ public int nextClearBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int bit = super.nextClearBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ bit = super.nextClearBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return bit;
+ }
+
+ @Override
+ public int previousSetBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int bit = super.previousSetBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ bit = super.previousSetBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return bit;
+ }
+
+ @Override
+ public int previousClearBit(int fromIndex) {
+ long stamp = rwLock.tryOptimisticRead();
+ int bit = super.previousClearBit(fromIndex);
+ if (!rwLock.validate(stamp)) {
+ stamp = rwLock.readLock();
+ try {
+ bit = super.previousClearBit(fromIndex);
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return bit;
+ }
+
+ @Override
+ public boolean isEmpty() {
+ long stamp = rwLock.tryOptimisticRead();
+ boolean isEmpty = super.isEmpty();
+ if (!rwLock.validate(stamp)) {
+ // Fallback to read lock
+ stamp = rwLock.readLock();
+ try {
+ isEmpty = super.isEmpty();
+ } finally {
+ rwLock.unlockRead(stamp);
+ }
+ }
+ return isEmpty;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
new file mode 100644
index 0000000..a88b8bf
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+
+/**
+ * A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
+ * {@link com.google.common.collect.RangeSet} and can be used if {@code range} type is {@link LongPair} </br>
+ *
+ * <pre>
+ *
+ * Usage:
+ * a. This can be used if one doesn't want to create object for every new inserted {@code range}
+ * b. It creates {@link BitSet} for every unique first-key of the range.
+ * So, this rangeSet is not suitable for large number of unique keys.
+ * </pre>
+ *
+ *
+ */
+public class ConcurrentOpenLongPairRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
+
+ protected final NavigableMap<Long, BitSet> rangeBitSetMap = new ConcurrentSkipListMap<>();
+ private boolean threadSafe = true;
+ private final int bitSetSize;
+ private final LongPairConsumer<T> consumer;
+
+ // caching place-holder for cpu-optimization to avoid calculating ranges again
+ private volatile int cachedSize = 0;
+ private volatile String cachedToString = "[]";
+ private volatile boolean updatedAfterCached = true;
+
+ public ConcurrentOpenLongPairRangeSet(LongPairConsumer<T> consumer) {
+ this(1024, true, consumer);
+ }
+
+ public ConcurrentOpenLongPairRangeSet(int size, LongPairConsumer<T> consumer) {
+ this(size, true, consumer);
+ }
+
+ public ConcurrentOpenLongPairRangeSet(int size, boolean threadSafe, LongPairConsumer<T> consumer) {
+ this.threadSafe = threadSafe;
+ this.bitSetSize = size;
+ this.consumer = consumer;
+ }
+
+ /**
+ * Adds the specified range to this {@code RangeSet} (optional operation). That is, for equal range sets a and b,
+ * the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which both
+ * {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
+ * <p>
+ * Note that {@code range} will merge given {@code range} with any ranges in the range set that are
+ * {@linkplain Range#isConnected(Range) connected} with it. Moreover, if {@code range} is empty, this is a no-op.
+ */
+ @Override
+ public void addOpenClosed(long lowerKey, long lowerValueOpen, long upperKey, long upperValue) {
+ long lowerValue = lowerValueOpen + 1;
+ if (lowerKey != upperKey) {
+ // (1) set lower to last in lowerRange.getKey()
+ if (isValid(lowerKey, lowerValue)) {
+ BitSet rangeBitSet = rangeBitSetMap.get(lowerKey);
+ // if lower and upper has different key/ledger then set ranges for lower-key only if
+ // a. bitSet already exist and given value is not the last value in the bitset.
+ // it will prevent setting up values which are not actually expected to set
+ // eg: (2:10..4:10] in this case , don't set any value for 2:10 and set [4:0..4:10]
+ if (rangeBitSet != null && (rangeBitSet.previousSetBit(rangeBitSet.size()) > lowerValueOpen)) {
+ int lastValue = rangeBitSet.previousSetBit(rangeBitSet.size());
+ rangeBitSet.set((int) lowerValue, (int) Math.max(lastValue, lowerValue) + 1);
+ }
+ }
+ // (2) set 0th-index to upper-index in upperRange.getKey()
+ if (isValid(upperKey, upperValue)) {
+ BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet());
+ if (rangeBitSet != null) {
+ rangeBitSet.set(0, (int) upperValue + 1);
+ }
+ }
+ // No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing
+ // to set
+ } else {
+ long key = lowerKey;
+ BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(key, (k) -> createNewBitSet());
+ rangeBitSet.set((int) lowerValue, (int) upperValue + 1);
+ }
+ updatedAfterCached = true;
+ }
+
+ private boolean isValid(long key, long value) {
+ return key != LongPair.earliest.getKey() && value != LongPair.earliest.getValue()
+ && key != LongPair.latest.getKey() && value != LongPair.latest.getValue();
+ }
+
+ @Override
+ public boolean contains(long key, long value) {
+
+ BitSet rangeBitSet = rangeBitSetMap.get(key);
+ if (rangeBitSet != null) {
+ return rangeBitSet.get(getSafeEntry(value));
+ }
+ return false;
+ }
+
+ @Override
+ public Range<T> rangeContaining(long key, long value) {
+ BitSet rangeBitSet = rangeBitSetMap.get(key);
+ if (rangeBitSet != null) {
+ if (!rangeBitSet.get(getSafeEntry(value))) {
+ // if position is not part of any range then return null
+ return null;
+ }
+ int lowerValue = rangeBitSet.previousClearBit(getSafeEntry(value)) + 1;
+ final T lower = consumer.apply(key, lowerValue);
+ final T upper = consumer.apply(key,
+ Math.max(rangeBitSet.nextClearBit(getSafeEntry(value)) - 1, lowerValue));
+ return Range.closed(lower, upper);
+ }
+ return null;
+ }
+
+ @Override
+ public void removeAtMost(long key, long value) {
+ this.remove(Range.atMost(new LongPair(key, value)));
+ }
+
+ @Override
+ public boolean isEmpty() {
+ if (rangeBitSetMap.isEmpty()) {
+ return true;
+ }
+ AtomicBoolean isEmpty = new AtomicBoolean(false);
+ rangeBitSetMap.forEach((key, val) -> {
+ if (!isEmpty.get()) {
+ return;
+ }
+ isEmpty.set(val.isEmpty());
+ });
+ return isEmpty.get();
+ }
+
+ @Override
+ public void clear() {
+ rangeBitSetMap.clear();
+ updatedAfterCached = true;
+ }
+
+ @Override
+ public Range<T> span() {
+ Entry<Long, BitSet> firstSet = rangeBitSetMap.firstEntry();
+ Entry<Long, BitSet> lastSet = rangeBitSetMap.lastEntry();
+ int first = firstSet.getValue().nextSetBit(0);
+ int last = lastSet.getValue().previousSetBit(lastSet.getValue().size());
+ return Range.openClosed(consumer.apply(firstSet.getKey(), first - 1), consumer.apply(lastSet.getKey(), last));
+ }
+
+ @Override
+ public List<Range<T>> asRanges() {
+ List<Range<T>> ranges = new ArrayList<>();
+ forEach((range) -> {
+ ranges.add(range);
+ return true;
+ });
+ return ranges;
+ }
+
+ @Override
+ public void forEach(RangeProcessor<T> action) {
+ forEach(action, consumer);
+ }
+
+ @Override
+ public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+ AtomicBoolean completed = new AtomicBoolean(false);
+ rangeBitSetMap.forEach((key, set) -> {
+ if (completed.get()) {
+ return;
+ }
+ if (set.isEmpty()) {
+ return;
+ }
+ int first = set.nextSetBit(0);
+ int last = set.previousSetBit(set.size());
+ int currentClosedMark = first;
+ while (currentClosedMark != -1 && currentClosedMark <= last) {
+ int nextOpenMark = set.nextClearBit(currentClosedMark);
+ Range<T> range = Range.openClosed(consumer.apply(key, currentClosedMark - 1),
+ consumer.apply(key, nextOpenMark - 1));
+ if (!action.process(range)) {
+ completed.set(true);
+ break;
+ }
+ currentClosedMark = set.nextSetBit(nextOpenMark);
+ }
+ });
+ }
+
+ @Override
+ public Range<T> firstRange() {
+ Entry<Long, BitSet> firstSet = rangeBitSetMap.firstEntry();
+ int lower = firstSet.getValue().nextSetBit(0);
+ int upper = Math.max(lower, firstSet.getValue().nextClearBit(lower) - 1);
+ return Range.openClosed(consumer.apply(firstSet.getKey(), lower - 1), consumer.apply(firstSet.getKey(), upper));
+ }
+
+ @Override
+ public int size() {
+ if (updatedAfterCached) {
+ AtomicInteger size = new AtomicInteger(0);
+ forEach((range) -> {
+ size.getAndIncrement();
+ return true;
+ });
+ cachedSize = size.get();
+ updatedAfterCached = false;
+ }
+ return cachedSize;
+ }
+
+ @Override
+ public String toString() {
+ if (updatedAfterCached) {
+ StringBuilder toString = new StringBuilder();
+ AtomicBoolean first = new AtomicBoolean(true);
+ if (toString != null) {
+ toString.append("[");
+ }
+ forEach((range) -> {
+ if (!first.get()) {
+ toString.append(",");
+ }
+ toString.append(range);
+ first.set(false);
+ return true;
+ });
+ toString.append("]");
+ cachedToString = toString.toString();
+ updatedAfterCached = false;
+ }
+ return cachedToString;
+ }
+
+ /**
+ * Adds the specified range to this {@code RangeSet} (optional operation). That is, for equal range sets a and b,
+ * the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which both
+ * {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
+ *
+ * <p>
+ * Note that {@code range} will merge given {@code range} with any ranges in the range set that are
+ * {@linkplain Range#isConnected(Range) connected} with it. Moreover, if {@code range} is empty/invalid, this is a
+ * no-op.
+ */
+ public void add(Range<LongPair> range) {
+ LongPair lowerEndpoint = range.hasLowerBound() ? range.lowerEndpoint() : LongPair.earliest;
+ LongPair upperEndpoint = range.hasUpperBound() ? range.upperEndpoint() : LongPair.latest;
+
+ long lowerValueOpen = (range.hasLowerBound() && range.lowerBoundType().equals(BoundType.CLOSED))
+ ? getSafeEntry(lowerEndpoint) - 1
+ : getSafeEntry(lowerEndpoint);
+ long upperValueClosed = (range.hasUpperBound() && range.upperBoundType().equals(BoundType.CLOSED))
+ ? getSafeEntry(upperEndpoint)
+ : getSafeEntry(upperEndpoint) + 1;
+
+ // #addOpenClosed doesn't create bitSet for lower-key because it avoids setting up values for non-exist items
+ // into the key-ledger. so, create bitSet and initialize so, it can't be ignored at #addOpenClosed
+ rangeBitSetMap.computeIfAbsent(lowerEndpoint.getKey(), (key) -> createNewBitSet())
+ .set((int) lowerValueOpen + 1);
+ this.addOpenClosed(lowerEndpoint.getKey(), lowerValueOpen, upperEndpoint.getKey(), upperValueClosed);
+ }
+
+ public boolean contains(LongPair position) {
+ checkNotNull(position, "argument can't be null");
+ return contains(position.getKey(), position.getValue());
+ }
+
+ public void remove(Range<LongPair> range) {
+ LongPair lowerEndpoint = range.hasLowerBound() ? range.lowerEndpoint() : LongPair.earliest;
+ LongPair upperEndpoint = range.hasUpperBound() ? range.upperEndpoint() : LongPair.latest;
+
+ long lower = (range.hasLowerBound() && range.lowerBoundType().equals(BoundType.CLOSED))
+ ? getSafeEntry(lowerEndpoint)
+ : getSafeEntry(lowerEndpoint) + 1;
+ long upper = (range.hasUpperBound() && range.upperBoundType().equals(BoundType.CLOSED))
+ ? getSafeEntry(upperEndpoint)
+ : getSafeEntry(upperEndpoint) - 1;
+
+ // if lower-bound is not set then remove all the keys less than given upper-bound range
+ if (lowerEndpoint.equals(LongPair.earliest)) {
+ // remove all keys with
+ rangeBitSetMap.forEach((key, set) -> {
+ if (key < upperEndpoint.getKey()) {
+ rangeBitSetMap.remove(key);
+ }
+ });
+ }
+
+ // if upper-bound is not set then remove all the keys greater than given lower-bound range
+ if (upperEndpoint.equals(LongPair.latest)) {
+ // remove all keys with
+ rangeBitSetMap.forEach((key, set) -> {
+ if (key > lowerEndpoint.getKey()) {
+ rangeBitSetMap.remove(key);
+ }
+ });
+ }
+
+ // remove all the keys between two endpoint keys
+ rangeBitSetMap.forEach((key, set) -> {
+ if (lowerEndpoint.getKey() == upperEndpoint.getKey()) {
+ set.clear((int) lower, (int) upper + 1);
+ } else {
+ // eg: remove-range: [(3,5) - (5,5)] -> Delete all items from 3,6->3,N,4.*,5,0->5,5
+ if (key == lowerEndpoint.getKey()) {
+ // remove all entries from given position to last position
+ set.clear((int) lower, set.previousSetBit(set.size()));
+ } else if (key == upperEndpoint.getKey()) {
+ // remove all entries from 0 to given position
+ set.clear(0, (int) upper + 1);
+ } else if (key > lowerEndpoint.getKey() && key < upperEndpoint.getKey()) {
+ rangeBitSetMap.remove(key);
+ }
+ }
+ // remove bit-set if set is empty
+ if (set.isEmpty()) {
+ rangeBitSetMap.remove(key);
+ }
+ });
+
+ updatedAfterCached = true;
+ }
+
+ private int getSafeEntry(LongPair position) {
+ return (int) Math.max(position.getValue(), -1);
+ }
+
+ private int getSafeEntry(long value) {
+ return (int) Math.max(value, -1);
+ }
+
+ private BitSet createNewBitSet() {
+ return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize);
+ }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
new file mode 100644
index 0000000..daad0ad
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import java.util.Collection;
+import java.util.Set;
+
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+
+/**
+ * A set comprising zero or more ranges type of key-value pair.
+ */
+public interface LongPairRangeSet<T extends Comparable<T>> {
+
+ /**
+ * Adds the specified range (range that contains all values strictly greater than {@code
+ * lower} and less than or equal to {@code upper}.) to this {@code RangeSet} (optional operation). That is, for equal
+ * range sets a and b, the result of {@code a.add(range)} is that {@code a} will be the minimal range set for which
+ * both {@code a.enclosesAll(b)} and {@code a.encloses(range)}.
+ *
+ * <pre>
+ *
+ * @param lowerKey : value for key of lowerEndpoint of Range
+ * @param lowerValue: value for value of lowerEndpoint of Range
+ * @param upperKey : value for key of upperEndpoint of Range
+ * @param upperValue: value for value of upperEndpoint of Range
+ * </pre>
+ */
+ void addOpenClosed(long lowerKey, long lowerValue, long upperKey, long upperValue);
+
+ /** Determines whether any of this range set's member ranges contains {@code value}. */
+ boolean contains(long key, long value);
+
+ /**
+ * Returns the unique range from this range set that {@linkplain Range#contains contains} {@code value}, or
+ * {@code null} if this range set does not contain {@code value}.
+ */
+ Range<T> rangeContaining(long key, long value);
+
+ /**
+ * Remove range that contains all values less than or equal to given key-value.
+ *
+ * @param key
+ * @param value
+ */
+ void removeAtMost(long key, long value);
+
+ boolean isEmpty();
+
+ void clear();
+
+ /**
+ * Returns the minimal range which {@linkplain Range#encloses(Range) encloses} all ranges in this range set.
+ *
+ * @return
+ */
+ Range<T> span();
+
+ /**
+ * Returns a view of the {@linkplain Range#isConnected disconnected} ranges that make up this range set.
+ *
+ * @return
+ */
+ Collection<Range<T>> asRanges();
+
+ /**
+ * Performs the given action for each entry in this map until all entries have been processed or action returns "false".
+ * Unless otherwise specified by the implementing class, actions are performed in the order of entry
+ * set iteration (if an iteration order is specified.)
+ *
+ * @param action
+ */
+ void forEach(RangeProcessor<T> action);
+
+ /**
+ * Performs the given action for each entry in this map until all entries have been processed or action returns "false".
+ * Unless otherwise specified by the implementing class, actions are performed in the order of entry
+ * set iteration (if an iteration order is specified.)
+ *
+ * @param action
+ * @param consumer
+ */
+ void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer);
+
+ /**
+ * Returns total number of ranges into the set.
+ *
+ * @return
+ */
+ int size();
+
+ /**
+ * It returns very first smallest range in the rangeSet.
+ *
+ * @return Range<T> first smallest range into the set
+ */
+ Range<T> firstRange();
+
+ public static interface LongPairConsumer<T> {
+ T apply(long key, long value);
+ }
+
+ public static interface RangeProcessor<T extends Comparable<T>> {
+ /**
+ *
+ * @param range
+ * @return false if there is no further processing required
+ */
+ boolean process(Range<T> range);
+ }
+
+ public static class LongPair implements Comparable<LongPair> {
+
+ public static final LongPair earliest = new LongPair(-1, -1);
+ public static final LongPair latest = new LongPair(Integer.MAX_VALUE, Integer.MAX_VALUE);
+
+ private long key;
+ private long value;
+
+ public LongPair(long key, long value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public long getKey() {
+ return this.key;
+ }
+
+ public long getValue() {
+ return this.value;
+ }
+
+ @Override
+ public int compareTo(LongPair o) {
+ return ComparisonChain.start().compare(key, o.getKey()).compare(value, o.getValue()).result();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%d:%d", key, value);
+ }
+ }
+
+ public static class DefaultRangeSet<T extends Comparable<T>> implements LongPairRangeSet<T> {
+
+ RangeSet<T> set = TreeRangeSet.create();
+
+ private final LongPairConsumer<T> consumer;
+
+ public DefaultRangeSet(LongPairConsumer<T> consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void clear() {
+ set.clear();
+ }
+
+ @Override
+ public void addOpenClosed(long key1, long value1, long key2, long value2) {
+ set.add(Range.openClosed(consumer.apply(key1, value1), consumer.apply(key2, value2)));
+ }
+
+ public boolean contains(T position) {
+ return set.contains(position);
+ }
+
+ public Range<T> rangeContaining(T position) {
+ return set.rangeContaining(position);
+ }
+
+ @Override
+ public Range<T> rangeContaining(long key, long value) {
+ return this.rangeContaining(consumer.apply(key, value));
+ }
+
+ public void remove(Range<T> range) {
+ set.remove(range);
+ }
+
+ @Override
+ public void removeAtMost(long key, long value) {
+ set.remove(Range.atMost(consumer.apply(key, value)));
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return set.isEmpty();
+ }
+
+ @Override
+ public Range<T> span() {
+ return set.span();
+ }
+
+ @Override
+ public Set<Range<T>> asRanges() {
+ return set.asRanges();
+ }
+
+ @Override
+ public void forEach(RangeProcessor<T> action) {
+ forEach(action, consumer);
+ }
+
+ @Override
+ public void forEach(RangeProcessor<T> action, LongPairConsumer<? extends T> consumer) {
+ for (Range<T> range : asRanges()) {
+ if (!action.process(range)) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public boolean contains(long key, long value) {
+ return this.contains(consumer.apply(key, value));
+ }
+
+ @Override
+ public Range<T> firstRange() {
+ return set.asRanges().iterator().next();
+ }
+
+ @Override
+ public int size() {
+ return set.asRanges().size();
+ }
+
+ @Override
+ public String toString() {
+ return set.toString();
+ }
+ }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
new file mode 100644
index 0000000..54bcecc
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSetTest.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.collections;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPair;
+import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.TreeRangeSet;
+
+public class ConcurrentOpenLongPairRangeSetTest {
+
+ static final LongPairConsumer<LongPair> consumer = (key, value) -> new LongPair(key, value);
+
+ @Test
+ public void testAddForSameKey() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ // add 0 to 5
+ set.add(Range.closed(new LongPair(0, 0), new LongPair(0, 5)));
+ // add 8,9,10
+ set.add(Range.closed(new LongPair(0, 8), new LongPair(0, 8)));
+ set.add(Range.closed(new LongPair(0, 9), new LongPair(0, 9)));
+ set.add(Range.closed(new LongPair(0, 10), new LongPair(0, 10)));
+ // add 98 to 99 and 102,105
+ set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+ set.add(Range.closed(new LongPair(0, 102), new LongPair(0, 106)));
+
+ List<Range<LongPair>> ranges = set.asRanges();
+ int count = 0;
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(0, -1), new LongPair(0, 5))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(0, 7), new LongPair(0, 10))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(0, 97), new LongPair(0, 99))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(0, 101), new LongPair(0, 106))));
+ }
+
+ @Test
+ public void testAddForDifferentKey() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ // [98,100],[(1,5),(1,5)],[(1,10,1,15)],[(1,20),(1,20)],[(2,0),(2,10)]
+ set.addOpenClosed(0, 98, 0, 99);
+ set.addOpenClosed(0, 100, 1, 5);
+ set.addOpenClosed(1, 10, 1, 15);
+ set.addOpenClosed(1, 20, 2, 10);
+
+ List<Range<LongPair>> ranges = set.asRanges();
+ int count = 0;
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(0, 98), new LongPair(0, 99))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(1, -1), new LongPair(1, 5))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(1, 10), new LongPair(1, 15))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(2, -1), new LongPair(2, 10))));
+ }
+
+ @Test
+ public void testAddCompareCompareWithGuava() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+
+ // add 10K values for key 0
+ int totalInsert = 10_000;
+ // add single values
+ for (int i = 0; i < totalInsert; i++) {
+ if (i % 3 == 0 || i % 6 == 0 || i % 8 == 0) {
+ LongPair lower = new LongPair(0, i - 1);
+ LongPair upper = new LongPair(0, i);
+ // set.add(Range.openClosed(lower, upper));
+ set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+ gSet.add(Range.openClosed(lower, upper));
+ }
+ }
+ // add batches
+ for (int i = totalInsert; i < (totalInsert * 2); i++) {
+ if (i % 5 == 0) {
+ LongPair lower = new LongPair(0, i - 3 - 1);
+ LongPair upper = new LongPair(0, i + 3);
+ // set.add(Range.openClosed(lower, upper));
+ set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+ gSet.add(Range.openClosed(lower, upper));
+ }
+ }
+ List<Range<LongPair>> ranges = set.asRanges();
+ Set<Range<LongPair>> gRanges = gSet.asRanges();
+
+ List<Range<LongPair>> gRangeConnected = getConnectedRange(gRanges);
+ assertEquals(gRangeConnected.size(), ranges.size());
+ int i = 0;
+ for (Range<LongPair> range : gRangeConnected) {
+ assertEquals(range, ranges.get(i));
+ i++;
+ }
+ }
+
+ @Test
+ public void testDeleteCompareWithGuava() {
+
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+
+ // add 10K values for key 0
+ int totalInsert = 10_000;
+ // add single values
+ List<Range<LongPair>> removedRanges = Lists.newArrayList();
+ for (int i = 0; i < totalInsert; i++) {
+ if (i % 3 == 0 || i % 7 == 0 || i % 11 == 0) {
+ continue;
+ }
+ LongPair lower = new LongPair(0, i - 1);
+ LongPair upper = new LongPair(0, i);
+ Range<LongPair> range = Range.openClosed(lower, upper);
+ // set.add(range);
+ set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+ gSet.add(range);
+ if (i % 4 == 0) {
+ removedRanges.add(range);
+ }
+ }
+ // add batches
+ for (int i = totalInsert; i < (totalInsert * 2); i++) {
+ LongPair lower = new LongPair(0, i - 3 - 1);
+ LongPair upper = new LongPair(0, i + 3);
+ Range<LongPair> range = Range.openClosed(lower, upper);
+ if (i % 5 != 0) {
+ // set.add(range);
+ set.addOpenClosed(lower.getKey(), lower.getValue(), upper.getKey(), upper.getValue());
+ gSet.add(range);
+ }
+ if (i % 4 == 0) {
+ removedRanges.add(range);
+ }
+ }
+ // remove records
+ for (Range<LongPair> range : removedRanges) {
+ set.remove(range);
+ gSet.remove(range);
+ }
+
+ List<Range<LongPair>> ranges = set.asRanges();
+ Set<Range<LongPair>> gRanges = gSet.asRanges();
+ List<Range<LongPair>> gRangeConnected = getConnectedRange(gRanges);
+ assertEquals(gRangeConnected.size(), ranges.size());
+ int i = 0;
+ for (Range<LongPair> range : gRangeConnected) {
+ assertEquals(range, ranges.get(i));
+ i++;
+ }
+ }
+
+ @Test
+ public void testSpanWithGuava() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+ set.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
+ gSet.add(Range.openClosed(new LongPair(0, 97), new LongPair(0, 99)));
+ set.add(Range.openClosed(new LongPair(0, 99), new LongPair(1, 5)));
+ gSet.add(Range.openClosed(new LongPair(0, 99), new LongPair(1, 5)));
+ assertEquals(set.span(), gSet.span());
+ assertEquals(set.span(), Range.openClosed(new LongPair(0, 97), new LongPair(1, 5)));
+
+ set.add(Range.openClosed(new LongPair(1, 9), new LongPair(1, 15)));
+ set.add(Range.openClosed(new LongPair(1, 19), new LongPair(2, 10)));
+ set.add(Range.openClosed(new LongPair(2, 24), new LongPair(2, 28)));
+ set.add(Range.openClosed(new LongPair(3, 11), new LongPair(3, 20)));
+ set.add(Range.openClosed(new LongPair(4, 11), new LongPair(4, 20)));
+ gSet.add(Range.openClosed(new LongPair(1, 9), new LongPair(1, 15)));
+ gSet.add(Range.openClosed(new LongPair(1, 19), new LongPair(2, 10)));
+ gSet.add(Range.openClosed(new LongPair(2, 24), new LongPair(2, 28)));
+ gSet.add(Range.openClosed(new LongPair(3, 11), new LongPair(3, 20)));
+ gSet.add(Range.openClosed(new LongPair(4, 11), new LongPair(4, 20)));
+ assertEquals(set.span(), gSet.span());
+ assertEquals(set.span(), Range.openClosed(new LongPair(0, 97), new LongPair(4, 20)));
+ }
+
+ @Test
+ public void testFirstRange() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
+ set.add(range);
+ assertEquals(set.firstRange(), range);
+ assertEquals(set.size(), 1);
+ range = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105));
+ set.add(range);
+ assertEquals(set.firstRange(), Range.openClosed(new LongPair(0, 97), new LongPair(0, 105)));
+ assertEquals(set.size(), 1);
+ range = Range.openClosed(new LongPair(0, 5), new LongPair(0, 75));
+ set.add(range);
+ assertEquals(set.firstRange(), range);
+ assertEquals(set.size(), 2);
+ }
+
+ @Test
+ public void testToString() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ Range<LongPair> range = Range.openClosed(new LongPair(0, 97), new LongPair(0, 99));
+ set.add(range);
+ assertEquals(set.toString(), "[(0:97..0:99]]");
+ range = Range.openClosed(new LongPair(0, 98), new LongPair(0, 105));
+ set.add(range);
+ assertEquals(set.toString(), "[(0:97..0:105]]");
+ range = Range.openClosed(new LongPair(0, 5), new LongPair(0, 75));
+ set.add(range);
+ assertEquals(set.toString(), "[(0:5..0:75],(0:97..0:105]]");
+ }
+
+ @Test
+ public void testDeleteForDifferentKey() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ set.addOpenClosed(0, 97, 0, 99);
+ set.addOpenClosed(0, 99, 1, 5);
+ set.addOpenClosed(1, 9, 1, 15);
+ set.addOpenClosed(1, 19, 2, 10);
+ set.addOpenClosed(2, 24, 2, 28);
+ set.addOpenClosed(3, 11, 3, 20);
+ set.addOpenClosed(4, 11, 4, 20);
+
+ // delete only (0,100)
+ set.remove(Range.open(new LongPair(0, 99), new LongPair(0, 105)));
+
+ /**
+ * delete all keys from [2,27]->[4,15] : remaining [2,25..26,28], [4,16..20]
+ */
+ set.remove(Range.closed(new LongPair(2, 27), new LongPair(4, 15)));
+
+ List<Range<LongPair>> ranges = set.asRanges();
+ int count = 0;
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(0, 97), new LongPair(0, 99))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(1, -1), new LongPair(1, 5))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(1, 9), new LongPair(1, 15))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(2, -1), new LongPair(2, 10))));
+
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(2, 24), new LongPair(2, 26))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(2, 27), new LongPair(2, 28))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(4, 15), new LongPair(4, 20))));
+ }
+
+ @Test
+ public void testDeleteWithAtMost() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+ set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
+ set.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15)));
+ set.add(Range.closed(new LongPair(1, 20), new LongPair(2, 10)));
+ set.add(Range.closed(new LongPair(2, 25), new LongPair(2, 28)));
+ set.add(Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+ set.add(Range.closed(new LongPair(4, 12), new LongPair(4, 20)));
+
+ // delete only (0,100)
+ set.remove(Range.open(new LongPair(0, 99), new LongPair(0, 105)));
+
+ /**
+ * delete all keys from [2,27]->[4,15] : remaining [2,25..26,28], [4,16..20]
+ */
+ set.remove(Range.atMost(new LongPair(2, 27)));
+
+ List<Range<LongPair>> ranges = set.asRanges();
+ int count = 0;
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(2, 27), new LongPair(2, 28))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(3, 11), new LongPair(3, 20))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(4, 11), new LongPair(4, 20))));
+ }
+
+ @Test
+ public void testDeleteWithLeastMost() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+ set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
+ set.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15)));
+ set.add(Range.closed(new LongPair(1, 20), new LongPair(2, 10)));
+ set.add(Range.closed(new LongPair(2, 25), new LongPair(2, 28)));
+ set.add(Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+ set.add(Range.closed(new LongPair(4, 12), new LongPair(4, 20)));
+
+ // delete only (0,100)
+ set.remove(Range.open(new LongPair(0, 99), new LongPair(0, 105)));
+
+ /**
+ * delete all keys from [2,27]->[4,15] : remaining [2,25..26,28], [4,16..20]
+ */
+ set.remove(Range.atLeast(new LongPair(2, 27)));
+
+ List<Range<LongPair>> ranges = set.asRanges();
+ int count = 0;
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(0, 97), new LongPair(0, 99))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(1, -1), new LongPair(1, 5))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(1, 9), new LongPair(1, 15))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(1, 19), new LongPair(1, 20))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(2, -1), new LongPair(2, 10))));
+ assertEquals(ranges.get(count++), (Range.openClosed(new LongPair(2, 24), new LongPair(2, 26))));
+ }
+
+ @Test
+ public void testRangeContaining() {
+ ConcurrentOpenLongPairRangeSet<LongPair> set = new ConcurrentOpenLongPairRangeSet<>(consumer);
+ set.add(Range.closed(new LongPair(0, 98), new LongPair(0, 99)));
+ set.add(Range.closed(new LongPair(0, 100), new LongPair(1, 5)));
+ com.google.common.collect.RangeSet<LongPair> gSet = TreeRangeSet.create();
+ gSet.add(Range.closed(new LongPair(0, 98), new LongPair(0, 100)));
+ gSet.add(Range.closed(new LongPair(0, 101), new LongPair(1, 5)));
+ set.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15)));
+ set.add(Range.closed(new LongPair(1, 20), new LongPair(2, 10)));
+ set.add(Range.closed(new LongPair(2, 25), new LongPair(2, 28)));
+ set.add(Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+ set.add(Range.closed(new LongPair(4, 12), new LongPair(4, 20)));
+ gSet.add(Range.closed(new LongPair(1, 10), new LongPair(1, 15)));
+ gSet.add(Range.closed(new LongPair(1, 20), new LongPair(2, 10)));
+ gSet.add(Range.closed(new LongPair(2, 25), new LongPair(2, 28)));
+ gSet.add(Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+ gSet.add(Range.closed(new LongPair(4, 12), new LongPair(4, 20)));
+
+ LongPair position = new LongPair(0, 99);
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()),
+ Range.closed(new LongPair(0, 98), new LongPair(0, 100)));
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()), gSet.rangeContaining(position));
+
+ position = new LongPair(2, 30);
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()), null);
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()), gSet.rangeContaining(position));
+
+ position = new LongPair(3, 13);
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()),
+ Range.closed(new LongPair(3, 12), new LongPair(3, 20)));
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()), gSet.rangeContaining(position));
+
+ position = new LongPair(3, 22);
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()), null);
+ assertEquals(set.rangeContaining(position.getKey(), position.getValue()), gSet.rangeContaining(position));
+ }
+
+ private List<Range<LongPair>> getConnectedRange(Set<Range<LongPair>> gRanges) {
+ List<Range<LongPair>> gRangeConnected = Lists.newArrayList();
+ Range<LongPair> lastRange = null;
+ for (Range<LongPair> range : gRanges) {
+ if (lastRange == null) {
+ lastRange = range;
+ continue;
+ }
+ LongPair previousUpper = lastRange.upperEndpoint();
+ LongPair currentLower = range.lowerEndpoint();
+ int previousUpperValue = (int) (lastRange.upperBoundType().equals(BoundType.CLOSED)
+ ? previousUpper.getValue()
+ : previousUpper.getValue() - 1);
+ int currentLowerValue = (int) (range.lowerBoundType().equals(BoundType.CLOSED) ? currentLower.getValue()
+ : currentLower.getValue() + 1);
+ boolean connected = (previousUpper.getKey() == currentLower.getKey())
+ ? (previousUpperValue >= currentLowerValue)
+ : false;
+ if (connected) {
+ lastRange = Range.closed(lastRange.lowerEndpoint(), range.upperEndpoint());
+ } else {
+ gRangeConnected.add(lastRange);
+ lastRange = range;
+ }
+ }
+ int lowerOpenValue = (int) (lastRange.lowerBoundType().equals(BoundType.CLOSED)
+ ? (lastRange.lowerEndpoint().getValue() - 1)
+ : lastRange.lowerEndpoint().getValue());
+ lastRange = Range.openClosed(new LongPair(lastRange.lowerEndpoint().getKey(), lowerOpenValue),
+ lastRange.upperEndpoint());
+ gRangeConnected.add(lastRange);
+ return gRangeConnected;
+ }
+}