You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/11/11 23:18:14 UTC

[04/15] cassandra git commit: Invalidate row/counter cache after stream receive task is completed

Invalidate row/counter cache after stream receive task is completed

patch by Paulo Motta; reviewed by yukim for CASSANDRA-10341


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c3ff924
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c3ff924
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c3ff924

Branch: refs/heads/cassandra-3.0
Commit: 1c3ff9242a0bfc5c544c69f68ee7b17a464a5ab3
Parents: 6bad57f
Author: Paulo Motta <pa...@gmail.com>
Authored: Wed Nov 11 13:26:22 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 11 15:52:37 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 31 ++++++++++
 .../db/compaction/CompactionController.java     |  5 --
 src/java/org/apache/cassandra/dht/Bounds.java   | 62 ++++++++++++++++++++
 .../cassandra/io/sstable/SSTableRewriter.java   |  1 -
 .../cassandra/streaming/StreamReader.java       |  1 -
 .../cassandra/streaming/StreamReceiveTask.java  | 36 ++++++++++++
 .../apache/cassandra/db/CounterCacheTest.java   | 45 ++++++++++++++
 .../org/apache/cassandra/db/RowCacheTest.java   | 61 +++++++++++++++++--
 .../org/apache/cassandra/dht/BoundsTest.java    | 61 +++++++++++++++++++
 10 files changed, 291 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fa2017a..92244a0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Invalidate row and counter cache after stream receive task is completed (CASSANDRA-10341)
  * Reject counter writes in CQLSSTableWriter (CASSANDRA-10258)
  * Remove superfluous COUNTER_MUTATION stage mapping (CASSANDRA-10605)
  * Improve json2sstable error reporting on nonexistent columns (CASSANDRA-10401)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 906e18c..54f6fff 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -2505,6 +2505,37 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             CacheService.instance.invalidateCounterCacheForCf(metadata.ksAndCFName);
     }
 
+    /**
+     * Invalidates the row cache entries of this table whose token is contained in at least
+     * one of the given bounds.
+     *
+     * @param boundsToInvalidate token bounds each cached key of this table is checked against
+     * @return the number of row cache entries that were invalidated
+     */
+    public int invalidateRowCache(Collection<Bounds<Token>> boundsToInvalidate)
+    {
+        int invalidatedKeys = 0;
+        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+        {
+            // the row cache is reached through CacheService and may hold keys of other tables:
+            // filter those out first so that only this table's keys are decorated
+            if (!key.ksAndCFName.equals(metadata.ksAndCFName))
+                continue;
+
+            DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
+            if (Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+            {
+                invalidateCachedRow(dk);
+                invalidatedKeys++;
+            }
+        }
+
+        return invalidatedKeys;
+    }
+
+    /**
+     * Removes from the counter cache the entries of this table whose partition token is
+     * contained in at least one of the given bounds.
+     *
+     * @param boundsToInvalidate token bounds each cached key of this table is checked against
+     * @return the number of counter cache entries that were removed
+     */
+    public int invalidateCounterCache(Collection<Bounds<Token>> boundsToInvalidate)
+    {
+        int invalidatedKeys = 0;
+        for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+        {
+            // the counter cache is reached through CacheService and may hold keys of other tables:
+            // filter those out first so that only this table's keys are decorated
+            if (!key.ksAndCFName.equals(metadata.ksAndCFName))
+                continue;
+
+            DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
+            if (Bounds.isInBounds(dk.getToken(), boundsToInvalidate))
+            {
+                CacheService.instance.counterCache.remove(key);
+                invalidatedKeys++;
+            }
+        }
+        return invalidatedKeys;
+    }
+
     /**
      * @return true if @param key is contained in the row cache
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index f8ff163..35d0832 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -189,11 +189,6 @@ public class CompactionController implements AutoCloseable
         return min;
     }
 
-    public void invalidateCachedRow(DecoratedKey key)
-    {
-        cfs.invalidateCachedRow(key);
-    }
-
     public void close()
     {
         overlappingSSTables.release();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index 42eea77..5ffde42 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -17,8 +17,17 @@
  */
 package org.apache.cassandra.dht;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.service.StorageService;
@@ -108,6 +117,20 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
         return "]";
     }
 
