You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/03/04 16:04:57 UTC

[1/5] cassandra git commit: Remove cold_reads_to_omit from STCS

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 33a9adac8 -> 3c3fefa04
  refs/heads/trunk b5b1af703 -> 20b62de80


Remove cold_reads_to_omit from STCS

Patch by marcuse; reviewed by thobbs for CASSANDRA-8860


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/33a9adac
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/33a9adac
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/33a9adac

Branch: refs/heads/trunk
Commit: 33a9adac8e11cc4b01aa305868412b74048d3b34
Parents: d4e3786
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Mar 3 11:49:08 2015 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Mar 4 09:09:18 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   8 ++
 pylib/cqlshlib/cql3handling.py                  |   1 -
 .../SizeTieredCompactionStrategy.java           | 136 -------------------
 .../SizeTieredCompactionStrategyOptions.java    |  14 --
 .../SizeTieredCompactionStrategyTest.java       |  90 ------------
 6 files changed, 9 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a9adac/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 748acf8..4992d85 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
  * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
  * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
  * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a9adac/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 602770c..06013b8 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -13,6 +13,14 @@ restore snapshots created with the previous major version using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+2.1.4
+=====
+Upgrading
+---------
+    - The option to omit cold sstables with size tiered compaction has been
+      removed - it is almost always better to use date tiered compaction for
+      workloads that have cold data. 
+
 2.1.3
 =====
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a9adac/pylib/cqlshlib/cql3handling.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py
index f089cd7..88f042e 100644
--- a/pylib/cqlshlib/cql3handling.py
+++ b/pylib/cqlshlib/cql3handling.py
@@ -468,7 +468,6 @@ def cf_prop_val_mapkey_completer(ctxt, cass):
             opts.add('min_threshold')
             opts.add('bucket_high')
             opts.add('bucket_low')
-            opts.add('cold_reads_to_omit')
         elif csc == 'LeveledCompactionStrategy':
             opts.add('sstable_size_in_mb')
         elif csc == 'DateTieredCompactionStrategy':

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a9adac/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 08102c1..19abd9c 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -82,7 +82,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         int maxThreshold = cfs.getMaximumCompactionThreshold();
 
         Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables));
-        candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit, cfs.getMinimumCompactionThreshold());
 
         List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
         logger.debug("Compaction buckets are {}", buckets);
@@ -106,141 +105,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return Collections.singletonList(sstablesWithTombstones.get(0));
     }
 
-    /**
-     * Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
-     * across all sstables
-     * @param sstables all sstables to consider
-     * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
-     * @param minThreshold min compaction threshold
-     * @return a list of sstables with the coldest sstables excluded until the reads they represent reaches coldReadsToOmit
-     */
-    @VisibleForTesting
-    static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit, int minThreshold)
-    {
-        if (coldReadsToOmit == 0.0)
-            return sstables;
-
-        // Sort the sstables by hotness (coldest-first). We first build a map because the hotness may change during the sort.
-        final Map<SSTableReader, Double> hotnessSnapshot = getHotnessMap(sstables);
-        Collections.sort(sstables, new Comparator<SSTableReader>()
-        {
-            public int compare(SSTableReader o1, SSTableReader o2)
-            {
-                int comparison = Double.compare(hotnessSnapshot.get(o1), hotnessSnapshot.get(o2));
-                if (comparison != 0)
-                    return comparison;
-
-                // break ties with size on disk (mainly for system tables and cold tables)
-                comparison = Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
-                if (comparison != 0)
-                    return comparison;
-
-                // if there's still a tie, use generation, which is guaranteed to be unique.  this ensures that
-                // our filtering is deterministic, which can be useful when debugging.
-                return o1.descriptor.generation - o2.descriptor.generation;
-            }
-        });
-
-        // calculate the total reads/sec across all sstables
-        double totalReads = 0.0;
-        for (SSTableReader sstr : sstables)
-            if (sstr.getReadMeter() != null)
-                totalReads += sstr.getReadMeter().twoHourRate();
-
-        // if this is a system table with no read meters or we don't have any read rates yet, just return them all
-        if (totalReads == 0.0)
-            return sstables;
-
-        // iteratively ignore the coldest sstables until ignoring one more would put us over the coldReadsToOmit threshold
-        double maxColdReads = coldReadsToOmit * totalReads;
-
-        double totalColdReads = 0.0;
-        int cutoffIndex = 0;
-        while (cutoffIndex < sstables.size())
-        {
-            SSTableReader sstable = sstables.get(cutoffIndex);
-            if (sstable.getReadMeter() == null)
-            {
-                throw new AssertionError("If you're seeing this exception, please attach your logs to CASSANDRA-8238 to help us debug. "+sstable);
-            }
-            double reads = sstable.getReadMeter().twoHourRate();
-            if (totalColdReads + reads > maxColdReads)
-                break;
-
-            totalColdReads += reads;
-            cutoffIndex++;
-        }
-        List<SSTableReader> hotSSTables = new ArrayList<>(sstables.subList(cutoffIndex, sstables.size()));
-        List<SSTableReader> coldSSTables = sstables.subList(0, cutoffIndex);
-        logger.debug("hotSSTables={}, coldSSTables={}", hotSSTables.size(), coldSSTables.size());
-        if (hotSSTables.size() >= minThreshold)
-            return hotSSTables;
-        if (coldSSTables.size() < minThreshold)
-            return Collections.emptyList();
-
-        Map<SSTableReader, Set<SSTableReader>> overlapMap = new HashMap<>();
-        for (int i = 0; i < coldSSTables.size(); i++)
-        {
-            SSTableReader sstable = coldSSTables.get(i);
-            Set<SSTableReader> overlaps = new HashSet<>();
-            for (int j = 0; j < coldSSTables.size(); j++)
-            {
-                SSTableReader innerSSTable = coldSSTables.get(j);
-                if (ColumnNameHelper.overlaps(sstable.getSSTableMetadata().minColumnNames,
-                                              sstable.getSSTableMetadata().maxColumnNames,
-                                              innerSSTable.getSSTableMetadata().minColumnNames,
-                                              innerSSTable.getSSTableMetadata().maxColumnNames,
-                                              sstable.metadata.comparator))
-                {
-                    overlaps.add(innerSSTable);
-                }
-            }
-            overlapMap.put(sstable, overlaps);
-        }
-        List<Set<SSTableReader>> overlapChains = new ArrayList<>();
-        for (SSTableReader sstable : overlapMap.keySet())
-            overlapChains.add(createOverlapChain(sstable, overlapMap));
-
-        Collections.sort(overlapChains, new Comparator<Set<SSTableReader>>()
-        {
-            @Override
-            public int compare(Set<SSTableReader> o1, Set<SSTableReader> o2)
-            {
-                return Longs.compare(SSTableReader.getTotalBytes(o2), SSTableReader.getTotalBytes(o1));
-            }
-        });
-        for (Set<SSTableReader> overlapping : overlapChains)
-        {
-            // if we are expecting to only keep 70% of the keys after a compaction, run a compaction on these cold sstables:
-            if (SSTableReader.estimateCompactionGain(overlapping) < 0.7)
-                return new ArrayList<>(overlapping);
-        }
-        return Collections.emptyList();
-    }
-
-    /**
-     * returns a set with all overlapping sstables starting with s.
-     * if we have 3 sstables, a, b, c where a overlaps with b, but not c and b overlaps with c, all sstables would be returned.
-     *
-     * m contains an sstable -> all overlapping mapping
-     */
-    private static Set<SSTableReader> createOverlapChain(SSTableReader s, Map<SSTableReader, Set<SSTableReader>> m)
-    {
-        Deque<SSTableReader> sstables = new ArrayDeque<>();
-        Set<SSTableReader> overlapChain = new HashSet<>();
-        sstables.push(s);
-        while (!sstables.isEmpty())
-        {
-            SSTableReader sstable = sstables.pop();
-            if (overlapChain.add(sstable))
-            {
-                if (m.containsKey(sstable))
-                    sstables.addAll(m.get(sstable));
-            }
-        }
-        return overlapChain;
-    }
-
 
     /**
      * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a9adac/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index 84e7d61..911bb9f 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -26,16 +26,13 @@ public final class SizeTieredCompactionStrategyOptions
     protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
     protected static final double DEFAULT_BUCKET_LOW = 0.5;
     protected static final double DEFAULT_BUCKET_HIGH = 1.5;
-    protected static final double DEFAULT_COLD_READS_TO_OMIT = 0.05;
     protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
     protected static final String BUCKET_LOW_KEY = "bucket_low";
     protected static final String BUCKET_HIGH_KEY = "bucket_high";
-    protected static final String COLD_READS_TO_OMIT_KEY = "cold_reads_to_omit";
 
     protected long minSSTableSize;
     protected double bucketLow;
     protected double bucketHigh;
-    protected double coldReadsToOmit;
 
     public SizeTieredCompactionStrategyOptions(Map<String, String> options)
     {
@@ -45,8 +42,6 @@ public final class SizeTieredCompactionStrategyOptions
         bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
         optionValue = options.get(BUCKET_HIGH_KEY);
         bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
-        optionValue = options.get(COLD_READS_TO_OMIT_KEY);
-        coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT : Double.parseDouble(optionValue);
     }
 
     public SizeTieredCompactionStrategyOptions()
@@ -54,7 +49,6 @@ public final class SizeTieredCompactionStrategyOptions
         minSSTableSize = DEFAULT_MIN_SSTABLE_SIZE;
         bucketLow = DEFAULT_BUCKET_LOW;
         bucketHigh = DEFAULT_BUCKET_HIGH;
-        coldReadsToOmit = DEFAULT_COLD_READS_TO_OMIT;
     }
 
     private static double parseDouble(Map<String, String> options, String key, double defaultValue) throws ConfigurationException
@@ -94,17 +88,9 @@ public final class SizeTieredCompactionStrategyOptions
                                                            BUCKET_HIGH_KEY, bucketHigh, BUCKET_LOW_KEY, bucketLow));
         }
 
-        double maxColdReadsRatio = parseDouble(options, COLD_READS_TO_OMIT_KEY, DEFAULT_COLD_READS_TO_OMIT);
-        if (maxColdReadsRatio < 0.0 || maxColdReadsRatio > 1.0)
-        {
-            throw new ConfigurationException(String.format("%s value (%s) should be between between 0.0 and 1.0",
-                                                           COLD_READS_TO_OMIT_KEY, optionValue));
-        }
-
         uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
         uncheckedOptions.remove(BUCKET_LOW_KEY);
         uncheckedOptions.remove(BUCKET_HIGH_KEY);
-        uncheckedOptions.remove(COLD_READS_TO_OMIT_KEY);
 
         return uncheckedOptions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33a9adac/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 87b284e..1591f03 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.utils.Pair;
 import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.getBuckets;
 import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.mostInterestingBucket;
 import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.trimToThresholdWithHotness;
-import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.filterColdSSTables;
 import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.validateOptions;
 
 import static org.junit.Assert.*;
@@ -45,7 +44,6 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
     public void testOptionsValidation() throws ConfigurationException
     {
         Map<String, String> options = new HashMap<>();
-        options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "0.35");
         options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
         options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, "1.5");
         options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "10000");
@@ -54,25 +52,6 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
 
         try
         {
-            options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "-0.5");
-            validateOptions(options);
-            fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY));
-        }
-        catch (ConfigurationException e) {}
-
-        try
-        {
-            options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "10.0");
-            validateOptions(options);
-            fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY));
-        }
-        catch (ConfigurationException e)
-        {
-            options.put(SizeTieredCompactionStrategyOptions.COLD_READS_TO_OMIT_KEY, "0.25");
-        }
-
-        try
-        {
             options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "1000.0");
             validateOptions(options);
             fail("bucket_low greater than bucket_high should be rejected");
@@ -186,73 +165,4 @@ public class SizeTieredCompactionStrategyTest extends SchemaLoader
         assertEquals(String.format("bucket hotness (%f) should be close to %f", bucket.right, expectedBucketHotness),
                      expectedBucketHotness, bucket.right, 1.0);
     }
-
-    @Test
-    public void testFilterColdSSTables() throws Exception
-    {
-        String ksname = "Keyspace1";
-        String cfname = "Standard1";
-        Keyspace keyspace = Keyspace.open(ksname);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
-        cfs.truncateBlocking();
-        cfs.disableAutoCompaction();
-
-        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
-
-        // create 10 sstables
-        int numSSTables = 10;
-        for (int r = 0; r < numSSTables; r++)
-        {
-            DecoratedKey key = Util.dk(String.valueOf(r));
-            Mutation rm = new Mutation(ksname, key.getKey());
-            rm.add(cfname, Util.cellname("column"), value, 0);
-            rm.apply();
-            cfs.forceBlockingFlush();
-        }
-        cfs.forceBlockingFlush();
-
-        List<SSTableReader> filtered;
-        List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
-
-        for (SSTableReader sstr : sstrs)
-            sstr.overrideReadMeter(null);
-        filtered = filterColdSSTables(sstrs, 0.05, 0);
-        assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
-
-        for (SSTableReader sstr : sstrs)
-            sstr.overrideReadMeter(new RestorableMeter(0.0, 0.0));
-        filtered = filterColdSSTables(sstrs, 0.05, 0);
-        assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
-
-        // leave all read rates at 0 besides one
-        sstrs.get(0).overrideReadMeter(new RestorableMeter(1000.0, 1000.0));
-        filtered = filterColdSSTables(sstrs, 0.05, 0);
-        assertEquals("there should only be one hot sstable", 1, filtered.size());
-        assertEquals(1000.0, filtered.get(0).getReadMeter().twoHourRate(), 0.5);
-
-        // the total read rate is 100, and we'll set a threshold of 2.5%, so two of the sstables with read
-        // rate 1.0 should be ignored, but not the third
-        for (SSTableReader sstr : sstrs)
-            sstr.overrideReadMeter(new RestorableMeter(0.0, 0.0));
-        sstrs.get(0).overrideReadMeter(new RestorableMeter(97.0, 97.0));
-        sstrs.get(1).overrideReadMeter(new RestorableMeter(1.0, 1.0));
-        sstrs.get(2).overrideReadMeter(new RestorableMeter(1.0, 1.0));
-        sstrs.get(3).overrideReadMeter(new RestorableMeter(1.0, 1.0));
-
-        filtered = filterColdSSTables(sstrs, 0.025, 0);
-        assertEquals(2, filtered.size());
-        assertEquals(98.0, filtered.get(0).getReadMeter().twoHourRate() + filtered.get(1).getReadMeter().twoHourRate(), 0.5);
-
-        // make sure a threshold of 0.0 doesn't result in any sstables being filtered
-        for (SSTableReader sstr : sstrs)
-            sstr.overrideReadMeter(new RestorableMeter(1.0, 1.0));
-        filtered = filterColdSSTables(sstrs, 0.0, 0);
-        assertEquals(sstrs.size(), filtered.size());
-
-        // just for fun, set a threshold where all sstables are considered cold
-        for (SSTableReader sstr : sstrs)
-            sstr.overrideReadMeter(new RestorableMeter(1.0, 1.0));
-        filtered = filterColdSSTables(sstrs, 1.0, 0);
-        assertTrue(filtered.isEmpty());
-    }
 }


[2/5] cassandra git commit: Make SSTableRewriter.abort() more robust to failure

Posted by be...@apache.org.
Make SSTableRewriter.abort() more robust to failure

patch by benedict; reviewed by branimir for CASSANDRA-8832


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c3fefa0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c3fefa0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c3fefa0

Branch: refs/heads/trunk
Commit: 3c3fefa04f027a1ecbede1d41c1bf3df25218a5d
Parents: 33a9ada
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 14:59:16 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 14:59:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   4 +-
 .../db/compaction/CompactionManager.java        |   5 +-
 .../cassandra/io/sstable/SSTableReader.java     |   3 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  96 +++++++++++---
 .../org/apache/cassandra/utils/Throwables.java  |  32 +++++
 .../apache/cassandra/utils/concurrent/Refs.java |  25 +++-
 .../utils/concurrent/SelfRefCounted.java        |  24 ++++
 .../io/sstable/SSTableRewriterTest.java         | 124 +++++++++++--------
 10 files changed, 238 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4992d85..6133536 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
  * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
  * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
  * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9b792b6..38c5dbe 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,6 +30,7 @@ import javax.management.openmbean.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.*;
+import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 8224311..81964f9 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.utils.Interval;
 import org.apache.cassandra.utils.IntervalTree;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
 
 public class DataTracker
 {
@@ -406,8 +407,7 @@ public class DataTracker
         for (SSTableReader sstable : newSSTables)
             sstable.setTrackedBy(this);
 
-        for (SSTableReader sstable : oldSSTables)
-            sstable.selfRef().release();
+        Refs.release(Refs.selfRefs(oldSSTables));
     }
 
     private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e54a25f..992378f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1271,7 +1271,10 @@ public class CompactionManager implements CompactionManagerMBean
                 if (t instanceof CompactionInterruptedException)
                 {
                     logger.info(t.getMessage());
-                    logger.debug("Full interruption stack trace:", t);
+                    if (t.getSuppressed() != null && t.getSuppressed().length > 0)
+                        logger.warn("Interruption of compaction encountered exceptions:", t);
+                    else
+                        logger.debug("Full interruption stack trace:", t);
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 13abc04..973b0c9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -107,6 +107,7 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 
@@ -166,7 +167,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
  *
  * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
  */
