You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/12/02 09:05:37 UTC

[1/2] cassandra git commit: Optimize the way we check if a token is repaired in anticompaction

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 a320737b1 -> 9f19dd4e4


Optimize the way we check if a token is repaired in anticompaction

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-10768


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dbfeeac1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dbfeeac1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dbfeeac1

Branch: refs/heads/cassandra-2.2
Commit: dbfeeac177074692bdf71d98ffb2cacb14802fb3
Parents: 5ba69a3
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Nov 25 11:28:51 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 08:37:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  4 +-
 src/java/org/apache/cassandra/dht/Range.java    | 44 +++++++++++++
 .../org/apache/cassandra/dht/RangeTest.java     | 67 ++++++++++++++++++++
 4 files changed, 114 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3ce2da6..b0f9588 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)
  * Warn or fail when changing cluster topology live (CASSANDRA-10243)
  * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b0ad244..2630ba2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1115,11 +1115,11 @@ public class CompactionManager implements CompactionManagerMBean
                 metrics.beginCompaction(ci);
                 try
                 {
+                    Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
                     while (iter.hasNext())
                     {
                         AbstractCompactedRow row = iter.next();
-                        // if current range from sstable is repaired, save it into the new repaired sstable
-                        if (Range.isInRanges(row.key.getToken(), ranges))
+                        if (containmentChecker.contains(row.key.getToken()))
                         {
                             repairedSSTableWriter.append(row);
                             repairedKeyCount++;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 505f1f3..81c92a2 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.util.*;
 
 import org.apache.commons.lang3.ObjectUtils;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.Pair;
@@ -491,4 +493,46 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
     {
         return new Range<T>(left, newRight);
     }
+
+    /**
+     * Helper class to check if a token is contained within a given collection of ranges
+     */
+    public static class OrderedRangeContainmentChecker
+    {
+        private final Iterator<Range<Token>> normalizedRangesIterator;
+        private Token lastToken = null;
+        private Range<Token> currentRange;
+
+        public OrderedRangeContainmentChecker(Collection<Range<Token>> ranges)
+        {
+            normalizedRangesIterator = normalize(ranges).iterator();
+            assert normalizedRangesIterator.hasNext();
+            currentRange = normalizedRangesIterator.next();
+        }
+
+        /**
+         * Returns true if the ranges given in the constructor contains the token, false otherwise.
+         *
+         * The tokens passed to this method must be in increasing order
+         *
+         * @param t token to check, must be larger than or equal to the last token passed
+         * @return true if the token is contained within the ranges given to the constructor.
+         */
+        public boolean contains(Token t)
+        {
+            assert lastToken == null || lastToken.compareTo(t) <= 0;
+            lastToken = t;
+            while (true)
+            {
+                if (t.compareTo(currentRange.left) <= 0)
+                    return false;
+                else if (t.compareTo(currentRange.right) <= 0 || currentRange.right.compareTo(currentRange.left) <= 0)
+                    return true;
+
+                if (!normalizedRangesIterator.hasNext())
+                    return false;
+                currentRange = normalizedRangesIterator.next();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dbfeeac1/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java
index 906396c..1d8123b 100644
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@ -19,9 +19,15 @@
 package org.apache.cassandra.dht;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
+
+import com.google.common.base.Joiner;
+
 import static java.util.Arrays.asList;
 
 import org.apache.commons.lang3.StringUtils;
@@ -29,6 +35,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.RowPosition;
 import static org.apache.cassandra.Util.range;
+import static org.junit.Assert.*;
 
 
 public class RangeTest
@@ -536,4 +543,64 @@ public class RangeTest
         expected = asList(range("", ""));
         assertNormalize(input, expected);
     }
+
+    @Test
+    public void testRandomOrderedRangeContainmentChecker()
+    {
+        Random r = new Random();
+        for (int j = 0; j < 1000; j++)
+        {
+            int numTokens = r.nextInt(300) + 1;
+            List<Range<Token>> ranges = new ArrayList<>(numTokens);
+            List<Token> tokens = new ArrayList<>(2 * numTokens);
+            for (int i = 0; i < 2 * numTokens; i++)
+                tokens.add(t(r.nextLong()));
+
+            Collections.sort(tokens);
+
+            for (int i = 0; i < tokens.size(); i++)
+            {
+                ranges.add(new Range<>(tokens.get(i), tokens.get(i + 1)));
+                i++;
+            }
+
+            List<Token> tokensToTest = new ArrayList<>();
+            for (int i = 0; i < 10000; i++)
+                tokensToTest.add(t(r.nextLong()));
+
+            tokensToTest.add(t(Long.MAX_VALUE));
+            tokensToTest.add(t(Long.MIN_VALUE));
+            tokensToTest.add(t(Long.MAX_VALUE - 1));
+            tokensToTest.add(t(Long.MIN_VALUE + 1));
+            Collections.sort(tokensToTest);
+
+            Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
+            for (Token t : tokensToTest)
+            {
+                if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration
+                    fail(String.format("This should never flap! If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t));
+            }
+        }
+    }
+
+    @Test
+    public void testBoundariesORCC()
+    {
+        List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE));
+        Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
+        assertFalse(checker.contains(t(Long.MIN_VALUE)));
+        assertTrue(checker.contains(t(Long.MIN_VALUE + 1)));
+        assertFalse(checker.contains(t(0)));
+        assertFalse(checker.contains(t(Long.MAX_VALUE - 1)));
+        assertTrue(checker.contains(t(Long.MAX_VALUE)));
+    }
+
+    private static Range<Token> r(long left, long right)
+    {
+        return new Range<>(t(left), t(right));
+    }
+    private static Token t(long t)
+    {
+        return new LongToken(t);
+    }
 }


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f19dd4e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f19dd4e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f19dd4e

Branch: refs/heads/cassandra-2.2
Commit: 9f19dd4e4e4f4adc948b36a3fd38077cbc691617
Parents: a320737 dbfeeac
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 2 08:53:33 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Dec 2 08:53:33 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        |  3 +-
 src/java/org/apache/cassandra/dht/Range.java    | 44 +++++++++++++
 .../org/apache/cassandra/dht/RangeTest.java     | 66 ++++++++++++++++++++
 4 files changed, 113 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index cf73f57,b0f9588..eaad3a2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,5 +1,24 @@@
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592)
 +Merged from 2.1:
+  * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
   * Add proper error handling to stream receiver (CASSANDRA-10774)
   * Warn or fail when changing cluster topology live (CASSANDRA-10243)
   * Status command in debian/ubuntu init script doesn't work (CASSANDRA-10213)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f3a69a6,2630ba2..65f93c0
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1192,90 -1092,68 +1192,91 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                  logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }
  
 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +        if (anticompactionGroup.originals().size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }
  
 -            File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
 -            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false, false);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = anticompactionGroup.originals();
 +
 +        File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
 +        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 +        try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
 +             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, anticompactionGroup, groupMaxDataAge, false, false);
 +             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup.originals());
 +             CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
 +        {
 +            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
  
 -            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs)))
 -            {
 -                int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)sstable.estimatedKeys());
 -                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
 -                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 +            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
  
 -                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                metrics.beginCompaction(ci);
 -                try
 +            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
 +            metrics.beginCompaction(ci);
 +            try
 +            {
 +                @SuppressWarnings("resource")
 +                CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
++                Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
 +                while (iter.hasNext())
                  {
 -                    Range.OrderedRangeContainmentChecker containmentChecker = new Range.OrderedRangeContainmentChecker(ranges);
 -                    while (iter.hasNext())
 +                    @SuppressWarnings("resource")
 +                    AbstractCompactedRow row = iter.next();
 +                    // if current range from sstable is repaired, save it into the new repaired sstable
-                     if (Range.isInRanges(row.key.getToken(), ranges))
++                    if (containmentChecker.contains(row.key.getToken()))
                      {
 -                        AbstractCompactedRow row = iter.next();
 -                        if (containmentChecker.contains(row.key.getToken()))
 -                        {
 -                            repairedSSTableWriter.append(row);
 -                            repairedKeyCount++;
 -                        }
 -                        // otherwise save into the new 'non-repaired' table
 -                        else
 -                        {
 -                            unRepairedSSTableWriter.append(row);
 -                            unrepairedKeyCount++;
 -                        }
 +                        repairedSSTableWriter.append(row);
 +                        repairedKeyCount++;
 +                    }
 +                    // otherwise save into the new 'non-repaired' table
 +                    else
 +                    {
 +                        unRepairedSSTableWriter.append(row);
 +                        unrepairedKeyCount++;
                      }
                  }
 -                finally
 -                {
 -                    metrics.finishCompaction(ci);
 -                }
 -                anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
 -                anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
 -                cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
              }
 -            catch (Throwable e)
 +            finally
              {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
 +                metrics.finishCompaction(ci);
              }
 -        }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
  
 -        return anticompactedSSTables;
 +            List<SSTableReader> anticompactedSSTables = new ArrayList<>();
 +            // since both writers are operating over the same Transaction, we cannot use the convenience Transactional.finish() method,
 +            // as on the second finish() we would prepareToCommit() on a Transaction that has already been committed, which is forbidden by the API
 +            // (since it indicates misuse). We call permitRedundantTransitions so that calls that transition to a state already occupied are permitted.
 +            anticompactionGroup.permitRedundantTransitions();
 +            repairedSSTableWriter.setRepairedAt(repairedAt).prepareToCommit();
 +            unRepairedSSTableWriter.prepareToCommit();
 +            anticompactedSSTables.addAll(repairedSSTableWriter.finished());
 +            anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
 +            repairedSSTableWriter.commit();
 +            unRepairedSSTableWriter.commit();
 +
 +            logger.trace("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
 +                                                                       repairedKeyCount + unrepairedKeyCount,
 +                                                                       cfs.keyspace.getName(),
 +                                                                       cfs.getColumnFamilyName(),
 +                                                                       anticompactionGroup);
 +            return anticompactedSSTables.size();
 +        }
 +        catch (Throwable e)
 +        {
 +            JVMStabilityInspector.inspectThrowable(e);
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +        }
 +        return 0;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/Range.java
