You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/11 23:18:14 UTC
[04/15] cassandra git commit: Invalidate row/counter cache after
stream receive task is completed
Invalidate row/counter cache after stream receive task is completed
patch by Paulo Motta; reviewed by yukim for CASSANDRA-10341
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c3ff924
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c3ff924
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c3ff924
Branch: refs/heads/cassandra-3.0
Commit: 1c3ff9242a0bfc5c544c69f68ee7b17a464a5ab3
Parents: 6bad57f
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Nov 11 13:26:22 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 11 15:52:37 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 31 ++++++++++
.../db/compaction/CompactionController.java | 5 --
src/java/org/apache/cassandra/dht/Bounds.java | 62 ++++++++++++++++++++
.../cassandra/io/sstable/SSTableRewriter.java | 1 -
.../cassandra/streaming/StreamReader.java | 1 -
.../cassandra/streaming/StreamReceiveTask.java | 36 ++++++++++++
.../apache/cassandra/db/CounterCacheTest.java | 45 ++++++++++++++
.../org/apache/cassandra/db/RowCacheTest.java | 61 +++++++++++++++++--
.../org/apache/cassandra/dht/BoundsTest.java | 61 +++++++++++++++++++
10 files changed, 291 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fa2017a..92244a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Invalidate cache after stream receive task is completed (CASSANDRA-10341)
* Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
* Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
* Improve json2sstable error reporting on nonexistent columns (CASSANDRA-10401)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 906e18c..54f6fff 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2505,6 +2505,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName);
}
+    /**
+     * Invalidates the row cache entries of this table whose partition token lies within
+     * at least one of the given bounds.
+     *
+     * @param boundsToInvalidate token bounds each cached key of this table is checked against
+     * @return the number of cached keys that matched and were handed to invalidateCachedRow
+     */
+    public int invalidateRowCache(Collection<Bounds<Token>> boundsToInvalidate)
+    {
+        int invalidatedKeys = 0;
+        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+        {
+            // entries are filtered by table name; do the cheap name check first so that no
+            // token is computed for keys that would be rejected anyway
+            if (!key.ksAndCFName.equals(metadata.ksAndCFName))
+                continue;
+
+            DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
+            if (Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+            {
+                invalidateCachedRow(dk);
+                invalidatedKeys++;
+            }
+        }
+
+        return invalidatedKeys;
+    }
+
+    /**
+     * Removes from the counter cache the entries of this table whose partition token lies
+     * within at least one of the given bounds.
+     *
+     * @param boundsToInvalidate token bounds each cached key of this table is checked against
+     * @return the number of cache keys that matched and were removed
+     */
+    public int invalidateCounterCache(Collection<Bounds<Token>> boundsToInvalidate)
+    {
+        int invalidatedKeys = 0;
+        for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+        {
+            // entries are filtered by table name; do the cheap name check first so that no
+            // token is computed for keys that would be rejected anyway
+            if (!key.ksAndCFName.equals(metadata.ksAndCFName))
+                continue;
+
+            DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
+            if (Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+            {
+                CacheService.instance.counterCache.remove(key);
+                invalidatedKeys++;
+            }
+        }
+        return invalidatedKeys;
+    }
+
/**
* @return true if @param key is contained in the row cache
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index f8ff163..35d0832 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -189,11 +189,6 @@ public class CompactionController implements AutoCloseable
return min;
}
- public void invalidateCachedRow(DecoratedKey key)
- {
- cfs.invalidateCachedRow(key);
- }
-
public void close()
{
overlappingSSTables.release();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index 42eea77..5ffde42 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -17,8 +17,17 @@
*/
package org.apache.cassandra.dht;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.service.StorageService;
@@ -108,6 +117,20 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
return "]";
}
+ public static <T extends RingPosition<T>> boolean isInBounds(T token, Iterable<Bounds<T>> bounds)
+ {
+ assert bounds != null;
+
+ for (Bounds<T> bound : bounds)
+ {
+ if (bound.contains(token))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Compute a bounds of keys corresponding to a given bounds of token.
*/
@@ -132,4 +155,43 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
{
return new Bounds<T>(left, newRight);
}
+
+    /**
+     * Merges the given bounds into a set of bounds that do not overlap each other.
+     *
+     * Bounds whose edges touch are merged as well (a bound is merged into the current group
+     * when its left edge is less than or equal to the group's right edge). For example:
+     *   input : [0,1] [0,5] [1,8] [4,10] [15,19] [19,20] [21,22]
+     *   output: [0,10] [15,20] [21,22]
+     * A bound fully contained in a wider one, such as [1,2] inside [0,10], is absorbed by the
+     * wider bound and does not shorten the merged result.
+     *
+     * @param bounds unsorted bounds to merge; they are copied before sorting
+     * @return the non-overlapping bounds
+     */
+    public static <T extends RingPosition<T>> Set<Bounds<T>> getNonOverlappingBounds(Iterable<Bounds<T>> bounds)
+    {
+        ArrayList<Bounds<T>> sortedBounds = Lists.newArrayList(bounds);
+        Collections.sort(sortedBounds, new Comparator<Bounds<T>>()
+        {
+            public int compare(Bounds<T> o1, Bounds<T> o2)
+            {
+                return o1.left.compareTo(o2.left);
+            }
+        });
+
+        Set<Bounds<T>> nonOverlappingBounds = Sets.newHashSet();
+
+        PeekingIterator<Bounds<T>> it = Iterators.peekingIterator(sortedBounds.iterator());
+        while (it.hasNext())
+        {
+            Bounds<T> beginBound = it.next();
+            // Track the furthest right edge seen in the current group, not the right edge of the
+            // bound consumed last: sorting is by left edge only, so a later bound may end before
+            // an earlier, wider one does.
+            T mergedRight = beginBound.right;
+            while (it.hasNext() && mergedRight.compareTo(it.peek().left) >= 0)
+            {
+                T candidateRight = it.next().right;
+                if (candidateRight.compareTo(mergedRight) > 0)
+                    mergedRight = candidateRight;
+            }
+            nonOverlappingBounds.add(new Bounds<>(beginBound.left, mergedRight));
+        }
+
+        return nonOverlappingBounds;
+    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 82492a8..af5d1d3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.utils.Throwables.merge;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 5389a80..18013fe 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -166,6 +166,5 @@ public class StreamReader
{
DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
writer.appendFromStream(key, cfs.metadata, in, inputVersion);
- cfs.invalidateCachedRow(key);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index da2d7d6..738c93c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -23,14 +23,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.utils.FBUtilities;
@@ -47,6 +53,8 @@ public class StreamReceiveTask extends StreamTask
FBUtilities.getAvailableProcessors(),
60, TimeUnit.SECONDS);
+ private static final Logger logger = LoggerFactory.getLogger(StreamReceiveTask.class);
+
// number of files to receive
private final int totalFiles;
// total size of files to receive
@@ -79,6 +87,7 @@ public class StreamReceiveTask extends StreamTask
assert cfId.equals(sstable.metadata.cfId);
sstables.add(sstable);
+
if (sstables.size() == totalFiles)
{
done = true;
@@ -134,6 +143,33 @@ public class StreamReceiveTask extends StreamTask
// add sstables and build secondary indexes
cfs.addSSTables(readers);
cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
+
+ //invalidate row and counter cache
+ if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+ {
+ List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+ for (SSTableReader sstable : readers)
+ boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+ if (cfs.isRowCacheEnabled())
+ {
+ int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+
+ if (cfs.metadata.isCounter())
+ {
+ int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+ if (invalidatedKeys > 0)
+ logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+ "receive task completed.", task.session.planId(), invalidatedKeys,
+ cfs.keyspace.getName(), cfs.getColumnFamilyName());
+ }
+ }
}
task.session.taskCompleted(task);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 20e067c..542358d 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.db;
+import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.junit.AfterClass;
@@ -24,6 +25,8 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.FBUtilities;
@@ -71,6 +74,48 @@ public class CounterCacheTest extends SchemaLoader
}
@Test
+    public void testCounterCacheInvalidate()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        CacheService.instance.invalidateCounterCache();
+
+        // start from an empty cache: none of the 3 partitions x 2 cells is present
+        assertEquals(0, CacheService.instance.counterCache.size());
+        for (int partition = 1; partition <= 3; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                assertNull(cfs.getCachedCounter(bytes(partition), cellname(cell)));
+
+        // cache ClockAndCount(partition, cell) for every combination
+        for (int partition = 1; partition <= 3; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                cfs.putCachedCounter(bytes(partition), cellname(cell), ClockAndCount.create((long) partition, (long) cell));
+
+        assertEquals(6, CacheService.instance.counterCache.size());
+        for (int partition = 1; partition <= 3; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                assertEquals(ClockAndCount.create((long) partition, (long) cell),
+                             cfs.getCachedCounter(bytes(partition), cellname(cell)));
+
+        // invalidate the bounds going from the token of partition 1 to the token of partition 2
+        Token left = cfs.partitioner.decorateKey(bytes(1)).getToken();
+        Token right = cfs.partitioner.decorateKey(bytes(2)).getToken();
+        cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(left, right)));
+
+        // partitions 1 and 2 must be gone, partition 3 must still be cached
+        assertEquals(2, CacheService.instance.counterCache.size());
+        for (int partition = 1; partition <= 2; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                assertNull(cfs.getCachedCounter(bytes(partition), cellname(cell)));
+        for (int cell = 1; cell <= 2; cell++)
+            assertEquals(ClockAndCount.create(3L, (long) cell), cfs.getCachedCounter(bytes(3), cellname(cell)));
+    }
+
+ @Test
public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
{
ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 6d4554d..719c771 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.db;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Iterator;
+import java.util.TreeSet;
+import com.google.common.collect.Lists;
import org.junit.AfterClass;
import org.junit.Test;
@@ -32,13 +36,15 @@ import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
public class RowCacheTest extends SchemaLoader
{
@@ -152,6 +158,51 @@ public class RowCacheTest extends SchemaLoader
}
@Test
+    public void testInvalidateRowCache() throws Exception
+    {
+        StorageService.instance.initServer(0);
+        CacheService.instance.setRowCacheCapacityInMB(1);
+        try
+        {
+            rowCacheLoad(100, Integer.MAX_VALUE, 1000);
+
+            ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
+            // expected value first, so that a failure message reads the right way round
+            assertEquals(100, CacheService.instance.rowCache.getKeySet().size());
+
+            //construct 5 ranges of 20 elements each
+            ArrayList<Bounds<Token>> subranges = getBounds(20);
+
+            //invalidate 3 of the 5 ranges
+            ArrayList<Bounds<Token>> boundsToInvalidate = Lists.newArrayList(subranges.get(0), subranges.get(2), subranges.get(4));
+            int invalidatedKeys = store.invalidateRowCache(boundsToInvalidate);
+            assertEquals(60, invalidatedKeys);
+
+            //now there should be only 40 cached entries left
+            assertEquals(40, CacheService.instance.rowCache.getKeySet().size());
+        }
+        finally
+        {
+            // reset the capacity even when an assertion fails, so the setting does not leak into other tests
+            CacheService.instance.setRowCacheCapacityInMB(0);
+        }
+    }
+
+    /**
+     * Splits the keys currently held by the row cache, ordered by decorated key, into consecutive
+     * groups of at most nElements keys and returns one bounds per group, going from the token of
+     * the group's first key to the token of its last key.
+     *
+     * @param nElements number of keys per group; the last group is shorter when the number of
+     *                  cached keys is not a multiple of it, and values below 2 give one bounds per key
+     * @return the bounds of each group, in key order
+     */
+    private ArrayList<Bounds<Token>> getBounds(int nElements)
+    {
+        ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
+        TreeSet<DecoratedKey> orderedKeys = new TreeSet<>();
+
+        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+            orderedKeys.add(store.partitioner.decorateKey(ByteBuffer.wrap(key.key)));
+
+        ArrayList<Bounds<Token>> boundsToInvalidate = new ArrayList<>();
+        Iterator<DecoratedKey> iterator = orderedKeys.iterator();
+
+        while (iterator.hasNext())
+        {
+            Token startRange = iterator.next().getToken();
+            Token endRange = startRange;
+            // consume up to nElements - 1 further keys; checking hasNext() keeps a short trailing
+            // group from running off the end of the iterator
+            for (int i = 1; i < nElements && iterator.hasNext(); i++)
+                endRange = iterator.next().getToken();
+            boundsToInvalidate.add(new Bounds<>(startRange, endRange));
+        }
+        return boundsToInvalidate;
+    }
+
+ @Test
public void testRowCachePartialLoad() throws Exception
{
CacheService.instance.setRowCacheCapacityInMB(1);
@@ -220,9 +271,9 @@ public class RowCacheTest extends SchemaLoader
// populate row cache, we should not get a row cache hit;
cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf,
- Composites.EMPTY,
- Composites.EMPTY,
- false, 10, System.currentTimeMillis()));
+ Composites.EMPTY,
+ Composites.EMPTY,
+ false, 10, System.currentTimeMillis()));
assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.count());
// do another query, limit is 20, which is < 100 that we cache, we should get a hit and it should be in range
@@ -272,8 +323,6 @@ public class RowCacheTest extends SchemaLoader
{
CompactionManager.instance.disableAutoCompaction();
- ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
-
// empty the cache
CacheService.instance.invalidateRowCache();
assert CacheService.instance.rowCache.size() == 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/test/unit/org/apache/cassandra/dht/BoundsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BoundsTest.java b/test/unit/org/apache/cassandra/dht/BoundsTest.java
new file mode 100644
index 0000000..527b498
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/BoundsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class BoundsTest
+{
+
+ private Bounds<Token> bounds(long left, long right)
+ {
+ return new Bounds<Token>(new LongToken(left), new LongToken(right));
+ }
+
+ @Test
+ /**
+ * [0,1],[0,5],[1,8],[4,10] = [0, 10]
+ * [15,19][19,20] = [15,20]
+ * [21, 22] = [21,22]
+ */
+ public void testGetNonOverlappingBounds()
+ {
+ List<Bounds<Token>> bounds = new LinkedList<>();
+ bounds.add(bounds(19, 20));
+ bounds.add(bounds(0, 1));
+ bounds.add(bounds(4, 10));
+ bounds.add(bounds(15, 19));
+ bounds.add(bounds(0, 5));
+ bounds.add(bounds(21, 22));
+ bounds.add(bounds(1, 8));
+
+ Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(bounds);
+ assertEquals(3, nonOverlappingBounds.size());
+ assertTrue(nonOverlappingBounds.contains(bounds(0, 10)));
+ assertTrue(nonOverlappingBounds.contains(bounds(15,20)));
+ assertTrue(nonOverlappingBounds.contains(bounds(21,22)));
+ }
+}