You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/08/14 20:27:21 UTC
[1/2] cassandra git commit: Improve assertions around some of the
usage of AbstractBounds
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 507ed1484 -> 99e0c907e
Improve assertions around some of the usage of AbstractBounds
patch by Sylvain Lebresne and Ariel Weisberg; reviewed by Aleksey
Yeschenko for CASSANDRA-9462
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3aa7308e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3aa7308e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3aa7308e
Branch: refs/heads/cassandra-3.0
Commit: 3aa7308e8f86969158c8d919c3f77658ae7c4fc3
Parents: 10be826
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Aug 12 16:34:07 2015 -0400
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Aug 14 21:20:17 2015 +0300
----------------------------------------------------------------------
.../apache/cassandra/db/ColumnFamilyStore.java | 18 +++++++++++---
.../cassandra/db/SizeEstimatesRecorder.java | 3 ++-
.../org/apache/cassandra/db/lifecycle/View.java | 13 +++++++---
.../apache/cassandra/dht/AbstractBounds.java | 25 ++++++++++++++++++++
src/java/org/apache/cassandra/dht/Bounds.java | 2 +-
.../apache/cassandra/dht/ExcludingBounds.java | 2 +-
.../cassandra/dht/IncludingExcludingBounds.java | 2 +-
.../cassandra/streaming/StreamSession.java | 19 ++++++++-------
.../apache/cassandra/db/lifecycle/ViewTest.java | 7 +++---
9 files changed, 70 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 482e3ee..8bda6b2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1929,11 +1929,17 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public Function<View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
{
+ assert !AbstractBounds.strictlyWrapsAround(rowBounds.left, rowBounds.right);
return new Function<View, List<SSTableReader>>()
{
public List<SSTableReader> apply(View view)
{
- return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
+ // Note that View.sstablesInBounds always includes it's bound while rowBounds may not. This is ok however
+ // because the fact we restrict the sstables returned by this function is an optimization in the first
+ // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
+ // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively
+ // instead of exclusively, so the performance impact is negligible in practice.
+ return view.sstablesInBounds(rowBounds.left, rowBounds.right);
}
};
}
@@ -1944,6 +1950,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
*/
public Function<View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection, final boolean includeRepaired)
{
+ assert AbstractBounds.noneStrictlyWrapsAround(rowBoundsCollection);
return new Function<View, List<SSTableReader>>()
{
public List<SSTableReader> apply(View view)
@@ -1951,7 +1958,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
Set<SSTableReader> sstables = Sets.newHashSet();
for (AbstractBounds<RowPosition> rowBounds : rowBoundsCollection)
{
- for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+ // Note that View.sstablesInBounds always includes it's bound while rowBounds may not. This is ok however
+ // because the fact we restrict the sstables returned by this function is an optimization in the first
+ // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
+ // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively
+ // instead of exclusively, so the performance impact is negligible in practice.
+ for (SSTableReader sstable : view.sstablesInBounds(rowBounds.left, rowBounds.right))
{
if (includeRepaired || !sstable.isRepaired())
sstables.add(sstable);
@@ -2335,7 +2347,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
if (!manifestFile.getParentFile().exists())
manifestFile.getParentFile().mkdirs();
-
+
try (PrintStream out = new PrintStream(manifestFile))
{
final JSONObject manifestJSON = new JSONObject();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index c68109c..f054315 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -85,9 +85,10 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
@SuppressWarnings("resource")
private void recordSizeEstimates(ColumnFamilyStore table, Collection<Range<Token>> localRanges)
{
+ List<Range<Token>> unwrappedRanges = Range.normalize(localRanges);
// for each local primary range, estimate (crudely) mean partition size and partitions count.
Map<Range<Token>, Pair<Long, Long>> estimates = new HashMap<>(localRanges.size());
- for (Range<Token> range : localRanges)
+ for (Range<Token> range : unwrappedRanges)
{
// filter sstables that have partitions in this range.
Refs<SSTableReader> refs = null;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 0d1100b..73ba131 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -126,12 +126,19 @@ public class View
return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
}
- public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds)
+ /**
+ * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
+ * The interval formed by {@code left} and {@code right} shouldn't wrap.
+ */
+ public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right)
{
+ assert !AbstractBounds.strictlyWrapsAround(left, right);
+
if (intervalTree.isEmpty())
return Collections.emptyList();
- RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
- return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree));
+
+ RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
+ return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree));
}
// METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index b5ffc22..c33ffc0 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
import java.io.DataInput;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Collection;
import java.util.List;
import org.apache.cassandra.db.DecoratedKey;
@@ -71,6 +72,30 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
public abstract boolean inclusiveLeft();
public abstract boolean inclusiveRight();
+ /**
+ * Whether {@code left} and {@code right} forms a wrapping interval, that is if unwrapping wouldn't be a no-op.
+ * <p>
+ * Note that the semantic is slightly different from {@link Range#isWrapAround()} in the sense that if both
+ * {@code right} are minimal (for the partitioner), this methods return false (doesn't wrap) while
+ * {@link Range#isWrapAround()} returns true (does wrap). This is confusing and we should fix it by
+ * refactoring/rewriting the whole AbstractBounds hierarchy with cleaner semantics, but we don't want to risk
+ * breaking something by changing {@link Range#isWrapAround()} in the meantime.
+ */
+ public static <T extends RingPosition<T>> boolean strictlyWrapsAround(T left, T right)
+ {
+ return !(left.compareTo(right) <= 0 || right.isMinimum());
+ }
+
+ public static <T extends RingPosition<T>> boolean noneStrictlyWrapsAround(Collection<AbstractBounds<T>> bounds)
+ {
+ for (AbstractBounds<T> b : bounds)
+ {
+ if (strictlyWrapsAround(b.left, b.right))
+ return false;
+ }
+ return true;
+ }
+
@Override
public int hashCode()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index 4a5a701..9060bcf 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -32,7 +32,7 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
{
super(left, right);
// unlike a Range, a Bounds may not wrap
- assert left.compareTo(right) <= 0 || right.isMinimum() : "[" + left + "," + right + "]";
+ assert !strictlyWrapsAround(left, right) : "[" + left + "," + right + "]";
}
public boolean contains(T position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 0d37e5c..7319356 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -31,7 +31,7 @@ public class ExcludingBounds<T extends RingPosition<T>> extends AbstractBounds<T
{
super(left, right);
// unlike a Range, an ExcludingBounds may not wrap, nor be empty
- assert left.compareTo(right) < 0 || right.isMinimum() : "(" + left + "," + right + ")";
+ assert !strictlyWrapsAround(left, right) && (right.isMinimum() || left.compareTo(right) != 0) : "(" + left + "," + right + ")";
}
public boolean contains(T position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index e9e8e8e..abcf87b 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -32,7 +32,7 @@ public class IncludingExcludingBounds<T extends RingPosition<T>> extends Abstrac
super(left, right);
// unlike a Range, an IncludingExcludingBounds may not wrap, nor have
// right == left unless the right is the min token
- assert left.compareTo(right) < 0 || right.isMinimum() : "[" + left + "," + right + ")";
+ assert !strictlyWrapsAround(left, right) && (right.isMinimum() || left.compareTo(right) != 0) : "(" + left + "," + right + ")";
}
public boolean contains(T position)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 366bc33..55d7e68 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -24,8 +24,6 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.annotation.Nullable;
-
import com.google.common.base.Function;
import com.google.common.collect.*;
@@ -38,7 +36,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.RowPosition;
-import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
@@ -316,9 +313,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
{
for (ColumnFamilyStore cfStore : stores)
{
- final List<AbstractBounds<RowPosition>> rowBoundsList = new ArrayList<>(ranges.size());
+ final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
- rowBoundsList.add(Range.makeRowRange(range));
+ keyRanges.add(Range.makeRowRange(range));
refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
{
public List<SSTableReader> apply(View view)
@@ -328,11 +325,17 @@ public class StreamSession implements IEndpointStateChangeSubscriber
permittedInstances.put(reader, reader);
Set<SSTableReader> sstables = Sets.newHashSet();
- for (AbstractBounds<RowPosition> rowBounds : rowBoundsList)
+ for (Range<RowPosition> keyRange : keyRanges)
{
- // sstableInBounds may contain early opened sstables
- for (SSTableReader sstable : view.sstablesInBounds(rowBounds))
+ // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
+ // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above).
+ // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that
+ // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even
+ // including keyRange.left will still exclude any key having the token of the original token range, and so we're
+ // still actually selecting what we wanted.
+ for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right))
{
+ // sstableInBounds may contain early opened sstables
if (isIncremental && sstable.isRepaired())
continue;
sstable = permittedInstances.get(sstable);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3aa7308e/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 4c8006a..32a81e2 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -61,13 +61,14 @@ public class ViewTest
{
RowPosition min = MockSchema.readerBounds(i);
RowPosition max = MockSchema.readerBounds(j);
- for (boolean minInc : new boolean[] { true, false} )
+ for (boolean minInc : new boolean[] { true })//, false} )
{
- for (boolean maxInc : new boolean[] { true, false} )
+ for (boolean maxInc : new boolean[] { true })//, false} )
{
if (i == j && !(minInc && maxInc))
continue;
- List<SSTableReader> r = initialView.sstablesInBounds(AbstractBounds.bounds(min, minInc, max, maxInc));
+ AbstractBounds<RowPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc);
+ List<SSTableReader> r = initialView.sstablesInBounds(bounds.left, bounds.right);
Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
}
}
[2/2] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
Posted by al...@apache.org.
Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/99e0c907
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/99e0c907
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/99e0c907
Branch: refs/heads/cassandra-3.0
Commit: 99e0c907eabc26f876f984daf33fdc2d3ab66a24
Parents: 507ed14 3aa7308
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Fri Aug 14 21:27:08 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Fri Aug 14 21:27:08 2015 +0300
----------------------------------------------------------------------
.../apache/cassandra/db/ColumnFamilyStore.java | 2 +-
.../cassandra/db/SizeEstimatesRecorder.java | 3 ++-
.../org/apache/cassandra/db/lifecycle/View.java | 20 ++++++++++++----
.../apache/cassandra/dht/AbstractBounds.java | 25 ++++++++++++++++++++
src/java/org/apache/cassandra/dht/Bounds.java | 2 +-
.../apache/cassandra/dht/ExcludingBounds.java | 2 +-
.../cassandra/dht/IncludingExcludingBounds.java | 2 +-
.../cassandra/streaming/StreamSession.java | 16 +++++++++----
.../apache/cassandra/db/lifecycle/ViewTest.java | 8 ++++---
9 files changed, 63 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 797e2c7,8bda6b2..8d72ecf
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1179,7 -1394,7 +1179,7 @@@ public class ColumnFamilyStore implemen
Set<SSTableReader> results = null;
for (SSTableReader sstable : sstables)
{
- Set<SSTableReader> overlaps = ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, AbstractBounds.bounds(sstable.first, true, sstable.last, true)));
- Set<SSTableReader> overlaps = ImmutableSet.copyOf(tree.search(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last)));
++ Set<SSTableReader> overlaps = ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, sstable.first, sstable.last));
results = results == null ? overlaps : Sets.union(results, overlaps).immutableCopy();
}
results = Sets.difference(results, ImmutableSet.copyOf(sstables));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index 75590fa,73ba131..7ee0fdf
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -176,41 -126,19 +176,53 @@@ public class Vie
return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
}
- public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds)
+ /**
- * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
- * The interval formed by {@code left} and {@code right} shouldn't wrap.
- */
- public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition right)
++ * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
++ * The interval formed by {@code left} and {@code right} shouldn't wrap.
++ */
++ public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, PartitionPosition left, PartitionPosition right)
{
+ assert !AbstractBounds.strictlyWrapsAround(left, right);
+
if (intervalTree.isEmpty())
return Collections.emptyList();
- PartitionPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right;
- return select(sstableSet, intervalTree.search(Interval.create(rowBounds.left, stopInTree)));
+
- RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
- return intervalTree.search(Interval.<RowPosition, SSTableReader>create(left, stopInTree));
++ PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
++ return select(sstableSet, intervalTree.search(Interval.create(left, stopInTree)));
+ }
+
+ public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet)
+ {
+ return (view) -> view.sstables(sstableSet);
+ }
+
+ public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, Predicate<SSTableReader> filter)
+ {
+ return (view) -> view.sstables(sstableSet, filter);
+ }
+
+ /**
+ * @return a ViewFragment containing the sstables and memtables that may need to be merged
+ * for the given @param key, according to the interval tree
+ */
+ public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, DecoratedKey key)
+ {
+ assert sstableSet == SSTableSet.LIVE;
+ return (view) -> view.intervalTree.search(key);
+ }
+
+ /**
+ * @return a ViewFragment containing the sstables and memtables that may need to be merged
+ * for rows within @param rowBounds, inclusive, according to the interval tree.
+ */
+ public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds)
+ {
- return (view) -> view.sstablesInBounds(sstableSet, rowBounds);
++ // Note that View.sstablesInBounds always includes it's bound while rowBounds may not. This is ok however
++ // because the fact we restrict the sstables returned by this function is an optimization in the first
++ // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
++ // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively
++ // instead of exclusively, so the performance impact is negligible in practice.
++ return (view) -> view.sstablesInBounds(sstableSet, rowBounds.left, rowBounds.right);
}
// METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 861528b,55d7e68..bb5be1e
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -35,8 -35,7 +35,7 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
- import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
@@@ -320,23 -313,40 +319,30 @@@ public class StreamSession implements I
{
for (ColumnFamilyStore cfStore : stores)
{
- final List<AbstractBounds<PartitionPosition>> rowBoundsList = new ArrayList<>(ranges.size());
- final List<Range<RowPosition>> keyRanges = new ArrayList<>(ranges.size());
++ final List<Range<PartitionPosition>> keyRanges = new ArrayList<>(ranges.size());
for (Range<Token> range : ranges)
- rowBoundsList.add(Range.makeRowRange(range));
+ keyRanges.add(Range.makeRowRange(range));
- refs.addAll(cfStore.selectAndReference(new Function<View, List<SSTableReader>>()
- {
- public List<SSTableReader> apply(View view)
+ refs.addAll(cfStore.selectAndReference(view -> {
+ Set<SSTableReader> sstables = Sets.newHashSet();
- for (AbstractBounds<PartitionPosition> rowBounds : rowBoundsList)
++ for (Range<PartitionPosition> keyRange : keyRanges)
{
- for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, rowBounds))
- Map<SSTableReader, SSTableReader> permittedInstances = new HashMap<>();
- for (SSTableReader reader : ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
- permittedInstances.put(reader, reader);
-
- Set<SSTableReader> sstables = Sets.newHashSet();
- for (Range<RowPosition> keyRange : keyRanges)
++ // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
++ // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above).
++ // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that
++ // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even
++ // including keyRange.left will still exclude any key having the token of the original token range, and so we're
++ // still actually selecting what we wanted.
++ for (SSTableReader sstable : view.sstablesInBounds(SSTableSet.CANONICAL, keyRange.left, keyRange.right))
{
- // keyRange excludes its start, while sstableInBounds is inclusive (of both start and end).
- // This is fine however, because keyRange has been created from a token range through Range.makeRowRange (see above).
- // And that later method uses the Token.maxKeyBound() method to creates the range, which return a "fake" key that
- // sort after all keys having the token. That "fake" key cannot however be equal to any real key, so that even
- // including keyRange.left will still exclude any key having the token of the original token range, and so we're
- // still actually selecting what we wanted.
- for (SSTableReader sstable : view.sstablesInBounds(keyRange.left, keyRange.right))
- {
- // sstableInBounds may contain early opened sstables
- if (isIncremental && sstable.isRepaired())
- continue;
- sstable = permittedInstances.get(sstable);
- if (sstable != null)
- sstables.add(sstable);
- }
++ // sstableInBounds may contain early opened sstables
+ if (!isIncremental || !sstable.isRepaired())
+ sstables.add(sstable);
}
-
- logger.debug("ViewFilter for {}/{} sstables", sstables.size(), view.sstables.size());
- return ImmutableList.copyOf(sstables);
}
+
+ if (logger.isDebugEnabled())
+ logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL)));
+ return sstables;
}).refs);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 27d426a,32a81e2..40afa54
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@@ -61,15 -59,16 +61,17 @@@ public class ViewTes
{
for (int j = i ; j < 5 ; j++)
{
- RowPosition min = MockSchema.readerBounds(i);
- RowPosition max = MockSchema.readerBounds(j);
+ PartitionPosition min = MockSchema.readerBounds(i);
+ PartitionPosition max = MockSchema.readerBounds(j);
- for (boolean minInc : new boolean[] { true, false} )
+ for (boolean minInc : new boolean[] { true })//, false} )
{
- for (boolean maxInc : new boolean[] { true, false} )
+ for (boolean maxInc : new boolean[] { true })//, false} )
{
if (i == j && !(minInc && maxInc))
continue;
- List<SSTableReader> r = ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE, AbstractBounds.bounds(min, minInc, max, maxInc)));
- AbstractBounds<RowPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc);
- List<SSTableReader> r = initialView.sstablesInBounds(bounds.left, bounds.right);
++
++ AbstractBounds<PartitionPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc);
++ List<SSTableReader> r = ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE,bounds.left, bounds.right));
Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
}
}