+    /**
+     * Tells whether a position is contained in at least one of the supplied bounds.
+     *
+     * @param token the position to look for
+     * @param bounds the bounds to search; must not be null (checked by assertion)
+     * @return true as soon as one bounds contains {@code token}, false if none does
+     */
+    public static <T extends RingPosition<T>> boolean isInBounds(T token, Iterable<Bounds<T>> bounds)
+    {
+        assert bounds != null;
+
+        boolean contained = false;
+        for (Bounds<T> candidate : bounds)
+        {
+            contained = candidate.contains(token);
+            if (contained)
+                break;
+        }
+        return contained;
+    }
+
     /**
      * Compute a bounds of keys corresponding to a given bounds of token.
      */
@@ -132,4 +155,43 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
     {
         return new Bounds<T>(left, newRight);
     }
+
+    /**
+     * Merges the input bounds into a set of non-overlapping bounds covering the same positions.
+     * Bounds that overlap, that share an endpoint, or that are nested inside a wider bounds
+     * are collapsed into a single bounds.
+     *
+     * Assume we have the following bounds
+     * (brackets representing left/right bound):
+     * [   ] [   ]    [   ]   [  ]
+     * [   ]         [       ]
+     * This method will return the following bounds:
+     * [         ]    [          ]
+     *
+     * @param bounds unsorted bounds to merge; the input is copied, not modified
+     * @return the non-overlapping bounds
+     */
+    public static <T extends RingPosition<T>> Set<Bounds<T>> getNonOverlappingBounds(Iterable<Bounds<T>> bounds)
+    {
+        // sort a copy by left end so that every bounds that can extend the current merge comes next
+        ArrayList<Bounds<T>> sortedBounds = Lists.newArrayList(bounds);
+        Collections.sort(sortedBounds, new Comparator<Bounds<T>>()
+        {
+            public int compare(Bounds<T> o1, Bounds<T> o2)
+            {
+                return o1.left.compareTo(o2.left);
+            }
+        });
+
+        Set<Bounds<T>> nonOverlappingBounds = Sets.newHashSet();
+
+        PeekingIterator<Bounds<T>> it = Iterators.peekingIterator(sortedBounds.iterator());
+        while (it.hasNext())
+        {
+            Bounds<T> beginBound = it.next();
+            // Track the furthest right end seen so far rather than the right end of the last
+            // bounds consumed: a bounds nested inside an earlier, wider one (e.g. [0,10] followed
+            // by [1,5]) must neither shrink the merged bounds nor end the merge too early.
+            T mergedRight = beginBound.right;
+            while (it.hasNext() && mergedRight.compareTo(it.peek().left) >= 0)
+            {
+                T candidateRight = it.next().right;
+                if (candidateRight.compareTo(mergedRight) > 0)
+                    mergedRight = candidateRight;
+            }
+            nonOverlappingBounds.add(new Bounds<>(beginBound.left, mergedRight));
+        }
+
+        return nonOverlappingBounds;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 82492a8..af5d1d3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.utils.CLibrary;
-import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.utils.Throwables.merge;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 5389a80..18013fe 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -166,6 +166,5 @@ public class StreamReader
     {
         DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
         writer.appendFromStream(key, cfs.metadata, in, inputVersion);
-        cfs.invalidateCachedRow(key);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index da2d7d6..738c93c 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -23,14 +23,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
@@ -47,6 +53,8 @@ public class StreamReceiveTask extends StreamTask
                                                                                                               FBUtilities.getAvailableProcessors(),
                                                                                                               60, TimeUnit.SECONDS);
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamReceiveTask.class);
+
     // number of files to receive
     private final int totalFiles;
     // total size of files to receive
@@ -79,6 +87,7 @@ public class StreamReceiveTask extends StreamTask
         assert cfId.equals(sstable.metadata.cfId);
 
         sstables.add(sstable);
