You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/14 20:27:21 UTC

[1/2] cassandra git commit: Improve assertions around some of the usage of AbstractBounds

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 507ed1484 -> 99e0c907e


Improve assertions around some of the usage of AbstractBounds

patch by Sylvain Lebresne and Ariel Weisberg; reviewed by Aleksey
Yeschenko for CASSANDRA-9462


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3aa7308e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3aa7308e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3aa7308e

Branch: refs/heads/cassandra-3.0
Commit: 3aa7308e8f86969158c8d919c3f77658ae7c4fc3
Parents: 10be826
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Aug 12 16:34:07 2015 -0400
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Aug 14 21:20:17 2015 +0300

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  | 18 +++++++++++---
 .../cassandra/db/SizeEstimatesRecorder.java     |  3 ++-
 .../org/apache/cassandra/db/lifecycle/View.java | 13 +++++++---
 .../apache/cassandra/dht/AbstractBounds.java    | 25 ++++++++++++++++++++
 src/java/org/apache/cassandra/dht/Bounds.java   |  2 +-
 .../apache/cassandra/dht/ExcludingBounds.java   |  2 +-
 .../cassandra/dht/IncludingExcludingBounds.java |  2 +-
 .../cassandra/streaming/StreamSession.java      | 19 ++++++++-------
 .../apache/cassandra/db/lifecycle/ViewTest.java |  7 +++---
 9 files changed, 70 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 482e3ee..8bda6b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1929,11 +1929,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
     {
+        assert !AbstractBounds.strictlyWrapsAround(rowBounds.left, rowBounds.right);
         return new Function<View, List<SSTableReader>>()
         {
             public List<SSTableReader> apply(View view)
             {
-                return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
+                // Note that View.sstablesInBounds always includes its bounds while rowBounds may not. This is ok, however,
+                // because the fact that we restrict the sstables returned by this function is an optimization in the first
+                // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
+                // *very* unlikely that an sstable is included *just* because we consider one of the bounds inclusively
+                // instead of exclusively, so the performance impact is negligible in practice.
+                return view.sstablesInBounds(rowBounds.left, rowBounds.right);
             }
         };
     }