index cbf093c,81c92a2..9893531
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@@ -21,7 -21,10 +21,9 @@@ import java.io.Serializable
  import java.util.*;
  
  import org.apache.commons.lang3.ObjectUtils;
+ 
+ import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.RowPosition;
 -import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.utils.Pair;
  
  /**
@@@ -465,13 -472,67 +467,55 @@@ public class Range<T extends RingPositi
      /**
       * Compute a range of keys corresponding to a given range of token.
       */
 -    public static Range<RowPosition> makeRowRange(Token left, Token right, IPartitioner partitioner)
 +    public static Range<RowPosition> makeRowRange(Token left, Token right)
      {
 -        return new Range<RowPosition>(left.maxKeyBound(partitioner), right.maxKeyBound(partitioner), partitioner);
 +        return new Range<RowPosition>(left.maxKeyBound(), right.maxKeyBound());
      }
  
 -    @SuppressWarnings("unchecked")
 -    public AbstractBounds<RowPosition> toRowBounds()
 +    public static Range<RowPosition> makeRowRange(Range<Token> tokenBounds)
      {
 -        return (left instanceof Token) ? makeRowRange((Token)left, (Token)right, partitioner) : (Range<RowPosition>)this;
 -    }
 -
 -    @SuppressWarnings("unchecked")
 -    public AbstractBounds<Token> toTokenBounds()
 -    {
 -        return (left instanceof RowPosition) ? new Range<Token>(((RowPosition)left).getToken(), ((RowPosition)right).getToken(), partitioner) : (Range<Token>)this;
 -    }
 -
 -    public AbstractBounds<T> withNewRight(T newRight)
 -    {
 -        return new Range<T>(left, newRight);
 +        return makeRowRange(tokenBounds.left, tokenBounds.right);
      }