+
         if (sstables.size() == totalFiles)
         {
             done = true;
@@ -134,6 +143,33 @@ public class StreamReceiveTask extends StreamTask
                 // add sstables and build secondary indexes
                 cfs.addSSTables(readers);
                 cfs.indexManager.maybeBuildSecondaryIndexes(readers, cfs.indexManager.allIndexesNames());
+
+                //invalidate row and counter cache
+                if (cfs.isRowCacheEnabled() || cfs.metadata.isCounter())
+                {
+                    List<Bounds<Token>> boundsToInvalidate = new ArrayList<>(readers.size());
+                    for (SSTableReader sstable : readers)
+                        boundsToInvalidate.add(new Bounds<Token>(sstable.first.getToken(), sstable.last.getToken()));
+                    Set<Bounds<Token>> nonOverlappingBounds = Bounds.getNonOverlappingBounds(boundsToInvalidate);
+
+                    if (cfs.isRowCacheEnabled())
+                    {
+                        int invalidatedKeys = cfs.invalidateRowCache(nonOverlappingBounds);
+                        if (invalidatedKeys > 0)
+                            logger.debug("[Stream #{}] Invalidated {} row cache entries on table {}.{} after stream " +
+                                         "receive task completed.", task.session.planId(), invalidatedKeys,
+                                         cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                    }
+
+                    if (cfs.metadata.isCounter())
+                    {
+                        int invalidatedKeys = cfs.invalidateCounterCache(nonOverlappingBounds);
+                        if (invalidatedKeys > 0)
+                            logger.debug("[Stream #{}] Invalidated {} counter cache entries on table {}.{} after stream " +
+                                        "receive task completed.", task.session.planId(), invalidatedKeys,
+                                        cfs.keyspace.getName(), cfs.getColumnFamilyName());
+                    }
+                }
             }
 
             task.session.taskCompleted(task);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index 20e067c..542358d 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.db;
 
+import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 
 import org.junit.AfterClass;
@@ -24,6 +25,8 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -71,6 +74,48 @@ public class CounterCacheTest extends SchemaLoader
     }
 
     @Test
