You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2016/07/18 15:43:19 UTC

[04/50] cassandra git commit: Always select the live sstables when getting sstables in bounds

Always select the live sstables when getting sstables in bounds

Patch by marcuse; reviewed by benedict for CASSANDRA-11944


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b0566a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b0566a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b0566a7

Branch: refs/heads/cassandra-3.8
Commit: 5b0566a70f373d6eb537c89c0db2a2e224706916
Parents: 2217695
Author: Marcus Eriksson <ma...@apache.org>
Authored: Thu Jun 2 09:37:06 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jul 6 07:52:28 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                               |  1 +
 src/java/org/apache/cassandra/db/ColumnFamilyStore.java   |  8 ++++----
 .../apache/cassandra/db/PartitionRangeReadCommand.java    |  2 +-
 .../org/apache/cassandra/db/SizeEstimatesRecorder.java    |  5 +++--
 .../db/compaction/AbstractCompactionStrategy.java         |  2 +-
 .../cassandra/db/compaction/CompactionController.java     |  2 +-
 .../db/compaction/DateTieredCompactionStrategy.java       |  2 +-
 .../db/compaction/TimeWindowCompactionStrategy.java       |  2 +-
 src/java/org/apache/cassandra/db/lifecycle/View.java      | 10 +++++-----
 .../org/apache/cassandra/streaming/StreamSession.java     |  8 +++++++-
 test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java |  2 +-
 11 files changed, 26 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7f8a3a1..99ac3ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.9
+ * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
  * Fix column ordering of results with static columns for Thrift requests in
    a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
    those static columns in query results (CASSANDRA-12123)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 3264327..1be3175 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1231,7 +1231,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @return sstables whose key range overlaps with that of the given sstables, not including itself.
      * (The given sstables may or may not overlap with each other.)
      */