@@ -1944,6 +1950,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
     {
+        assert AbstractBounds.noneStrictlyWrapsAround(rowBoundsCollection);
         return new Function<View, List<SSTableReader>>()
         {
             public List<SSTableReader> apply(View view)
@@ -1951,7 +1958,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 Set<SSTableReader> sstables = Sets.newHashSet();
                 for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
                 {
-                    for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                    // Note that View.sstablesInBounds always includes its bounds while rowBounds may not. This is ok, however,
+                    // because the fact that we restrict the sstables returned by this function is an optimization in the first
+                    // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
+                    // *very* unlikely that an sstable is included *just* because we consider one of the bounds inclusively
+                    // instead of exclusively, so the performance impact is negligible in practice.
+                    for (SSTableReader sstable : view.sstablesInBounds(rowBounds.left, rowBounds.right))
                     {
                         if (includeRepaired || !sstable.isRepaired())
                             sstables.add(sstable);
@@ -2335,7 +2347,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             if (!manifestFile.getParentFile().exists())
                 manifestFile.getParentFile().mkdirs();
-            
+
             try (PrintStream out = new PrintStream(manifestFile))
             {
                 final JSONObject manifestJSON = new JSONObject();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index c68109c..f054315 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -85,9 +85,10 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
     @SuppressWarnings("resource")
     private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
     {
+        List<Range<Token>> unwrappedRanges = Range.normalize(localRanges);
         // for each local primary range, estimate (crudely) mean partition size and partitions count.
         Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size());
-        for (Range<Token> range : localRanges)
+        for (Range<Token> range : unwrappedRanges)
         {
             // filter sstables that have partitions in this range.
             Refs<SSTableReader> refs = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 0d1100b..73ba131 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -126,12 +126,19 @@ public class View
         return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
     }
 
-    public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
+    /**
+      * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
+      * The interval formed by {@code left} and {@code right} shouldn't wrap.
+      */
+    public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right)
     {
+        assert !AbstractBounds.strictlyWrapsAround(left, right);
+
         if (intervalTree.isEmpty())
             return Collections.emptyList();
-        RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
-        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
+
+        RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
+        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree));
     }
 
     // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index b5ffc22..c33ffc0 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
 import java.io.DataInput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.cassandra.db.DecoratedKey;
@@ -71,6 +72,30 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
     public abstract boolean inclusiveLeft();
     public abstract boolean inclusiveRight();
 
+    /**
+     * Whether {@code left} and {@code right} form a wrapping interval, that is, whether unwrapping wouldn't be a no-op.
+     * <p>
+     * Note that the semantics are slightly different from {@link Range#isWrapAround()} in the sense that if both
+     * {@code left} and {@code right} are minimal (for the partitioner), this method returns false (doesn't wrap) while
+     * {@link Range#isWrapAround()} returns true (does wrap). This is confusing and we should fix it by
+     * refactoring/rewriting the whole AbstractBounds hierarchy with cleaner semantics, but we don't want to risk
+     * breaking something by changing {@link Range#isWrapAround()} in the meantime.
+     */
+    public static <T extends RingPosition<T>> boolean strictlyWrapsAround(T left, T right)
+    {
+        return !(left.compareTo(right) <= 0 || right.isMinimum());
+    }
+
+    public static <T extends RingPosition<T>> boolean noneStrictlyWrapsAround(Collection<AbstractBounds<T>> bounds)
+    {
+        for (AbstractBounds<T> b : bounds)
+        {
+            if (strictlyWrapsAround(b.left, b.right))
+                return false;
+        }
+        return true;
+    }
+
     @Override
     public int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index 4a5a701..9060bcf 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -32,7 +32,7 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
     {
         super(left, right);
         // unlike a Range, a Bounds may not wrap
-        assert left.compareTo(right) <= 0 || right.isMinimum() : "[" + left + "," + right + "]";
+        assert !strictlyWrapsAround(left, right) : "[" + left + "," + right + "]";
     }
 
     public boolean contains(T position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 0d37e5c..7319356 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -31,7 +31,7 @@ public class ExcludingBounds<T extends RingPosition<T>> extends AbstractBounds<T
     {
         super(left, right);
         // unlike a Range, an ExcludingBounds may not wrap, nor be empty
-        assert left.compareTo(right) < 0 || right.isMinimum() : "(" + left + "," + right + ")";
+        assert !strictlyWrapsAround(left, right) && (right.isMinimum() || left.compareTo(right) != 0) : "(" + left + "," + right + ")";
     }
 
     public boolean contains(T position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index e9e8e8e..abcf87b 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -32,7 +32,7 @@ public class IncludingExcludingBounds<T extends RingPosition<T>> extends Abstrac
         super(left, right);
         // unlike a Range, an IncludingExcludingBounds may not wrap, nor have
         // right == left unless the right is the min token
-        assert left.compareTo(right) < 0 || right.isMinimum() : "[" + left + "," + right + ")";
+        assert !strictlyWrapsAround(left, right) && (right.isMinimum() || left.compareTo(right) != 0) : "(" + left + "," + right + ")";
     }
 
     public boolean contains(T position)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 366bc33..55d7e68 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -24,8 +24,6 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.annotation.Nullable;
-
 import com.google.common.base.Function;
 import com.google.common.collect.*;
 
@@ -38,7 +36,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
@@ -316,9 +313,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
         {
             for (ColumnFamilyStore cfStore : stores)
             {
-                final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+                final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size());
                 for (Range<Token> range : ranges)
-                    rowBoundsList.add(Range.makeRowRange(range));
+                    keyRanges.add(Range.makeRowRange(range));
                 refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
                 {
                     public List<SSTableReader> apply(View view)
@@ -328,11 +325,17 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                             permittedInstances.put(reader, reader);
 
                         Set<SSTableReader> sstables = Sets.newHashSet();
-                        for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
+                        for (Range<RowPosition> keyRange : keyRanges)
                         {
-                            // sstableInBounds may contain early opened sstables
-                            for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+                            // keyRange excludes its start, while sstablesInBounds is inclusive (of both start and end).
+                            // This is fine, however, because keyRange has been created from a token range through Range.makeRowRange (see above).
+                            // That latter method uses the Token.maxKeyBound() method to create the range, which returns a "fake" key that
+                            // sorts after all keys having the token. That "fake" key cannot, however, be equal to any real key, so even
+                            // including keyRange.left will still exclude any key having the token of the original token range, and so we're
+                            // still actually selecting what we wanted.
+                            for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right))
                             {
+                                // sstablesInBounds may contain early opened sstables
                                 if (isIncremental && sstable.isRepaired())
                                     continue;
                                 sstable = permittedInstances.get(sstable);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 4c8006a..32a81e2 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -61,13 +61,14 @@ public class ViewTest
             {
                 RowPosition min = MockSchema.readerBounds(i);
                 RowPosition max = MockSchema.readerBounds(j);
-                for (boolean minInc : new boolean[] { true, false} )
+                for (boolean minInc : new boolean[] { true })//, false} )
                 {
-                    for (boolean maxInc : new boolean[] { true, false} )
+                    for (boolean maxInc : new boolean[] { true })//, false} )
                     {
                         if (i == j && !(minInc && maxInc))
                             continue;
-                        List<SSTableReader> r = initialView.sstablesInBounds(AbstractBounds.bounds(min, minInc, max, maxInc));
+                        AbstractBounds<RowPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc);
+                        List<SSTableReader> r = initialView.sstablesInBounds(bounds.left, bounds.right);
                         Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
                     }
                 }


[2/2] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/99e0c907
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/99e0c907
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/99e0c907

Branch: refs/heads/cassandra-3.0
Commit: 99e0c907eabc26f876f984daf33fdc2d3ab66a24
Parents: 507ed14 3aa7308
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Aug 14 21:27:08 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Aug 14 21:27:08 2015 +0300

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  2 +-
 .../cassandra/db/SizeEstimatesRecorder.java     |  3 ++-
 .../org/apache/cassandra/db/lifecycle/View.java | 20 ++++++++++++----
 .../apache/cassandra/dht/AbstractBounds.java    | 25 ++++++++++++++++++++
 src/java/org/apache/cassandra/dht/Bounds.java   |  2 +-
 .../apache/cassandra/dht/ExcludingBounds.java   |  2 +-
 .../cassandra/dht/IncludingExcludingBounds.java |  2 +-
 .../cassandra/streaming/StreamSession.java      | 16 +++++++++----
 .../apache/cassandra/db/lifecycle/ViewTest.java |  8 ++++---
 9 files changed, 63 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 797e2c7,8bda6b2..8d72ecf
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1179,7 -1394,7 +1179,7 @@@ public class ColumnFamilyStore implemen
          Set<SSTableReader> results = null;
          for (SSTableReader sstable : sstables)
          {
-             Set<SSTableReader> overlaps = ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, AbstractBounds.bounds(sstable.first, true, sstable.last, true)));
 -            Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last)));
++            Set<SSTableReader> overlaps = ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, sstable.first, sstable.last));
              results = results == null ? overlaps : Sets.union(results, overlaps).immutableCopy();
          }
          results = Sets.difference(results, ImmutableSet.copyOf(sstables));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index 75590fa,73ba131..7ee0fdf
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -176,41 -126,19 +176,53 @@@ public class Vie
          return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
      }
  