-public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
+public class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 2ca3e6f..be1085b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -21,6 +21,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,6 +33,8 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
  * we look in the summary we're collecting for the latest writer for the penultimate key that we know to have been fully
@@ -84,6 +87,12 @@ public class SSTableRewriter
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
+    private State state = State.WORKING;
+
+    private static enum State
+    {
+        WORKING, FINISHED, ABORTED
+    }
 
     public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
     {
@@ -176,30 +185,79 @@ public class SSTableRewriter
 
     public void abort()
     {
-        switchWriter(null, true);
-        moveStarts(null, null, true);
+        switch (state)
+        {
+            case ABORTED:
+                return;
+            case FINISHED:
+                throw new IllegalStateException("Cannot abort - changes have already been committed");
+        }
+        state = State.ABORTED;
+
+        Throwable fail = null;
+        try
+        {
+            moveStarts(null, null, true);
+        }
+        catch (Throwable t)
+        {
+            fail = merge(fail, t);
+        }
 
         // remove already completed SSTables
         for (SSTableReader sstable : finished)
         {
-            sstable.markObsolete();
-            sstable.selfRef().release();
+            try
+            {
+                sstable.markObsolete();
+                sstable.selfRef().release();
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
+            }
         }
 
+        if (writer != null)
+            finishedEarly.add(new Finished(writer, currentlyOpenedEarly));
+
         // abort the writers
         for (Finished finished : finishedEarly)
         {
-            boolean opened = finished.reader != null;
-            finished.writer.abort();
-            if (opened)
+            try
+            {
+                finished.writer.abort();
+            }
+            catch (Throwable t)
             {
-                // if we've already been opened, add ourselves to the discard pile
-                discard.add(finished.reader);
-                finished.reader.markObsolete();
+                fail = merge(fail, t);
+            }
+            try
+            {
+                if (finished.reader != null)
+                {
+                    // if we've already been opened, add ourselves to the discard pile
+                    discard.add(finished.reader);
+                    finished.reader.markObsolete();
+                }
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
             }
         }
 
-        replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+        try
+        {
+            replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+        }
+        catch (Throwable t)
+        {
+            fail = merge(fail, t);
+        }
+
+        if (fail != null)
+            throw Throwables.propagate(fail);
     }
 
     /**
@@ -301,11 +359,6 @@ public class SSTableRewriter
 
     public void switchWriter(SSTableWriter newWriter)
     {
-        switchWriter(newWriter, false);
-    }
-
-    private void switchWriter(SSTableWriter newWriter, boolean abort)
-    {
         if (writer == null)
         {
             writer = newWriter;
@@ -313,7 +366,7 @@ public class SSTableRewriter
         }
 
         // we leave it as a tmp file, but we open it and add it to the dataTracker
-        if (writer.getFilePointer() != 0 && !abort)
+        if (writer.getFilePointer() != 0)
         {
             SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
             replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
@@ -362,8 +415,14 @@ public class SSTableRewriter
 
     private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
     {
+        switch (state)
+        {
+            case FINISHED: case ABORTED:
+                throw new IllegalStateException("Cannot finish - changes have already been " + state.toString().toLowerCase());
+        }
+
         List<SSTableReader> newReaders = new ArrayList<>();
-        switchWriter(null, false);
+        switchWriter(null);
 
         if (throwEarly)
             throw new RuntimeException("exception thrown early in finish, for testing");
@@ -396,6 +455,7 @@ public class SSTableRewriter
             throw new RuntimeException("exception thrown after all sstables finished, for testing");
 
         replaceWithFinishedReaders(newReaders);
+        state = State.FINISHED;
         return finished;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
new file mode 100644
index 0000000..552ca87
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils;
+
+public class Throwables
+{
+
+    public static Throwable merge(Throwable existingFail, Throwable newFail)
+    {
+        if (existingFail == null)
+            return newFail;
+        existingFail.addSuppressed(newFail);
+        return existingFail;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index 3a930d2..b24fc2f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -2,9 +2,15 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.util.*;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * A collection of managed Ref references to RefCounted objects, and the objects they are referencing.
  * Care MUST be taken when using this collection, as if a permanent reference to it leaks we will not
@@ -196,7 +202,7 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
         throw new IllegalStateException();
     }
 
-    private static void release(Iterable<? extends Ref<?>> refs)
+    public static void release(Iterable<? extends Ref<?>> refs)
     {
         Throwable fail = null;
         for (Ref ref : refs)
@@ -207,13 +213,22 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
             }
             catch (Throwable t)
             {
-                if (fail == null)
-                    fail = t;
-                else
-                    fail.addSuppressed(t);
+                fail = merge(fail, t);
             }
         }
         if (fail != null)
             throw Throwables.propagate(fail);
     }
+
+    public static <T extends SelfRefCounted<T>> Iterable<Ref<T>> selfRefs(Iterable<T> refs)
+    {
+        return Iterables.transform(refs, new Function<T, Ref<T>>()
+        {
+            @Nullable
+            public Ref<T> apply(T t)
+            {
+                return t.selfRef();
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java b/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
new file mode 100644
index 0000000..cb45757
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
@@ -0,0 +1,24 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+public interface SelfRefCounted<T extends SelfRefCounted<T>> extends RefCounted<T>
+{
+    public Ref<T> selfRef();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 4957e5a..258b6b5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,6 +21,9 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
@@ -301,48 +304,83 @@ public class SSTableRewriterTest extends SchemaLoader
     @Test
     public void testNumberOfFiles_abort() throws Exception
     {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        testNumberOfFiles_abort(new RewriterTest()
+        {
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            {
+                int files = 1;
+                while(scanner.hasNext())
+                {
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (rewriter.currentWriter().getFilePointer() > 25000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
+                }
+                rewriter.abort();
+            }
+        });
+    }
 
-        SSTableReader s = writeFile(cfs, 1000);
-        cfs.addSSTable(s);
-        long startSize = cfs.metric.liveDiskSpaceUsed.count();
-        DecoratedKey origFirst = s.first;
-        DecoratedKey origLast = s.last;
-        Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+    @Test
+    public void testNumberOfFiles_abort2() throws Exception
+    {
+        testNumberOfFiles_abort(new RewriterTest()
+        {
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            {
+                int files = 1;
+                while(scanner.hasNext())
+                {
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (rewriter.currentWriter().getFilePointer() > 25000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
+                    if (files == 3)
+                    {
+                        //testing to abort when we have nothing written in the new file
+                        rewriter.abort();
+                        break;
+                    }
+                }
+            }
+        });
+    }
 
-        int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+    @Test
+    public void testNumberOfFiles_abort3() throws Exception
+    {
+        testNumberOfFiles_abort(new RewriterTest()
         {
-            while(scanner.hasNext())
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
             {
-                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                int files = 1;
+                while(scanner.hasNext())
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                    files++;
-                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (files == 1 && rewriter.currentWriter().getFilePointer() > 10000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
                 }
+                rewriter.abort();
             }
-        }
-        rewriter.abort();
-        Thread.sleep(1000);
-        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
-        assertEquals(1, cfs.getSSTables().size());
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
-        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
-        validateCFS(cfs);
+        });
+    }
 
+    private static interface RewriterTest
+    {
+        public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter);
     }
 
-    @Test
-    public void testNumberOfFiles_abort2() throws Exception
+    private void testNumberOfFiles_abort(RewriterTest test) throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -353,36 +391,22 @@ public class SSTableRewriterTest extends SchemaLoader
 
         DecoratedKey origFirst = s.first;
         DecoratedKey origLast = s.last;
+        long startSize = cfs.metric.liveDiskSpaceUsed.count();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        int files = 1;
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            while(scanner.hasNext())
-            {
-                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
-                {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                    files++;
-                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
-                }
-                if (files == 3)
-                {
-                    //testing to abort when we have nothing written in the new file
-                    rewriter.abort();
-                    break;
-                }
-            }
+            test.run(scanner, controller, s, cfs, rewriter);
         }
+
         Thread.sleep(1000);
+        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
         assertEquals(1, cfs.getSSTables().size());
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
-
         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
         assertEquals(cfs.getSSTables().iterator().next().last, origLast);
         validateCFS(cfs);


[5/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	CHANGES.txt
	NEWS.txt
	src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
	src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
	test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/20b62de8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/20b62de8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/20b62de8

Branch: refs/heads/trunk
Commit: 20b62de80b27066d09372a824898febc794ad239
Parents: b5b1af7 3c3fefa
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 15:03:36 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 15:03:36 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   4 +-
 .../db/compaction/CompactionManager.java        |   5 +-
 .../SizeTieredCompactionStrategyOptions.java    |   3 -
 .../cassandra/io/sstable/SSTableRewriter.java   |  96 +++++++++++---
 .../io/sstable/format/SSTableReader.java        |   3 +-
 .../org/apache/cassandra/utils/Throwables.java  |  32 +++++
 .../apache/cassandra/utils/concurrent/Refs.java |  25 +++-
 .../utils/concurrent/SelfRefCounted.java        |  24 ++++
 .../io/sstable/SSTableRewriterTest.java         | 124 +++++++++++--------
 12 files changed, 241 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1759f7c,6133536..596b4af
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,65 -1,5 +1,66 @@@
 +3.0
 + * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849)
 + * Avoid accessing partitioner through StorageProxy (CASSANDRA-8244, 8268)
 + * Upgrade Metrics library and remove deprecated metrics (CASSANDRA-5657)
 + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438)
 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707)
 + * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560)
 + * Support direct buffer decompression for reads (CASSANDRA-8464)
 + * DirectByteBuffer compatible LZ4 methods (CASSANDRA-7039)
 + * Group sstables for anticompaction correctly (CASSANDRA-8578)
 + * Add ReadFailureException to native protocol, respond
 +   immediately when replicas encounter errors while handling
 +   a read request (CASSANDRA-7886)
 + * Switch CommitLogSegment from RandomAccessFile to nio (CASSANDRA-8308)
 + * Allow mixing token and partition key restrictions (CASSANDRA-7016)
 + * Support index key/value entries on map collections (CASSANDRA-8473)
 + * Modernize schema tables (CASSANDRA-8261)
 + * Support for user-defined aggregation functions (CASSANDRA-8053)
 + * Fix NPE in SelectStatement with empty IN values (CASSANDRA-8419)
 + * Refactor SelectStatement, return IN results in natural order instead
 +   of IN value list order and ignore duplicate values in partition key IN restrictions (CASSANDRA-7981)
 + * Support UDTs, tuples, and collections in user-defined
 +   functions (CASSANDRA-7563)
 + * Fix aggregate fn results on empty selection, result column name,
 +   and cqlsh parsing (CASSANDRA-8229)
 + * Mark sstables as repaired after full repair (CASSANDRA-7586)
 + * Extend Descriptor to include a format value and refactor reader/writer
 +   APIs (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any partition key column (CASSANDRA-7855)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063, 7813, 7708)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * Improve concurrency of repair (CASSANDRA-6455, 8208)
 + * Select optimal CRC32 implementation at runtime (CASSANDRA-8614)
 + * Evaluate MurmurHash of Token once per query (CASSANDRA-7096)
 +
 +
  2.1.4
+  * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
   * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
   * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
   * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/NEWS.txt
----------------------------------------------------------------------
diff --cc NEWS.txt
index b492e4c,06013b8..73da5ba
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -13,60 -13,6 +13,63 @@@ restore snapshots created with the prev
  'sstableloader' tool. You can upgrade the file format of your snapshots
  using the provided 'sstableupgrade' tool.
  
++<<<<<<< HEAD
 +3.0
 +===
 +
 +New features
 +------------
 +   - Authentication & Authorization APIs have been updated to introduce
 +     roles. Roles and Permissions granted to them are inherited, supporting
 +     role based access control. The role concept supercedes that of users
 +     and CQL constructs such as CREATE USER are deprecated but retained for
 +     compatibility. The requirement to explicitly create Roles in Cassandra
 +     even when auth is handled by an external system has been removed, so
 +     authentication & authorization can be delegated to such systems in their
 +     entirety.
 +   - In addition to the above, Roles are also first class resources and can be the
 +     subject of permissions. Users (roles) can now be granted permissions on other
 +     roles, including CREATE, ALTER, DROP & AUTHORIZE, which removes the need for
 +     superuser privileges in order to perform user/role management operations.
 +   - Creators of database resources (Keyspaces, Tables, Roles) are now automatically
 +     granted all permissions on them (if the IAuthorizer implementation supports
 +     this).
 +   - SSTable file name is changed. Now you don't have Keyspace/CF name
 +     in file name. Also, secondary index has its own directory under parent's
 +     directory.
 +   - Support for user-defined functions and user-defined aggregates have
 +     been added to CQL.
 +
 +
 +Upgrading
 +---------
 +   - Pig's CqlStorage has been removed, use CqlNativeStorage instead
 +   - IAuthenticator has been updated to remove responsibility for user/role
 +     maintenance and is now solely responsible for validating credentials.
 +     This is primarily done via SASL, though an optional method exists for
 +     systems which need support for the Thrift login() method.
 +   - IRoleManager interface has been added which takes over the maintenance
 +     functions from IAuthenticator. IAuthorizer is mainly unchanged. Auth data
 +     in systems using the stock internal implementations PasswordAuthenticator
 +     & CassandraAuthorizer will be automatically converted during upgrade,
 +     with minimal operator intervention required. Custom implementations will
 +     require modification, though these can be used in conjunction with the
 +     stock CassandraRoleManager so providing an IRoleManager implementation
 +     should not usually be necessary.
 +   - Fat client support has been removed since we have push notifications to clients
 +   - cassandra-cli has been removed. Please use cqlsh instead.
 +   - YamlFileNetworkTopologySnitch has been removed; switch to
 +     GossipingPropertyFileSnitch instead.
 +   - CQL2 has been removed entirely in this release (previously deprecated
 +     in 2.0.0). Please switch to CQL3 if you haven't already done so.
 +   - Very large batches will now be rejected (defaults to 50kb). This
 +     can be customized by modifying batch_size_fail_threshold_in_kb.
 +   - The results of CQL3 queries containing an IN restriction will be ordered
 +     in the normal order and not anymore in the order in which the column values were
 +     specified in the IN restriction.
 +
++=======
++>>>>>>> cassandra-2.1
  2.1.4
  =====
  Upgrading

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------


[4/5] cassandra git commit: Merge branch 'cassandra-2.1' into trunk

Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 4468d57,0000000..7953d98
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,2057 -1,0 +1,2058 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.sstable.format;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.ScheduledExecutors;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +import org.apache.cassandra.utils.concurrent.Ref;
 +import org.apache.cassandra.utils.concurrent.RefCounted;
++import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +/**
 + * An SSTableReader can be constructed in a number of places, but typically is either
 + * read from disk at startup, or constructed from a flushed memtable, or after compaction
 + * to replace some existing sstables. However once created, an sstablereader may also be modified.
 + *
 + * A reader's OpenReason describes its current stage in its lifecycle, as follows:
 + *
 + * NORMAL
 + * From:       None        => Reader has been read from disk, either at startup or from a flushed memtable
 + *             EARLY       => Reader is the final result of a compaction
 + *             MOVED_START => Reader WAS being compacted, but this failed and it has been restored to NORMAL status
 + *
 + * EARLY
 + * From:       None        => Reader is a compaction replacement that is either incomplete and has been opened
 + *                            to represent its partial result status, or has been finished but the compaction
 + *                            it is a part of has not yet completed fully
 + *             EARLY       => Same as from None, only it is not the first time it has been opened early
 + *
 + * MOVED_START
 + * From:       NORMAL      => Reader is being compacted. This compaction has not finished, but the compaction result
 + *                            is either partially or fully opened, to either partially or fully replace this reader.
 + *                            This reader's start key has been updated to represent this, so that reads only hit
 + *                            one or the other reader.
 + *
 + * METADATA_CHANGE
 + * From:       NORMAL      => Reader has seen low traffic and the amount of memory available for index summaries is
 + *                            constrained, so its index summary has been downsampled.
 + *         METADATA_CHANGE => Same
 + *
 + * Note that in parallel to this, there are two different Descriptor types; TMPLINK and FINAL; the latter corresponds
 + * to NORMAL state readers and all readers that replace a NORMAL one. TMPLINK is used for EARLY state readers and
 + * no others.
 + *
 + * When a reader is being compacted, if the result is large its replacement may be opened as EARLY before compaction
 + * completes in order to present the result to consumers earlier. In this case the reader will itself be changed to
 + * a MOVED_START state, where its start no longer represents its on-disk minimum key. This is to permit reads to be
 + * directed to only one reader when the two represent the same data. The EARLY file can represent a compaction result
 + * that is either partially complete and still in-progress, or a complete and immutable sstable that is part of a larger
 + * macro compaction action that has not yet fully completed.
 + *
 + * Currently ALL compaction results at least briefly go through an EARLY open state prior to completion, regardless
 + * of if early opening is enabled.
 + *
 + * Since a reader can be created multiple times over the same shared underlying resources, and the exact resources
 + * it shares between each instance differ subtly, we track the lifetime of any underlying resource with its own
 + * reference count, which each instance takes a Ref to. Each instance then tracks references to itself, and once these
 + * all expire it releases its Refs to these underlying resources.
 + *
 + * There is some shared cleanup behaviour needed only once all sstablereaders in a certain stage of their lifecycle
 + * (i.e. EARLY or NORMAL opening), and some that must only occur once all readers of any kind over a single logical
 + * sstable have expired. These are managed by the TypeTidy and GlobalTidy classes at the bottom, and are effectively
 + * managed as another resource each instance tracks its own Ref instance to, to ensure all of these resources are
 + * cleaned up safely and can be debugged otherwise.
 + *
 + * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
 + */
- public abstract class SSTableReader extends SSTable implements RefCounted<SSTableReader>
++public abstract class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    /** Orders sstables by max timestamp, largest (newest) timestamp first. */
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader left, SSTableReader right)
 +        {
 +            long leftTimestamp = left.getMaxTimestamp();
 +            long rightTimestamp = right.getMaxTimestamp();
 +            // operands are swapped on purpose so that the larger timestamp sorts first
 +            return Long.compare(rightTimestamp, leftTimestamp);
 +        }
 +    };
 +
 +    /** Orders sstables by their first key, delegating to that key's compareTo. */
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader left, SSTableReader right)
 +        {
 +            return left.first.compareTo(right.first);
 +        }
 +    };
 +
 +    /** Guava Ordering built from sstableComparator. */
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis()) which represents an upper bound
 +     * to the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When a sstable is created from compaction, maxDataAge is set to max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoch and is local to this host.
 +     */
 +    public final long maxDataAge;
 +
 +    /**
 +     * The stage of its lifecycle a reader was opened in. The class-level comment lists which
 +     * states each of these may be entered from.
 +     */
 +    public enum OpenReason
 +    {
 +        // read from disk (at startup or from a flushed memtable), the final result of a compaction,
 +        // or a reader restored after the compaction it was part of failed
 +        NORMAL,
 +        // a compaction replacement opened before the compaction it belongs to has fully completed
 +        EARLY,
 +        // the index summary has been downsampled
 +        METADATA_CHANGE,
 +        // the reader is being compacted and its start key has been updated so that reads hit
 +        // either this reader or its replacement, not both
 +        MOVED_START,
 +        SHADOWED // => MOVED_START past end
 +    }
 +
 +    // the lifecycle stage this instance was opened in (see OpenReason)
 +    public final OpenReason openReason;
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    // not assigned by the constructor; setTrackedBy() points this at CacheService.instance.keyCache
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    // tidy is the handler passed to selfRef, the Ref this instance holds to itself. It is built from
 +    // descriptor and metadata, which are presumably fields inherited from SSTable -- TODO confirm
 +    private final InstanceTidier tidy = new InstanceTidier(descriptor, metadata);
 +    private final Ref<SSTableReader> selfRef = new Ref<>(this, tidy);
 +
 +    private RestorableMeter readMeter;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If cardinality estimator is available on all given sstables, then this method use them to estimate
 +     * key count.
 +     * If not, then this uses index summaries.
 +     *
 +     * @param sstables SSTables to calculate key count
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        // -1 is a sentinel meaning "no estimate from the cardinality estimators yet"
 +        long count = -1;
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
 +                    assert metadata != null : sstable.getFilename();
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            // a partial merge is discarded: the estimate is only used if every sstable contributed
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summary
 +        if (count < 0)
 +        {
 +            // restart from zero: summing onto the -1 sentinel would under-report by one key
 +            // (and would return -1 for an empty collection)
 +            count = 0;
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
 +
 +    /**
 +     * Estimates how much of the keys we would keep if the sstables were compacted together
 +     */
 +    public static double estimateCompactionGain(Set<SSTableReader> overlapping)
 +    {
 +        // collect the per-sstable estimators; sstables whose compaction metadata cannot be read,
 +        // or that carry no estimator, are left out of the calculation
 +        Set<ICardinality> estimators = new HashSet<>(overlapping.size());
 +        for (SSTableReader candidate : overlapping)
 +        {
 +            try
 +            {
 +                CompactionMetadata compactionMetadata = (CompactionMetadata) candidate.descriptor.getMetadataSerializer().deserialize(candidate.descriptor, MetadataType.COMPACTION);
 +                ICardinality estimator = compactionMetadata.cardinalityEstimator;
 +                if (estimator == null)
 +                    logger.debug("Got a null cardinality estimator in: "+candidate.getFilename());
 +                else
 +                    estimators.add(estimator);
 +            }
 +            catch (IOException e)
 +            {
 +                logger.warn("Could not read up compaction metadata for " + candidate, e);
 +            }
 +        }
 +
 +        // sum of the individual estimates, before any merging
 +        long keysBefore = 0;
 +        for (ICardinality estimator : estimators)
 +            keysBefore += estimator.cardinality();
 +
 +        // nothing to divide by: report a ratio of 1
 +        if (keysBefore == 0)
 +            return 1;
 +
 +        long keysAfter = mergeCardinalities(estimators).cardinality();
 +        double gain = ((double) keysAfter) / keysBefore;
 +        logger.debug("Estimated compaction gain: {}/{}={}", keysAfter, keysBefore, gain);
 +        return gain;
 +    }
 +
 +    /**
 +     * Merges the given estimators into a fresh HyperLogLogPlus. If the merge throws
 +     * CardinalityMergeException a warning is logged and the fresh, unmerged estimator is returned.
 +     */
 +    private static ICardinality mergeCardinalities(Collection<ICardinality> cardinalities)
 +    {
 +        ICardinality empty = new HyperLogLogPlus(13, 25); // see MetadataCollector.cardinality
 +        ICardinality[] inputs = cardinalities.toArray(new ICardinality[cardinalities.size()]);
 +        try
 +        {
 +            return empty.merge(inputs);
 +        }
 +        catch (CardinalityMergeException e)
 +        {
 +            logger.warn("Could not merge cardinalities", e);
 +            return empty;
 +        }
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    /**
 +     * Opens the sstable with the given metadata and the components returned by componentsFor(desc).
 +     * A table name containing SECONDARY_INDEX_NAME_SEPARATOR gets a LocalPartitioner over the
 +     * metadata's key validator; any other table gets StorageService.getPartitioner().
 +     */
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner partitioner;
 +        if (desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +            partitioner = new LocalPartitioner(metadata.getKeyValidator());
 +        else
 +            partitioner = StorageService.getPartitioner();
 +
 +        return open(desc, componentsFor(desc), metadata, partitioner);
 +    }
 +
 +    /**
 +     * Opens the sstable with the supplied components, metadata and partitioner, passing
 +     * validate = true to the five-argument overload.
 +     */
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true);
 +    }
 +
 +    /**
 +     * Opens the sstable using StorageService.getPartitioner() and passing validate = false,
 +     * so the reader's validate() step is not run.
 +     */
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
 +    {
 +        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
 +    }
 +
 +    /**
 +     * Open SSTable reader to be used in batch mode(such as sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        EnumSet<MetadataType> wanted = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS);
 +        Map<MetadataType, MetadataComponent> loaded = descriptor.getMetadataSerializer().deserialize(descriptor, wanted);
 +        ValidationMetadata validation = (ValidationMetadata) loaded.get(MetadataType.VALIDATION);
 +        StatsMetadata stats = (StatsMetadata) loaded.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String expectedPartitioner = partitioner.getClass().getCanonicalName();
 +        boolean partitionerMismatch = validation != null && !expectedPartitioner.equals(validation.partitioner);
 +        if (partitionerMismatch)
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validation.partitioner, expectedPartitioner));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                stats, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +        SegmentedFile.Builder dbuilder;
 +        if (sstable.compression)
 +            dbuilder = new CompressedSegmentedFile.Builder(null);
 +        else
 +            dbuilder = new BufferedSegmentedFile.Builder();
 +
 +        // only build the summary when one could not be loaded
 +        boolean summaryLoaded = sstable.loadSummary(ibuilder, dbuilder);
 +        if (!summaryLoaded)
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +        // no bloom filter is loaded on this path
 +        sstable.bf = FilterFactory.AlwaysPresent;
 +        sstable.setup();
 +        return sstable;
 +    }
 +
 +    /**
 +     * Shared implementation behind the public open variants: checks the partitioner recorded in the
 +     * sstable's validation metadata, creates the reader, loads its index and filter, runs setup()
 +     * and, when validate is true, validate().
 +     */
 +    private static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable" + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        EnumSet<MetadataType> wanted = EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS);
 +        Map<MetadataType, MetadataComponent> loaded = descriptor.getMetadataSerializer().deserialize(descriptor, wanted);
 +        ValidationMetadata validation = (ValidationMetadata) loaded.get(MetadataType.VALIDATION);
 +        StatsMetadata stats = (StatsMetadata) loaded.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String expectedPartitioner = partitioner.getClass().getCanonicalName();
 +        boolean partitionerMismatch = validation != null && !expectedPartitioner.equals(validation.partitioner);
 +        if (partitionerMismatch)
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validation.partitioner, expectedPartitioner));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                stats, OpenReason.NORMAL);
 +
 +        // load index and filter
 +        long loadStart = System.nanoTime();
 +        sstable.load(validation);
 +        long loadMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - loadStart);
 +        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, loadMillis);
 +
 +        sstable.setup();
 +        if (validate)
 +            sstable.validate();
 +
 +        if (sstable.getKeyCache() != null)
 +            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +        return sstable;
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    /**
 +     * Opens every given sstable on a dedicated executor (pool size taken from
 +     * FBUtilities.getAvailableProcessors()) and blocks until all open attempts have finished.
 +     * An sstable whose open throws an IOException is logged and left out of the result.
 +     *
 +     * @param entries descriptor / component-set pairs to open
 +     * @param metadata table metadata passed to each open call
 +     * @param partitioner partitioner passed to each open call
 +     * @return the readers that opened successfully, in the order their opens completed
 +     */
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        // a concurrent queue, because the worker threads add to it in parallel
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Corrupt sstable {}; skipped", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            // restore the interrupt flag so code further up the stack can still observe it
 +            Thread.currentThread().interrupt();
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader opened = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        // hand over the already-built files, summary and filter before setup() runs
 +        opened.ifile = ifile;
 +        opened.dfile = dfile;
 +        opened.indexSummary = isummary;
 +        opened.bf = bf;
 +        opened.setup();
 +
 +        return opened;
 +    }
 +
 +
 +    /**
 +     * Creates a reader through the reader factory of the descriptor's sstable format. The returned
 +     * reader has had neither its files assigned nor setup() called by this method.
 +     */
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        return descriptor.getFormat()
 +                         .getReaderFactory()
 +                         .open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    /**
 +     * Records the reader's stats metadata, age bound and open reason, and creates its row index
 +     * entry serializer. ifile, dfile, indexSummary and bf are not assigned here.
 +     */
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +        // "descriptor" is not the "desc" parameter; presumably it is the field assigned by super(...) -- TODO confirm
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +    }
 +
 +    /** Sums onDiskLength() over the given sstables; an empty input yields 0. */
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long totalBytes = 0;
 +        Iterator<SSTableReader> remaining = sstables.iterator();
 +        while (remaining.hasNext())
 +            totalBytes += remaining.next().onDiskLength();
 +        return totalBytes;
 +    }
 +
 +    /** Two readers are equal when their descriptors are equal. */
 +    @Override
 +    public boolean equals(Object that)
 +    {
 +        if (!(that instanceof SSTableReader))
 +            return false;
 +
 +        SSTableReader other = (SSTableReader) that;
 +        return other.descriptor.equals(this.descriptor);
 +    }
 +
 +    /** Hashes on the descriptor alone. */
 +    @Override
 +    public int hashCode()
 +    {
 +        return descriptor.hashCode();
 +    }
 +
 +    /**
 +     * Returns the path held by the data file (dfile).
 +     * NOTE(review): dfile might be null before a call to load(), in which case this dereference fails.
 +     */
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    /**
 +     * Returns the path held by the index file (ifile).
 +     * NOTE(review): ifile might be null before a call to load(), in which case this dereference fails.
 +     */
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
 +    }
 +
 +    /**
 +     * Wires this reader to a DataTracker: the tracker is handed to this reader's deleting task,
 +     * and keyCache is pointed at CacheService.instance.keyCache.
 +     *
 +     * @param tracker the tracker passed to the deleting task
 +     */
 +    public void setTrackedBy(DataTracker tracker)
 +    {
 +        tidy.type.deletingTask.setTracker(tracker);
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    /**
 +     * Loads the index, data file and summary, and decides what to do about the bloom filter:
 +     * use AlwaysPresent when the configured fp chance is 1.0, read Filter.db when it is present
 +     * and was built with the currently configured fp chance, and recreate the filter otherwise.
 +     *
 +     * @param validation the sstable's validation metadata; may be null
 +     */
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +            return;
 +        }
 +
 +        // the on-disk filter is reusable only if the component exists, validation metadata is
 +        // available, and the fp chance recorded there equals the currently configured value
 +        boolean filterReusable = components.contains(Component.FILTER)
 +                                 && validation != null
 +                                 && validation.bloomFilterFPChance == metadata.getBloomFilterFpChance();
 +        if (filterReusable)
 +        {
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +        else
 +        {
 +            // filter component is missing, or its fp chance has changed since it was written
 +            load(true, true);
 +        }
 +    }
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        String filterPath = descriptor.filenameFor(Component.FILTER);
 +        DataInputStream in = new DataInputStream(new BufferedInputStream(new FileInputStream(filterPath)));
 +        try
 +        {
 +            bf = FilterFactory.deserialize(in, true);
 +        }
 +        finally
 +        {
 +            // the stream is closed via closeQuietly whether or not deserialization succeeded
 +            FileUtils.closeQuietly(in);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes, if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +        SegmentedFile.Builder dbuilder;
 +        if (compression)
 +            dbuilder = SegmentedFile.getCompressedBuilder();
 +        else
 +            dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        // buildSummary is needed to recreate the filter, to build a missing summary, or both
 +        boolean rebuilt = recreateBloomFilter || !summaryLoaded;
 +        if (rebuilt)
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (rebuilt && saveSummaryIfCreated) // save summary information to disk
 +            saveSummary(ibuilder, dbuilder);
 +    }
 +
 +    /**
 +     * Builds the index summary (and optionally the bloom filter) by reading through the Index.db file.
 +     * Also sets first and last from the keys that are read; samplingLevel is handed to the IndexSummaryBuilder.
 +     * @param recreateBloomFilter true if the bloom filter should be recreated and populated with every key read
 +     * @param ibuilder index file builder; receives a potential boundary per entry while the summary is being built
 +     * @param dbuilder data file builder; receives a potential boundary per entry while the summary is being built
 +     * @param summaryLoaded true if the index summary is already loaded and does not need to be built again
 +     * @throws IOException presumably when the primary index cannot be read - confirm against RandomAccessReader
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // statistics are optional, so fall back to an estimate taken from the index
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
 +            IndexSummaryBuilder summaryBuilder = null; // stays null when the summary was already loaded
 +            if (!summaryLoaded)
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
 +
 +            long indexPosition;
 +            RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) // indexPosition = offset of the entry about to be read
 +            {
 +                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
 +                DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                if (first == null)
 +                    first = decoratedKey;
 +                last = decoratedKey; // overwritten on every iteration, so it ends up as the final key in the index
 +
 +                if (recreateBloomFilter)
 +                    bf.add(decoratedKey);
 +
 +                // if summary was already read from disk we don't want to re-populate it using primary index
 +                if (!summaryLoaded)
 +                {
 +                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                    ibuilder.addPotentialBoundary(indexPosition);
 +                    dbuilder.addPotentialBoundary(indexEntry.position);
 +                }
 +            }
 +
 +            if (!summaryLoaded)
 +                indexSummary = summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Loads the index summary, the first/last keys and the segment bounds from the Summary.db file if it exists.
 +     *
 +     * If reading fails with an IOException (presumably this includes a summary whose index interval no longer matches
 +     * the schema - TODO confirm in IndexSummary.serializer), the Summary.db file is deleted and this returns false.
 +     *
 +     * @param ibuilder index file builder whose bounds are deserialized from the summary file
 +     * @param dbuilder data file builder whose bounds are deserialized from the summary file
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            if (indexSummary != null) // NOTE(review): the field is closed but not reset to null here
 +                indexSummary.close();
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // close the stream before deleting the file (the finally block below quietly closes it a second time)
 +            FileUtils.closeQuietly(iStream);
 +            // delete it and fall back to creating a new summary
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Saves this reader's current index summary to the Summary.db file.
 +     *
 +     * @param ibuilder index file builder whose bounds are serialized into the summary file
 +     * @param dbuilder data file builder whose bounds are serialized into the summary file
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary) // writes the given summary, replacing any existing Summary.db
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            // written in this order: summary, first key, last key, index bounds, data bounds (loadSummary reads them back in the same order)
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e); // the failure is logged at debug level and not propagated to the caller
 +
 +            // the partially written file may be corrupt, so delete it; without the file loadSummary returns false and load() rebuilds the summary
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement) // marks this reader as replaced and makes replacement the live reader in tidy.global
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert replacement != null;
 +            assert !tidy.isReplaced; // a reader is expected to be replaced at most once
 +            assert tidy.global.live == this; // only the currently live reader is expected to be replaced
 +            tidy.isReplaced = true;
 +            tidy.global.live = replacement;
 +        }
 +    }
 +
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose) // replaces this reader with a MOVED_START copy whose first key is newStart
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
 +                                                          dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
 +                                                          maxDataAge, sstableMetadata, OpenReason.MOVED_START);
 +            // TODO: make data/index start accurate for compressed files
 +            // TODO: merge with caller's firstKeyBeyond() work, to save time
 +            if (newStart.compareTo(first) > 0) // NOTE(review): when this is false the supplied runOnClose is not installed anywhere in this method
 +            {
 +                final long dataStart = getPosition(newStart, Operator.EQ).position; // NOTE(review): getPosition is documented to return null for an absent key; presumably newStart always exists - confirm
 +                final long indexStart = getIndexScanPosition(newStart);
 +                this.tidy.runOnClose = new Runnable()
 +                {
 +                    public void run()
 +                    {
 +                        CLibrary.trySkipCache(dfile.path, 0, dataStart); // presumably asks the OS to drop cached pages for data bytes [0, dataStart) - confirm in CLibrary
 +                        CLibrary.trySkipCache(ifile.path, 0, indexStart); // likewise for the index file up to indexStart
 +                        if (runOnClose != null)
 +                            runOnClose.run();
 +                    }
 +                };
 +            }
 +
 +            replacement.first = newStart;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    public SSTableReader cloneAsShadowed(final Runnable runOnClose) // replaces this reader with a SHADOWED copy that keeps the same first/last keys
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +            this.tidy.runOnClose = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    CLibrary.trySkipCache(dfile.path, 0, 0); // NOTE(review): a length of 0 presumably means "to end of file" - confirm in CLibrary
 +                    CLibrary.trySkipCache(ifile.path, 0, 0);
 +                    runOnClose.run(); // NOTE(review): not null-checked here, unlike the equivalent hook in cloneWithNewStart
 +                }
 +            };
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
 +                                                          dfile.sharedCopy(), indexSummary.sharedCopy(), bf.sharedCopy(),
 +                                                          maxDataAge, sstableMetadata, OpenReason.SHADOWED);
 +            replacement.first = first;
 +            replacement.last = last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that its IndexSummary is rebuilt or
 +     * downsampled to the target samplingLevel. This (original) instance is marked as replaced (and, per the original note, has its DeletingTask removed and its read-meter sync task cancelled - not visible in this method).
 +     * @param parent the store whose liveDiskSpaceUsed metric is adjusted by the change in bytesOnDisk()
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader, opened with OpenReason.METADATA_CHANGE
 +     * @throws IOException presumably if the primary index cannot be read while rebuilding the summary - confirm
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk(); // measured before any summary file is rewritten below
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel); // note: this branch does not write the rebuilt summary to Summary.db
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode()); // NOTE(review): fresh builders - no boundaries are added before their bounds are serialized; confirm this is intended
 +                SegmentedFile.Builder dbuilder = compression
 +                        ? SegmentedFile.getCompressedBuilder()
 +                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize); // the delta may be negative when the summary shrank
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile.sharedCopy(),
 +                                                     dfile.sharedCopy(), newSummary, bf.sharedCopy(), maxDataAge,
 +                                                     sstableMetadata, OpenReason.METADATA_CHANGE);
 +            replacement.first = this.first;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException // builds a new summary at newSamplingLevel by scanning every key in the primary index
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
 +
 +            long indexPosition;
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) // indexPosition = offset of the entry about to be read
 +            {
 +                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
 +                RowIndexEntry.Serializer.skip(primaryIndex); // only the key is needed; skip over the rest of the index entry
 +            }
 +
 +            return summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public RestorableMeter getReadMeter() // accessor for the readMeter field
 +    {
 +        return readMeter;
 +    }
 +
 +    public int getIndexSummarySamplingLevel() // sampling level of the current index summary
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize() // off-heap size as reported by the index summary
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval() // min index interval recorded in the index summary (may differ from the schema value in metadata)
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval() // effective index interval as reported by the index summary
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary() // releases the summary through tidy and drops this reader's reference to it
 +    {
 +        tidy.releaseSummary();
 +        indexSummary = null; // methods that dereference indexSummary must not be called after this point
 +    }
 +
 +    private void validate() // sanity check: the first key must not sort after the last key
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +        {
 +            selfRef().release(); // release this reader's own reference before reporting the failure
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +        }
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary). Always returns a value >= 0
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        if (openReason == OpenReason.MOVED_START && key.compareTo(first) < 0) // for a MOVED_START reader, clamp keys before the (moved) first key to first
 +            key = first;
 +
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary) // maps a summary binarySearch result to an index file position
 +    {
 +        if (binarySearchResult == -1) // -1 encodes insertion point 0: the key sorts before every summary entry, so scan from the start of the index
 +            return 0;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
 +    protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult) // returns the index of the last summary entry <= the key, or -1 if there is none
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // a negative result encodes -(insertion point) - 1, where the insertion point is the index
 +            // of the first entry _greater_ than the key searched for
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0) // every summary entry is greater than the key
 +                return -1;
 +            return greaterThan - 1; // the entry just before the insertion point
 +        }
 +        else
 +        {
 +            return binarySearchResult; // exact match
 +        }
 +    }
 +
 +    /**
 +     * Returns the compression metadata for this sstable, after pointing its parameters at the live schema metadata.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata(); // dfile is cast to ICompressedFile once compression is known to be enabled
 +
 +        // we need the parent cf metadata: for a secondary index, look it up under the parent column family's name
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the compression meta-data.
 +     * @return the amount of memory in bytes used off heap by the compression meta-data, or 0 if this sstable is not compressed
 +     */
 +    public long getCompressionMetadataOffHeapSize()
 +    {
 +        if (!compression) // checked here so that getCompressionMetadata() cannot throw IllegalStateException
 +            return 0;
 +
 +        return getCompressionMetadata().offHeapSize();
 +    }
 +
 +    /**
 +     * For testing purposes only: replaces this reader's bloom filter with FilterFactory.AlwaysPresent.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent; // presumably a filter that reports every key as present - confirm in FilterFactory
 +    }
 +
 +    public IFilter getBloomFilter() // accessor for the bf field
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize() // serialized size as reported by the bloom filter
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
 +     * Returns the amount of memory in bytes used off heap by the bloom filter, as reported by the filter itself.
 +     * @return the amount of memory in bytes used off heap by the bloom filter
 +     */
 +    public long getBloomFilterOffHeapSize()
 +    {
 +        return bf.offHeapSize();
 +    }
 +
 +    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary (not an exact count).
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges the token ranges to estimate for; they are normalized before the index summary is consulted
 +     * @return An estimate of the number of keys for given ranges in this SSTable; never less than 1.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1); // both ends of each pair are inclusive
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
 +        long estimatedKeys = sampleKeyCount * ((long) Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVAL of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling, as reported by the summary.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the raw key bytes of the index summary entry at position {@code index}.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges) // returns inclusive (left, right) summary-index pairs, one per non-empty normalized range
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1; // no exact match: decode the insertion point, i.e. the first entry greater than the left bound
 +            else
 +                // ranges are start exclusive, so skip the entry that matches the left bound exactly
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // ranges are end inclusive, so we use the index just before the insertion point that binarySearch
 +                // gives us, since that will be the last index we will return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // Means the first key is already strictly greater than the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range) // iterates over the index summary keys that fall within range
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current; // the inclusive (left, right) summary-index pair being walked
 +                    private int idx; // next summary index to return from current
 +
 +                    public boolean hasNext() // also advances to the next index pair once the current one is exhausted
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next() // NOTE(review): relies on hasNext() having been called first; it neither advances ranges nor throws NoSuchElementException itself
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A list of (offset,end) pairs, in normalized range order, that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            assert !range.isWrapAround() || range.right.isMinimum();
 +            // truncate the range so it at most covers the sstable
 +            AbstractBounds<RowPosition> bounds = range.toRowBounds();
 +            RowPosition leftBound = bounds.left.compareTo(first) > 0 ? bounds.left : first.getToken().minKeyBound();
 +            RowPosition rightBound = bounds.right.isMinimum() ? last.getToken().maxKeyBound() : bounds.right;
 +
 +            if (leftBound.compareTo(last) > 0 || rightBound.compareTo(first) < 0) // the range lies entirely outside this sstable
 +                continue;
 +
 +            long left = getPosition(leftBound, Operator.GT).position; // NOTE(review): assumes getPosition finds an entry here; it is documented to return null otherwise
 +            long right = (rightBound.compareTo(last) > 0)
 +                         ? uncompressedLength()
 +                         : getPosition(rightBound, Operator.GT).position;
 +
 +            if (left == right)
 +                // empty range
 +                continue;
 +
 +            assert left < right : String.format("Range=%s openReason=%s first=%s last=%s left=%d right=%d", range, openReason, first, last, left, right);
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public void invalidateCacheKey(DecoratedKey key) // removes this sstable's key cache entry for key
 +    {
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        keyCache.remove(cacheKey); // NOTE(review): keyCache is not null-checked here, unlike in cacheKey() and getCachedPosition()
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info) // stores info in the key cache for key, if key caching is enabled and usable
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return; // caching disabled for this table, no cache available, or a zero-capacity cache
 +        }
 +
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats) // builds the KeyCacheKey for key and delegates to the overload below
 +    {
 +        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats) // returns the cached entry, or null if absent or the cache is unusable
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0) {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                {
 +                    keyCacheHit.incrementAndGet();
 +                    bloomFilterTracker.addTruePositive(); // a cache hit is also counted as a bloom filter true positive
 +                }
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey); // presumably a lookup that leaves the cache's own statistics untouched - confirm in the cache class
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats (updateCacheAndStats = true, permitMatchPastLast = false).
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true, false);
 +    }
 +
 +    public RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats) // delegates with permitMatchPastLast = false
 +    {
 +        return getPosition(key, op, updateCacheAndStats, false);
 +    }
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to allow key selection by token bounds but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @param permitMatchPastLast NOTE(review): semantics not visible here; presumably permits a match beyond {@code last} - confirm in implementations
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    protected abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats, boolean permitMatchPastLast);
 +
 +    // Corresponds to a query by column name: iterate the given columns of the row for key (second overload: reading from a supplied file and index entry)
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    // Corresponds to a slice query: iterate the given column slices of the row for key, optionally in reverse (second overload: reading from a supplied file and index entry)
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        if (token.compareTo(first) < 0) // the token sorts before the whole sstable, so the first key is the answer
 +            return first;
 +
 +        long sampledPosition = getIndexScanPosition(token); // index offset from the summary at which to start the linear scan
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in); // not a match: skip the rest of this index entry to reach the next key
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect(); // flag this sstable as suspect before surfacing the read failure as corruption
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable, as reported by the data file. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the on disk size for this SSTable. For
 +     * compressed files, this is not the same thing as the data length (see
 +     * uncompressedLength())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (tidy.global)
 +        {
 +            assert !tidy.isReplaced; // a reader that has already been replaced is not expected to be marked obsolete
 +        }
 +        return !tidy.global.isCompacted.getAndSet(true); // getAndSet returns the previous value, so this is true only for the first caller
 +    }
 +
 +    public boolean isMarkedCompacted() // reads the isCompacted flag that markObsolete() sets
 +    {
 +        return tidy.global.isCompacted.get();
 +    }
 +
 +    public void markSuspect() // sets the isSuspect flag; nothing in this method ever clears it
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true); // the previous value is discarded
 +    }
 +
 +    public boolean isMarkedSuspect() // reads the isSuspect flag that markSuspect() sets
 +    {
 +        return isSuspect.get();
 +    }
 +
 +
 +    /**
 +     * Scanner over all data in this SSTable, with no rate limiter.
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null); // the cast selects the RateLimiter overload; a bare null would be ambiguous with getScanner(DataRange)
 +    }
 +
 +    public ISSTableScanner getScanner(RateLimiter limiter) // scanner over all data for this partitioner, using the given limiter
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     * Scanner over the given data range, with no rate limiter.
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover; null means scan the whole SSTable
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ISSTableScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the ranges of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ISSTableScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
 +
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (in localhost currentMilli time).
 +     * This works in conjunction with maxDataAge which is an upper bound on the creation time of data in this sstable.
 +     * @param age The age to compare against the maxDataAge of this sstable. Measured in milliseconds since the epoch on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    public SSTableReader getCurrentReplacement()
 +    {
 +        return tidy.global.live;
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
 +         */
 +        public abstract int apply(int comparison);
 +
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : 1; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            return compactionMetadata.ancestors;
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after level is changed on sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return dfile.createThrottledReader(limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return dfile.createReader();
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        return ifile.createReader();
 +    }
 +
 +    /**
 +     * @param component component to get timestamp.
 +     * @return last modified time for given component. 0 if given component does not exist or IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hits
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache requests
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
 +
 +    public Ref<SSTableReader> tryRef()
 +    {
 +        return selfRef.tryRef();
 +    }
 +
 +    public Ref<SSTableReader> selfRef()
 +    {
 +        return selfRef;
 +    }
 +
 +    public Ref<SSTableReader> ref()
 +    {
 +        return selfRef.ref();
 +    }
 +
 +    void setup()
 +    {
 +        tidy.setup(this);
 +        this.readMeter = tidy.global.readMeter;
 +    }
 +
 +    @VisibleForTesting
 +    public void overrideReadMeter(RestorableMeter readMeter)
 +    {
 +        this.readMeter = tidy.global.readMeter = readMeter;
 +    }
 +
 +    /**
 +     * One instance per SSTableReader we create. This references the type-shared tidy, which in turn references
 +     * the globally shared tidy, i.e.
 +     *
 +     * InstanceTidier => DescriptorTypeTidy => GlobalTidy
 +     *
 +     * We can create many InstanceTidiers (one for every time we reopen an sstable with MOVED_START for example), but there can only be
 +     * two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for one single logical sstable.
 +     *
 +     * When the InstanceTidier cleans up, it releases its reference to its DescriptorTypeTidy; when all InstanceTidiers
 +     * for that type have run, the DescriptorTypeTidy cleans up. DescriptorTypeTidy behaves in the same way towards GlobalTidy.
 +     *
 +     * For ease, we stash a direct reference to both our type-shared and global tidier
 +     */
 +    private static final class InstanceTidier implements Tidy
 +    {
 +        private final Descriptor descriptor;
 +        private final CFMetaData metadata;
 +        private IFilter bf;
 +        private IndexSummary summary;
 +
 +        private SegmentedFile dfile;
 +        private SegmentedFile ifile;
 +        private Runnable runOnClose;
 +        private boolean isReplaced = false;
 +
 +        // a reference to our shared per-Descriptor.Type tidy instance, that
 +        // we will release when we are ourselves released
 +        private Ref<DescriptorTypeTidy> typeRef;
 +
 +        // a convenience stashing of the shared per-descriptor-type tidy instance itself
 +        // and the per-logical-sstable globally shared state that it is linked to
 +        private DescriptorTypeTidy type;
 +        private GlobalTidy global;
 +
 +        private boolean setup;
 +
 +        void setup(SSTableReader reader)
 +        {
 +            this.setup = true;
 +            this.bf = reader.bf;
 +            this.summary = reader.indexSummary;
 +            this.dfile = reader.dfile;
 +            this.ifile = reader.ifile;
 +            // get a new reference to the shared descriptor-type tidy
 +            this.typeRef = DescriptorTypeTidy.get(reader);
 +            this.type = typeRef.get();
 +            this.global = type.globalRef.get();
 +        }
 +
 +        InstanceTidier(Descriptor descriptor, CFMetaData metadata)
 +        {
 +            this.descriptor = descriptor;
 +            this.metadata = metadata;
 +        }
 +
 +        public void tidy()
 +        {
 +            // don't try to cleanup if the sstablereader was never fully constructed
 +            if (!setup)
 +                return;
 +
 +            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +            final OpOrder.Barrier barrier;
 +            if (cfs != null)
 +            {
 +                barrier = cfs.readOrdering.newBarrier();
 +                barrier.issue();
 +            }
 +            else
 +                barrier = null;
 +
 +            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    if (barrier != null)
 +                        barrier.await();
 +                    bf.close();
 +                    dfile.close();
 +                    ifile.close();
 +                    if (summary != null)
 +                        summary.close();
 +                    if (runOnClose != null)
 +                        runOnClose.run();
 +                    typeRef.release();
 +                }
 +            });
 +        }
 +
 +        public String name()
 +        {
 +            return descriptor.toString();
 +        }
 +
 +        void releaseSummary()
 +        {
 +            summary.close();
 +            assert summary.isCleanedUp();
 +            summary = null;
 +        }
 +    }
 +
 +    /**
 +     * One shared between all instances of a given Descriptor.Type.
 +     * Performs only two things: the deletion of the sstables for the type,
 +     * if necessary; and the shared reference to the globally shared state.
 +     *
 +     * All InstanceTidiers, on setup(), ask the static get() method for their shared state,
 +     * and stash a reference to it to be released when they are. Once all such references are
 +     * released, the shared tidy will be performed.
 +     */
 +    static final class DescriptorTypeTidy implements Tidy
 +    {
 +        // keyed by REAL descriptor (TEMPLINK/FINAL), mapping to the shared DescriptorTypeTidy for that descriptor
 +        static final ConcurrentMap<Descriptor, Ref<DescriptorTypeTidy>> lookup = new ConcurrentHashMap<>();
 +
 +        private final Descriptor desc;
 +        private final Ref<GlobalTidy> globalRef;
 +        private final SSTableDeletingTask deletingTask;
 +
 +        DescriptorTypeTidy(Descriptor desc, SSTableReader sstable)
 +        {
 +            this.desc = desc;
 +            this.deletingTask = new SSTableDeletingTask(desc, sstable);
 +            // get a new reference to the shared global tidy
 +            this.globalRef = GlobalTidy.get(sstable);
 +        }
 +
 +        public void tidy()
 +        {
 +            lookup.remove(desc);
 +            boolean isCompacted = globalRef.get().isCompacted.get();
 +            globalRef.release();
 +            switch (desc.type)
 +            {
 +                case FINAL:
 +                    if (isCompacted)
 +                        deletingTask.run();
 +                    break;
 +                case TEMPLINK:
 +                    deletingTask.run();
 +                    break;
 +                default:
 +                    throw new IllegalStateException();
 +            }
 +        }
 +
 +        public String name()
 +        {
 +            return desc.toString();
 +        }
 +
 +        // get a new reference to the shared DescriptorTypeTidy for this sstable
 +        public static Ref<DescriptorTypeTidy> get(SSTableReader sstable)
 +        {
 +            Descriptor desc = sstable.descriptor;
 +            if (sstable.openReason == OpenReason.EARLY)
 +                desc = desc.asType(Descriptor.Type.TEMPLINK);
 +            Ref<DescriptorTypeTidy> refc = lookup.get(desc);
 +            if (refc != null)
 +                return refc.ref();
 +            final DescriptorTypeTidy tidy = new DescriptorTypeTidy(desc, sstable);
 +            refc = new Ref<>(tidy, tidy);
 +            Ref<?> ex = lookup.putIfAbsent(desc, refc);
 +            assert ex == null;
 +            return refc;
 +        }
 +    }
 +
 +    /**
 +     * One instance per logical sstable. This both tracks shared cleanup and some shared state related
 +     * to the sstable's lifecycle. All DescriptorTypeTidy instances, on construction, obtain a reference to us
 +     * via our static get(). There should only ever be at most two such references extant at any one time,
 +     * since only TEMPLINK and FINAL type descriptors should be open as readers. When all files of both
 +     * kinds have been released, this shared tidy will be performed.
 +     */
 +    static final class GlobalTidy implements Tidy
 +    {
 +        // keyed by FINAL descriptor, mapping to the shared GlobalTidy for that descriptor
 +        static final ConcurrentMap<Descriptor, Ref<GlobalTidy>> lookup = new ConcurrentHashMap<>();
 +
 +        private final Descriptor desc;
 +        // a single convenience property for getting the most recent version of an sstable, not related to tidying
 +        private SSTableReader live;
 +        // the readMeter that is shared between all instances of the sstable, and can be overridden in all of them
 +        // at once also, for testing purposes
 +        private RestorableMeter readMeter;
 +        // the scheduled persistence of the readMeter, that we will cancel once all instances of this logical
 +        // sstable have been released
 +        private final ScheduledFuture readMeterSyncFuture;
 +        // shared state managing if the logical sstable has been compacted; this is used in cleanup both here
 +        // and in the FINAL type tidier
 +        private final AtomicBoolean isCompacted;
 +
 +        GlobalTidy(final SSTableReader reader)
 +        {
 +            this.desc = reader.descriptor;
 +            this.isCompacted = new AtomicBoolean();
 +            this.live = reader;
 +            // Don't track read rates for tables in the system keyspace and don't bother trying to load or persist
 +            // the read meter when in client mode.
 +            if (SystemKeyspace.NAME.equals(desc.ksname))
 +            {
 +                readMeter = null;
 +                readMeterSyncFuture = null;
 +                return;
 +            }
 +
 +            readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +            // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
 +            readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    if (!isCompacted.get())
 +                    {
 +                        meterSyncThrottle.acquire();
 +                        SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
 +                    }
 +                }
 +            }, 1, 5, TimeUnit.MINUTES);
 +        }
 +
 +        public void tidy()
 +        {
 +            lookup.remove(desc);
 +            if (readMeterSyncFuture != null)
 +                readMeterSyncFuture.cancel(true);
 +            if (isCompacted.get())
 +                SystemKeyspace.clearSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +            // don't ideally want to dropPageCache for the file until all instances have been released
 +            dropPageCache(desc.filenameFor(Component.DATA));
 +            dropPageCache(desc.filenameFor(Component.PRIMARY_INDEX));
 +        }
 +
 +        public String name()
 +        {
 +            return desc.toString();
 +        }
 +
 +        // get a new reference to the shared GlobalTidy for this sstable
 +        public static Ref<GlobalTidy> get(SSTableReader sstable)
 +        {
 +            Descriptor descriptor = sstable.descriptor;
 +            Ref<GlobalTidy> refc = lookup.get(descriptor);
 +            if (refc != null)
 +                return refc.ref();
 +            final GlobalTidy tidy = new GlobalTidy(sstable);
 +            refc = new Ref<>(tidy, tidy);
 +            Ref<?> ex = lookup.putIfAbsent(descriptor, refc);
 +            assert ex == null;
 +            return refc;
 +        }
 +    }
 +
 +    private static void dropPageCache(String filePath)
 +    {
 +        RandomAccessFile file = null;
 +
 +        try
 +        {
 +            file = new RandomAccessFile(filePath, "r");
 +
 +            int fd = CLibrary.getfd(file.getFD());
 +
 +            if (fd > 0)
 +            {
 +                if (logger.isDebugEnabled())
 +                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +
 +                CLibrary.trySkipCache(fd, 0, 0);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // we don't care if cache cleanup fails
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(file);
 +        }
 +    }
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableReader open(final Descriptor descriptor,
 +                                           Set<Component> components,
 +                                           CFMetaData metadata,
 +                                           IPartitioner partitioner,
 +                                           Long maxDataAge,
 +                                           StatsMetadata sstableMetadata,
 +                                           OpenReason openReason);
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/20b62de8/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 8e3efd7,258b6b5..25894f1
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -21,9 -21,10 +21,12 @@@ import java.io.File
  import java.nio.ByteBuffer;
  import java.util.*;
  
+ import javax.annotation.Nullable;
+ 
+ import com.google.common.base.Function;
  import com.google.common.collect.Sets;
 +import org.junit.After;
 +import org.junit.BeforeClass;
  import org.junit.Test;
  
  import org.apache.cassandra.SchemaLoader;
@@@ -382,6 -391,7 +420,7 @@@ public class SSTableRewriterTest extend
  
          DecoratedKey origFirst = s.first;
          DecoratedKey origLast = s.last;
 -        long startSize = cfs.metric.liveDiskSpaceUsed.count();
++        long startSize = cfs.metric.liveDiskSpaceUsed.getCount();
          Set<SSTableReader> compacting = Sets.newHashSet(s);
          SSTableRewriter.overrideOpenInterval(10000000);
          SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
@@@ -390,28 -400,13 +429,13 @@@
          try (ISSTableScanner scanner = s.getScanner();
               CompactionController controller = new CompactionController(cfs, compacting, 0))
          {
-             while(scanner.hasNext())
-             {
-                 rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                 if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
-                 {
-                     rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                     files++;
-                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
-                 }
-                 if (files == 3)
-                 {
-                     //testing to abort when we have nothing written in the new file
-                     rewriter.abort();
-                     break;
-                 }
-             }
+             test.run(scanner, controller, s, cfs, rewriter);
          }
+ 
          Thread.sleep(1000);
 -        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
++        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
          assertEquals(1, cfs.getSSTables().size());
          assertFileCounts(s.descriptor.directory.list(), 0, 0);
- 
          assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
          assertEquals(cfs.getSSTables().iterator().next().last, origLast);
          validateCFS(cfs);


[3/5] cassandra git commit: Make SSTableRewriter.abort() more robust to failure

Posted by be...@apache.org.
Make SSTableRewriter.abort() more robust to failure

patch by benedict; reviewed by branimir for CASSANDRA-8832


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c3fefa0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c3fefa0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c3fefa0

Branch: refs/heads/cassandra-2.1
Commit: 3c3fefa04f027a1ecbede1d41c1bf3df25218a5d
Parents: 33a9ada
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Mar 4 14:59:16 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Mar 4 14:59:16 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   4 +-
 .../db/compaction/CompactionManager.java        |   5 +-
 .../cassandra/io/sstable/SSTableReader.java     |   3 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  96 +++++++++++---
 .../org/apache/cassandra/utils/Throwables.java  |  32 +++++
 .../apache/cassandra/utils/concurrent/Refs.java |  25 +++-
 .../utils/concurrent/SelfRefCounted.java        |  24 ++++
 .../io/sstable/SSTableRewriterTest.java         | 124 +++++++++++--------
 10 files changed, 238 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4992d85..6133536 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.4
+ * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
  * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
  * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
  * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 9b792b6..38c5dbe 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,6 +30,7 @@ import javax.management.openmbean.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.*;
+import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 8224311..81964f9 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.utils.Interval;
 import org.apache.cassandra.utils.IntervalTree;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
 
 public class DataTracker
 {
@@ -406,8 +407,7 @@ public class DataTracker
         for (SSTableReader sstable : newSSTables)
             sstable.setTrackedBy(this);
 
-        for (SSTableReader sstable : oldSSTables)
-            sstable.selfRef().release();
+        Refs.release(Refs.selfRefs(oldSSTables));
     }
 
     private void removeSSTablesFromTracker(Collection<SSTableReader> oldSSTables)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e54a25f..992378f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1271,7 +1271,10 @@ public class CompactionManager implements CompactionManagerMBean
                 if (t instanceof CompactionInterruptedException)
                 {
                     logger.info(t.getMessage());
-                    logger.debug("Full interruption stack trace:", t);
+                    if (t.getSuppressed() != null && t.getSuppressed().length > 0)
+                        logger.warn("Interruption of compaction encountered exceptions:", t);
+                    else
+                        logger.debug("Full interruption stack trace:", t);
                 }
                 else
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 13abc04..973b0c9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -107,6 +107,7 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.concurrent.RefCounted;
+import org.apache.cassandra.utils.concurrent.SelfRefCounted;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 
@@ -166,7 +167,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
  *
  * TODO: fill in details about DataTracker and lifecycle interactions for tools, and for compaction strategies
  */
-public class SSTableReader extends SSTable implements RefCounted<SSTableReader>
+public class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 2ca3e6f..be1085b 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -21,6 +21,7 @@ import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Functions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -32,6 +33,8 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
  * we look in the summary we're collecting for the latest writer for the penultimate key that we know to have been fully
@@ -84,6 +87,12 @@ public class SSTableRewriter
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
+    private State state = State.WORKING;
+
+    private static enum State
+    {
+        WORKING, FINISHED, ABORTED
+    }
 
     public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
     {
@@ -176,30 +185,79 @@ public class SSTableRewriter
 
     public void abort()
     {
-        switchWriter(null, true);
-        moveStarts(null, null, true);
+        switch (state)
+        {
+            case ABORTED:
+                return;
+            case FINISHED:
+                throw new IllegalStateException("Cannot abort - changes have already been committed");
+        }
+        state = State.ABORTED;
+
+        Throwable fail = null;
+        try
+        {
+            moveStarts(null, null, true);
+        }
+        catch (Throwable t)
+        {
+            fail = merge(fail, t);
+        }
 
         // remove already completed SSTables
         for (SSTableReader sstable : finished)
         {
-            sstable.markObsolete();
-            sstable.selfRef().release();
+            try
+            {
+                sstable.markObsolete();
+                sstable.selfRef().release();
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
+            }
         }
 
+        if (writer != null)
+            finishedEarly.add(new Finished(writer, currentlyOpenedEarly));
+
         // abort the writers
         for (Finished finished : finishedEarly)
         {
-            boolean opened = finished.reader != null;
-            finished.writer.abort();
-            if (opened)
+            try
+            {
+                finished.writer.abort();
+            }
+            catch (Throwable t)
             {
-                // if we've already been opened, add ourselves to the discard pile
-                discard.add(finished.reader);
-                finished.reader.markObsolete();
+                fail = merge(fail, t);
+            }
+            try
+            {
+                if (finished.reader != null)
+                {
+                    // if we've already been opened, add ourselves to the discard pile
+                    discard.add(finished.reader);
+                    finished.reader.markObsolete();
+                }
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
             }
         }
 
-        replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+        try
+        {
+            replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
+        }
+        catch (Throwable t)
+        {
+            fail = merge(fail, t);
+        }
+
+        if (fail != null)
+            throw Throwables.propagate(fail);
     }
 
     /**
@@ -301,11 +359,6 @@ public class SSTableRewriter
 
     public void switchWriter(SSTableWriter newWriter)
     {
-        switchWriter(newWriter, false);
-    }
-
-    private void switchWriter(SSTableWriter newWriter, boolean abort)
-    {
         if (writer == null)
         {
             writer = newWriter;
@@ -313,7 +366,7 @@ public class SSTableRewriter
         }
 
         // we leave it as a tmp file, but we open it and add it to the dataTracker
-        if (writer.getFilePointer() != 0 && !abort)
+        if (writer.getFilePointer() != 0)
         {
             SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
             replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
@@ -362,8 +415,14 @@ public class SSTableRewriter
 
     private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
     {
+        switch (state)
+        {
+            case FINISHED: case ABORTED:
+                throw new IllegalStateException("Cannot finish - changes have already been " + state.toString().toLowerCase());
+        }
+
         List<SSTableReader> newReaders = new ArrayList<>();
-        switchWriter(null, false);
+        switchWriter(null);
 
         if (throwEarly)
             throw new RuntimeException("exception thrown early in finish, for testing");
@@ -396,6 +455,7 @@ public class SSTableRewriter
             throw new RuntimeException("exception thrown after all sstables finished, for testing");
 
         replaceWithFinishedReaders(newReaders);
+        state = State.FINISHED;
         return finished;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
new file mode 100644
index 0000000..552ca87
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -0,0 +1,32 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils;
+
+public class Throwables
+{
+
+    public static Throwable merge(Throwable existingFail, Throwable newFail)
+    {
+        if (existingFail == null)
+            return newFail;
+        existingFail.addSuppressed(newFail);
+        return existingFail;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index 3a930d2..b24fc2f 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -2,9 +2,15 @@ package org.apache.cassandra.utils.concurrent;
 
 import java.util.*;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * A collection of managed Ref references to RefCounted objects, and the objects they are referencing.
  * Care MUST be taken when using this collection, as if a permanent reference to it leaks we will not
@@ -196,7 +202,7 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
         throw new IllegalStateException();
     }
 
-    private static void release(Iterable<? extends Ref<?>> refs)
+    public static void release(Iterable<? extends Ref<?>> refs)
     {
         Throwable fail = null;
         for (Ref ref : refs)
@@ -207,13 +213,22 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
             }
             catch (Throwable t)
             {
-                if (fail == null)
-                    fail = t;
-                else
-                    fail.addSuppressed(t);
+                fail = merge(fail, t);
             }
         }
         if (fail != null)
             throw Throwables.propagate(fail);
     }
+
+    public static <T extends SelfRefCounted<T>> Iterable<Ref<T>> selfRefs(Iterable<T> refs)
+    {
+        return Iterables.transform(refs, new Function<T, Ref<T>>()
+        {
+            @Nullable
+            public Ref<T> apply(T t)
+            {
+                return t.selfRef();
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java b/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
new file mode 100644
index 0000000..cb45757
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/SelfRefCounted.java
@@ -0,0 +1,24 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+public interface SelfRefCounted<T extends SelfRefCounted<T>> extends RefCounted<T>
+{
+    public Ref<T> selfRef();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c3fefa0/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 4957e5a..258b6b5 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -21,6 +21,9 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Function;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
@@ -301,48 +304,83 @@ public class SSTableRewriterTest extends SchemaLoader
     @Test
     public void testNumberOfFiles_abort() throws Exception
     {
-        Keyspace keyspace = Keyspace.open(KEYSPACE);
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        testNumberOfFiles_abort(new RewriterTest()
+        {
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            {
+                int files = 1;
+                while(scanner.hasNext())
+                {
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (rewriter.currentWriter().getFilePointer() > 25000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
+                }
+                rewriter.abort();
+            }
+        });
+    }
 
-        SSTableReader s = writeFile(cfs, 1000);
-        cfs.addSSTable(s);
-        long startSize = cfs.metric.liveDiskSpaceUsed.count();
-        DecoratedKey origFirst = s.first;
-        DecoratedKey origLast = s.last;
-        Set<SSTableReader> compacting = Sets.newHashSet(s);
-        SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+    @Test
+    public void testNumberOfFiles_abort2() throws Exception
+    {
+        testNumberOfFiles_abort(new RewriterTest()
+        {
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            {
+                int files = 1;
+                while(scanner.hasNext())
+                {
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (rewriter.currentWriter().getFilePointer() > 25000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
+                    if (files == 3)
+                    {
+                        // test aborting when nothing has been written to the new file
+                        rewriter.abort();
+                        break;
+                    }
+                }
+            }
+        });
+    }
 
-        int files = 1;
-        try (ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+    @Test
+    public void testNumberOfFiles_abort3() throws Exception
+    {
+        testNumberOfFiles_abort(new RewriterTest()
         {
-            while(scanner.hasNext())
+            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
             {
-                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+                int files = 1;
+                while(scanner.hasNext())
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                    files++;
-                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+                    if (files == 1 && rewriter.currentWriter().getFilePointer() > 10000000)
+                    {
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        files++;
+                        assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+                    }
                 }
+                rewriter.abort();
             }
-        }
-        rewriter.abort();
-        Thread.sleep(1000);
-        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
-        assertEquals(1, cfs.getSSTables().size());
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
-        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
-        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
-        validateCFS(cfs);
+        });
+    }
 
+    private static interface RewriterTest
+    {
+        public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter);
     }
 
-    @Test
-    public void testNumberOfFiles_abort2() throws Exception
+    private void testNumberOfFiles_abort(RewriterTest test) throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -353,36 +391,22 @@ public class SSTableRewriterTest extends SchemaLoader
 
         DecoratedKey origFirst = s.first;
         DecoratedKey origLast = s.last;
+        long startSize = cfs.metric.liveDiskSpaceUsed.count();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
         SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
         rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
 
-        int files = 1;
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0))
         {
-            while(scanner.hasNext())
-            {
-                rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
-                if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
-                {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-                    files++;
-                    assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
-                }
-                if (files == 3)
-                {
-                    //testing to abort when we have nothing written in the new file
-                    rewriter.abort();
-                    break;
-                }
-            }
+            test.run(scanner, controller, s, cfs, rewriter);
         }
+
         Thread.sleep(1000);
+        assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.count());
         assertEquals(1, cfs.getSSTables().size());
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
-
         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
         assertEquals(cfs.getSSTables().iterator().next().last, origLast);
         validateCFS(cfs);