-    public Collection<SSTableReader> getOverlappingSSTables(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
+    public Collection<SSTableReader> getOverlappingLiveSSTables(Iterable<SSTableReader> sstables)
     {
         logger.trace("Checking for sstables overlapping {}", sstables);
 
@@ -1282,7 +1282,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         Set<SSTableReader> results = new HashSet<>();
 
         for (AbstractBounds<PartitionPosition> bound : bounds)
-            Iterables.addAll(results, view.sstablesInBounds(sstableSet, bound.left, bound.right));
+            Iterables.addAll(results, view.liveSSTablesInBounds(bound.left, bound.right));
 
         return Sets.difference(results, ImmutableSet.copyOf(sstables));
     }
@@ -1290,11 +1290,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /**
      * like getOverlappingSSTables, but acquires references before returning
      */
-    public Refs<SSTableReader> getAndReferenceOverlappingSSTables(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
+    public Refs<SSTableReader> getAndReferenceOverlappingLiveSSTables(Iterable<SSTableReader> sstables)
     {
         while (true)
         {
-            Iterable<SSTableReader> overlapped = getOverlappingSSTables(sstableSet, sstables);
+            Iterable<SSTableReader> overlapped = getOverlappingLiveSSTables(sstables);
             Refs<SSTableReader> refs = Refs.tryRef(overlapped);
             if (refs != null)
                 return refs;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 9585b59..842ad5f 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -175,7 +175,7 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
     {
-        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, dataRange().keyRange()));
+        ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
 
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
index 2a74ea9..3461aef 100644
--- a/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
+++ b/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
@@ -103,8 +103,9 @@ public class SizeEstimatesRecorder extends MigrationListener implements Runnable
             {
                 while (refs == null)
                 {
-                    ColumnFamilyStore.ViewFragment view = table.select(View.select(SSTableSet.CANONICAL, Range.makeRowRange(range)));
-                    refs = Refs.tryRef(view.sstables);
+                    // note that this is not guaranteed to return all sstables within the ranges, but for an estimated size, that should be fine
+                    Iterable<SSTableReader> canonicalSSTables = table.getTracker().getView().select(SSTableSet.CANONICAL, table.select(View.selectLive(Range.makeRowRange(range))).sstables);
+                    refs = Refs.tryRef(canonicalSSTables);
                 }
 
                 // calculate the estimates.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index e30e4f7..0dce52b 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -380,7 +380,7 @@ public abstract class AbstractCompactionStrategy
         if (uncheckedTombstoneCompaction)
             return true;
 
-        Collection<SSTableReader> overlaps = cfs.getOverlappingSSTables(SSTableSet.CANONICAL, Collections.singleton(sstable));
+        Collection<SSTableReader> overlaps = cfs.getOverlappingLiveSSTables(Collections.singleton(sstable));
         if (overlaps.isEmpty())
         {
             // there is no overlap, tombstones are safely droppable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index a5b8308..fbf29e3 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -103,7 +103,7 @@ public class CompactionController implements AutoCloseable
         if (compacting == null)
             overlappingSSTables = Refs.tryRef(Collections.<SSTableReader>emptyList());
         else
-            overlappingSSTables = cfs.getAndReferenceOverlappingSSTables(SSTableSet.LIVE, compacting);
+            overlappingSSTables = cfs.getAndReferenceOverlappingLiveSSTables(compacting);
         this.overlapIterator = new OverlapIterator<>(buildIntervals(overlappingSSTables));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 8571906..3e6ae61 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -99,7 +99,7 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
         if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
         {
             // Find fully expired SSTables. Those will be included no matter what.
-            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
+            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
             lastExpiredCheck = System.currentTimeMillis();
         }
         Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
index d1630c5..e2ab7dc 100644
--- a/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/TimeWindowCompactionStrategy.java
@@ -104,7 +104,7 @@ public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
         if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
         {
             logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
-            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingSSTables(SSTableSet.CANONICAL, uncompacting), gcBefore);
+            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting, cfs.getOverlappingLiveSSTables(uncompacting), gcBefore);
             lastExpiredCheck = System.currentTimeMillis();
         }
         else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java
index 99903fc..96aaa49 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@ -136,7 +136,7 @@ public class View
         return Iterables.concat(sstables, filterOut(compacting, sstables));
     }
 
-    private Iterable<SSTableReader> select(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
+    public Iterable<SSTableReader> select(SSTableSet sstableSet, Iterable<SSTableReader> sstables)
     {
         switch (sstableSet)
         {
@@ -182,7 +182,7 @@ public class View
      * Returns the sstables that have any partition between {@code left} and {@code right}, when both bounds are taken inclusively.
      * The interval formed by {@code left} and {@code right} shouldn't wrap.
      */
-    public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, PartitionPosition left, PartitionPosition right)
+    public Iterable<SSTableReader> liveSSTablesInBounds(PartitionPosition left, PartitionPosition right)
     {
         assert !AbstractBounds.strictlyWrapsAround(left, right);
 
@@ -190,7 +190,7 @@ public class View
             return Collections.emptyList();
 
         PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
-        return select(sstableSet, intervalTree.search(Interval.create(left, stopInTree)));
+        return select(SSTableSet.LIVE, intervalTree.search(Interval.create(left, stopInTree)));
     }
 
     public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree)
@@ -228,14 +228,14 @@ public class View
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows within @param rowBounds, inclusive, according to the interval tree.
      */
-    public static Function<View, Iterable<SSTableReader>> select(SSTableSet sstableSet, AbstractBounds<PartitionPosition> rowBounds)
+    public static Function<View, Iterable<SSTableReader>> selectLive(AbstractBounds<PartitionPosition> rowBounds)
     {
         // Note that View.sstablesInBounds always includes it's bound while rowBounds may not. This is ok however
         // because the fact we restrict the sstables returned by this function is an optimization in the first
         // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
         // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively
         // instead of exclusively, so the performance impact is negligible in practice.
-        return (view) -> view.sstablesInBounds(sstableSet, rowBounds.left, rowBounds.right);
+        return (view) -> view.liveSSTablesInBounds(rowBounds.left, rowBounds.right);
     }
 
     // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index d5c060e..a14f815 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -29,8 +29,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
@@ -332,6 +332,12 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                     SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.sstables(SSTableSet.CANONICAL));
                     for (Range<PartitionPosition> keyRange : keyRanges)
                     {
+                        // keyRange excludes its start, while sstablesInBounds is inclusive (of both start and end).
+                        // This is fine, however, because keyRange has been created from a token range through Range.makeRowRange (see above).
+                        // That latter method uses the Token.maxKeyBound() method to create the range, which returns a "fake" key that
+                        // sorts after all keys having the token. That "fake" key cannot, however, be equal to any real key, so even
+                        // including keyRange.left will still exclude any key having the token of the original token range, and so we're
+                        // still actually selecting what we wanted.
                         for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
                         {
                             if (!isIncremental || !sstable.isRepaired())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b0566a7/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 8a5e00e..98f9300 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@ -71,7 +71,7 @@ public class ViewTest
                             continue;
 
                         AbstractBounds<PartitionPosition> bounds = AbstractBounds.bounds(min, minInc, max, maxInc);
-                        List<SSTableReader> r = ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE,bounds.left, bounds.right));
+                        List<SSTableReader> r = ImmutableList.copyOf(initialView.liveSSTablesInBounds(bounds.left, bounds.right));
                         Assert.assertEquals(String.format("%d(%s) %d(%s)", i, minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
                     }
                 }