-     public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds)
+     /**
 -      * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
 -      * The interval formed by {@code left} and {@code right} shouldn't wrap.
 -      */
 -    public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right)
++     * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
++     * The interval formed by {@code left} and {@code right} shouldn't wrap.
++     */
++    public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, PartitionPosition left, PartitionPosition right)
      {
+         assert !AbstractBounds.strictlyWrapsAround(left, right);
+ 
          if (intervalTree.isEmpty())
              return Collections.emptyList();
-         PartitionPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
-         return select(sstableSet, intervalTree.search(Interval.create(rowBounds.left, stopInTree)));
+ 
 -        RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
 -        return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree));
++        PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
++        return select(sstableSet, intervalTree.search(Interval.create(left, stopInTree)));
 +    }
 +
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet)
 +    {
 +        return (view) -> view.sstables(sstableSet);
 +    }
 +
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter)
 +    {
 +        return (view) -> view.sstables(sstableSet, filter);
 +    }
 +
 +    /**
 +     * @return a ViewFragment containing the sstables and memtables that may need to be merged
 +     * for the given @param key, according to the interval tree
 +     */
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key)
 +    {
 +        assert sstableSet == SSTableSet.LIVE;
 +        return (view) -> view.intervalTree.search(key);
 +    }
 +
 +    /**
 +     * @return a ViewFragment containing the sstables and memtables that may need to be merged
 +     * for rows within @param rowBounds, inclusive, according to the interval tree.
 +     */
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds)
 +    {
-         return (view) -> view.sstablesInBounds(sstableSet, rowBounds);
++        // Note that View.sstablesInBounds always includes its bounds while rowBounds may not. This is ok, however,
++        // because the fact that we restrict the sstables returned by this function is an optimization in the first
++        // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
++        // *very* unlikely that an sstable is included *just* because we consider one of the bounds inclusively
++        // instead of exclusively, so the performance impact is negligible in practice.
++        return (view) -> view.sstablesInBounds(sstableSet, rowBounds.left, rowBounds.right);
      }
  
      // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 861528b,55d7e68..bb5be1e
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -35,8 -35,7 +35,7 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.PartitionPosition;
- import org.apache.cassandra.dht.AbstractBounds;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.*;
@@@ -320,23 -313,40 +319,30 @@@ public class StreamSession implements I
          {
              for (ColumnFamilyStore cfStore : stores)
              {
-                 final List<AbstractBounds<PartitionPosition>> rowBoundsList = new ArrayList<>(ranges.size());
 -                final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size());
++                final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
-                     rowBoundsList.add(Range.makeRowRange(range));
+                     keyRanges.add(Range.makeRowRange(range));
 -                refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
 -                {
 -                    public List<SSTableReader> apply(View view)
 +                refs.addAll(cfStore.selectAndReference(view -> {
 +                    Set<SSTableReader> sstables = Sets.newHashSet();
-                     for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsList)
++                    for (Range<PartitionPosition> keyRange : keyRanges)
                      {
-                         for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, rowBounds))
 -                        Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>();
 -                        for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
 -                            permittedInstances.put(reader, reader);
 -
 -                        Set<SSTableReader> sstables = Sets.newHashSet();
 -                        for (Range<RowPosition> keyRange : keyRanges)
++                        // keyRange excludes its start, while sstablesInBounds is inclusive (of both start and end).
++                        // This is fine, however, because keyRange has been created from a token range through Range.makeRowRange (see above).
++                        // That latter method uses the Token.maxKeyBound() method to create the range, which returns a "fake" key that
++                        // sorts after all keys having the token. That "fake" key cannot, however, be equal to any real key, so even
++                        // including keyRange.left will still exclude any key having the token of the original token range, and so we're
++                        // still actually selecting what we wanted.
++                        for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, keyRange.left, keyRange.right))
                          {
 -                            // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
 -                            // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above).
 -                            // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that
 -                            // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even
 -                            // including keyRange.left will still exclude any key having the token of the original token range, and so we're
 -                            // still actually selecting what we wanted.
 -                            for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right))
 -                            {
 -                                // sstableInBounds may contain early opened sstables
 -                                if (isIncremental && sstable.isRepaired())
 -                                    continue;
 -                                sstable = permittedInstances.get(sstable);
 -                                if (sstable != null)
 -                                    sstables.add(sstable);
 -                            }