+    public void testCounterCacheInvalidate()
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        CacheService.instance.invalidateCounterCache();
+
+        // nothing is cached to begin with
+        assertEquals(0, CacheService.instance.counterCache.size());
+        for (int partition = 1; partition <= 3; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                assertNull(cfs.getCachedCounter(bytes(partition), cellname(cell)));
+
+        // cache two cells for each of three partitions: clock = partition number, count = cell number
+        for (int partition = 1; partition <= 3; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                cfs.putCachedCounter(bytes(partition), cellname(cell), ClockAndCount.create((long) partition, (long) cell));
+
+        assertEquals(6, CacheService.instance.counterCache.size());
+        for (int partition = 1; partition <= 3; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                assertEquals(ClockAndCount.create((long) partition, (long) cell),
+                             cfs.getCachedCounter(bytes(partition), cellname(cell)));
+
+        // invalidate the bounds delimited by the tokens of partitions 1 and 2
+        Token leftToken = cfs.partitioner.decorateKey(bytes(1)).getToken();
+        Token rightToken = cfs.partitioner.decorateKey(bytes(2)).getToken();
+        cfs.invalidateCounterCache(Collections.singleton(new Bounds<Token>(leftToken, rightToken)));
+
+        // only the two cells of partition 3 are expected to survive
+        assertEquals(2, CacheService.instance.counterCache.size());
+        for (int partition = 1; partition <= 2; partition++)
+            for (int cell = 1; cell <= 2; cell++)
+                assertNull(cfs.getCachedCounter(bytes(partition), cellname(cell)));
+        for (int cell = 1; cell <= 2; cell++)
+            assertEquals(ClockAndCount.create(3L, (long) cell), cfs.getCachedCounter(bytes(3), cellname(cell)));
+    }
+
+    @Test
     public void testSaveLoad() throws ExecutionException, InterruptedException, WriteTimeoutException
     {
         ColumnFamilyStore cfs = Keyspace.open(KS).getColumnFamilyStore(CF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 6d4554d..719c771 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.db;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
+import java.util.TreeSet;
 
+import com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -32,13 +36,15 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class RowCacheTest extends SchemaLoader
 {
@@ -152,6 +158,51 @@ public class RowCacheTest extends SchemaLoader
     }
 
     @Test
+    public void testInvalidateRowCache() throws Exception
+    {
+        StorageService.instance.initServer(0);
+        CacheService.instance.setRowCacheCapacityInMB(1);
+        try
+        {
+            rowCacheLoad(100, Integer.MAX_VALUE, 1000);
+
+            ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
+            // expected value first, so that a failure message reports expected/actual the right way round
+            assertEquals(100, CacheService.instance.rowCache.getKeySet().size());
+
+            //construct 5 ranges of 20 elements each
+            ArrayList<Bounds<Token>> subranges = getBounds(20);
+
+            //invalidate 3 of the 5 ranges
+            ArrayList<Bounds<Token>> boundsToInvalidate = Lists.newArrayList(subranges.get(0), subranges.get(2), subranges.get(4));
+            int invalidatedKeys = store.invalidateRowCache(boundsToInvalidate);
+            assertEquals(60, invalidatedKeys);
+
+            //now there should be only 40 cached entries left
+            assertEquals(40, CacheService.instance.rowCache.getKeySet().size());
+        }
+        finally
+        {
+            // reset the capacity even when an assertion above fails, so the 1MB setting does not outlive this test
+            CacheService.instance.setRowCacheCapacityInMB(0);
+        }
+    }
+
+    /**
+     * Splits the keys currently in the row cache, ordered as decorated keys, into consecutive
+     * groups of {@code nElements} keys and returns one bounds per group, delimited by the tokens
+     * of the first and last key of the group. When the number of keys is not a multiple of
+     * {@code nElements}, the last bounds covers the remaining keys.
+     *
+     * @param nElements number of keys covered by each bounds
+     * @return the bounds, in key order
+     */
+    private ArrayList<Bounds<Token>> getBounds(int nElements)
+    {
+        ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
+        TreeSet<DecoratedKey> orderedKeys = new TreeSet<>();
+
+        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+            orderedKeys.add(store.partitioner.decorateKey(ByteBuffer.wrap(key.key)));
+
+        ArrayList<Bounds<Token>> boundsToInvalidate = new ArrayList<>();
+        Iterator<DecoratedKey> iterator = orderedKeys.iterator();
+
+        while (iterator.hasNext())
+        {
+            Token startRange = iterator.next().getToken();
+            Token endRange = startRange;
+            // advance to the last key of the group, stopping early if the keys run out
+            // instead of failing with NoSuchElementException
+            for (int i = 1; i < nElements && iterator.hasNext(); i++)
+                endRange = iterator.next().getToken();
+            boundsToInvalidate.add(new Bounds<>(startRange, endRange));
+        }
+        return boundsToInvalidate;
+    }
+
+    @Test
     public void testRowCachePartialLoad() throws Exception
     {
         CacheService.instance.setRowCacheCapacityInMB(1);
@@ -220,9 +271,9 @@ public class RowCacheTest extends SchemaLoader
 
         // populate row cache, we should not get a row cache hit;
         cachedStore.getColumnFamily(QueryFilter.getSliceFilter(dk, cf,
-                                                                Composites.EMPTY,
-                                                                Composites.EMPTY,
-                                                                false, 10, System.currentTimeMillis()));
+                                                               Composites.EMPTY,
+                                                               Composites.EMPTY,
+                                                               false, 10, System.currentTimeMillis()));
         assertEquals(startRowCacheHits, cachedStore.metric.rowCacheHit.count());
 
         // do another query, limit is 20, which is < 100 that we cache, we should get a hit and it should be in range
@@ -272,8 +323,6 @@ public class RowCacheTest extends SchemaLoader
     {
         CompactionManager.instance.disableAutoCompaction();
 
-        ColumnFamilyStore store = Keyspace.open(KEYSPACE).getColumnFamilyStore(COLUMN_FAMILY);
-
         // empty the cache
         CacheService.instance.invalidateRowCache();
         assert CacheService.instance.rowCache.size() == 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c3ff924/test/unit/org/apache/cassandra/dht/BoundsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BoundsTest.java b/test/unit/org/apache/cassandra/dht/BoundsTest.java
new file mode 100644
index 0000000..527b498
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/BoundsTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class BoundsTest
+{
+    /**
+     * Feeds unsorted bounds to {@link Bounds#getNonOverlappingBounds} and expects them merged as:
+     * [0,1],[0,5],[1,8],[4,10] = [0,10]
+     * [15,19],[19,20] = [15,20]
+     * [21,22] = [21,22]
+     */
+    @Test
+    public void testGetNonOverlappingBounds()
+    {
+        List<Bounds<Token>> unsorted = new LinkedList<>();
+        unsorted.add(bounds(19, 20));
+        unsorted.add(bounds(0, 1));
+        unsorted.add(bounds(4, 10));
+        unsorted.add(bounds(15, 19));
+        unsorted.add(bounds(0, 5));
+        unsorted.add(bounds(21, 22));
+        unsorted.add(bounds(1, 8));
+
+        Set<Bounds<Token>> merged = Bounds.getNonOverlappingBounds(unsorted);
+        assertEquals(3, merged.size());
+        assertTrue(merged.contains(bounds(0, 10)));
+        assertTrue(merged.contains(bounds(15, 20)));
+        assertTrue(merged.contains(bounds(21, 22)));
+    }
+
+    /**
+     * Builds a bounds whose ends are {@link LongToken}s wrapping the given values.
+     */
+    private Bounds<Token> bounds(long left, long right)
+    {
+        Token leftToken = new LongToken(left);
+        Token rightToken = new LongToken(right);
+        return new Bounds<Token>(leftToken, rightToken);
+    }
+}
+}