+ 
+     /**
+      * Helper class to check if a token is contained within a given collection of ranges
+      */
+     public static class OrderedRangeContainmentChecker
+     {
+         private final Iterator<Range<Token>> normalizedRangesIterator;
+         private Token lastToken = null;
+         private Range<Token> currentRange;
+ 
+         public OrderedRangeContainmentChecker(Collection<Range<Token>> ranges)
+         {
+             normalizedRangesIterator = normalize(ranges).iterator();
+             assert normalizedRangesIterator.hasNext();
+             currentRange = normalizedRangesIterator.next();
+         }
+ 
+         /**
+          * Returns true if the ranges given in the constructor contains the token, false otherwise.
+          *
+          * The tokens passed to this method must be in increasing order
+          *
+          * @param t token to check, must be larger than or equal to the last token passed
+          * @return true if the token is contained within the ranges given to the constructor.
+          */
+         public boolean contains(Token t)
+         {
+             assert lastToken == null || lastToken.compareTo(t) <= 0;
+             lastToken = t;
+             while (true)
+             {
+                 if (t.compareTo(currentRange.left) <= 0)
+                     return false;
+                 else if (t.compareTo(currentRange.right) <= 0 || currentRange.right.compareTo(currentRange.left) <= 0)
+                     return true;
+ 
+                 if (!normalizedRangesIterator.hasNext())
+                     return false;
+                 currentRange = normalizedRangesIterator.next();
+             }
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f19dd4e/test/unit/org/apache/cassandra/dht/RangeTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/dht/RangeTest.java
index d93356a,1d8123b..85f2586
--- a/test/unit/org/apache/cassandra/dht/RangeTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeTest.java
@@@ -27,11 -32,10 +32,12 @@@ import static java.util.Arrays.asList
  
  import org.apache.commons.lang3.StringUtils;
  import org.junit.Test;
 -
  import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
 +
  import static org.apache.cassandra.Util.range;
+ import static org.junit.Assert.*;
  
  
  public class RangeTest
@@@ -540,4 -543,64 +546,64 @@@
          expected = asList(range("", ""));
          assertNormalize(input, expected);
      }
+ 
+     @Test
+     public void testRandomOrderedRangeContainmentChecker()
+     {
+         Random r = new Random();
+         for (int j = 0; j < 1000; j++)
+         {
+             int numTokens = r.nextInt(300) + 1;
+             List<Range<Token>> ranges = new ArrayList<>(numTokens);
+             List<Token> tokens = new ArrayList<>(2 * numTokens);
+             for (int i = 0; i < 2 * numTokens; i++)
+                 tokens.add(t(r.nextLong()));
+ 
+             Collections.sort(tokens);
+ 
+             for (int i = 0; i < tokens.size(); i++)
+             {
+                 ranges.add(new Range<>(tokens.get(i), tokens.get(i + 1)));
+                 i++;
+             }
+ 
+             List<Token> tokensToTest = new ArrayList<>();
+             for (int i = 0; i < 10000; i++)
+                 tokensToTest.add(t(r.nextLong()));
+ 
+             tokensToTest.add(t(Long.MAX_VALUE));
+             tokensToTest.add(t(Long.MIN_VALUE));
+             tokensToTest.add(t(Long.MAX_VALUE - 1));
+             tokensToTest.add(t(Long.MIN_VALUE + 1));
+             Collections.sort(tokensToTest);
+ 
+             Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
+             for (Token t : tokensToTest)
+             {
+                 if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration
+                     fail(String.format("This should never flap! If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t));
+             }
+         }
+     }
+ 
+     @Test
+     public void testBoundariesORCC()
+     {
+         List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE));
+         Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges);
+         assertFalse(checker.contains(t(Long.MIN_VALUE)));
+         assertTrue(checker.contains(t(Long.MIN_VALUE + 1)));
+         assertFalse(checker.contains(t(0)));
+         assertFalse(checker.contains(t(Long.MAX_VALUE - 1)));
+         assertTrue(checker.contains(t(Long.MAX_VALUE)));
+     }
+ 
+     private static Range<Token> r(long left, long right)
+     {
+         return new Range<>(t(left), t(right));
+     }
+     private static Token t(long t)
+     {
 -        return new LongToken(t);
++        return new Murmur3Partitioner.LongToken(t);
+     }
  }