++                            // sstablesInBounds may contain early opened sstables
 +                            if (!isIncremental || !sstable.isRepaired())
 +                                sstables.add(sstable);
                          }
 -
 -                        logger.debug("ViewFilter for {}/{} sstables", sstables.size(), view.sstables.size());
 -                        return ImmutableList.copyOf(sstables);
                      }
 +
 +                    if (logger.isDebugEnabled())
 +                        logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL)));
 +                    return sstables;
                  }).refs);
              }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 27d426a,32a81e2..40afa54
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@@ -61,15 -59,16 +61,17 @@@ public class ViewTes
          {
              for (int j = i ; j < 5 ; j++)
              {
 -                RowPosition min = MockSchema.readerBounds(i);
 -                RowPosition max = MockSchema.readerBounds(j);
 +                PartitionPosition min = MockSchema.readerBounds(i);
 +                PartitionPosition max = MockSchema.readerBounds(j);
-                 for (boolean minInc : new boolean[] { true, false} )
+                 for (boolean minInc : new boolean[] { true })//, false} )
                  {
-                     for (boolean maxInc : new boolean[] { true, false} )
+                     for (boolean maxInc : new boolean[] { true })//, false} )
                      {
                          if (i == j && !(minInc && maxInc))
                              continue;
-                         List<SSTableReader> r = ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE, AbstractBounds.bounds(min, minInc, max, maxInc)));
 -                        AbstractBounds<RowPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc);
 -                        List<SSTableReader> r = initialView.sstablesInBounds(bounds.left, bounds.right);
++
++                        AbstractBounds<PartitionPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc);
++                        List<SSTableReader> r = ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE,bounds.left, bounds.right));
                          Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
                      }
                  }