Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/11/05 09:50:48 UTC

[1/3] git commit: Improve compaction of repaired/unrepaired sstables

Repository: cassandra
Updated Branches:
  refs/heads/trunk e369ff69b -> af44d1a7c


Improve compaction of repaired/unrepaired sstables

Patch by marcuse; reviewed by Aleksey Yeschenko and Sankalp Kohli for CASSANDRA-8004
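
The core mechanism, introduced as WrappingCompactionStrategy below, is to keep
two instances of the configured strategy (one fed only repaired sstables, one
fed only unrepaired sstables) and to route background work to whichever side
has more pending tasks. A simplified, self-contained sketch of that routing
(hypothetical stand-in types, not the actual Cassandra API):

    // Hypothetical sketch of the repaired/unrepaired split; the types are
    // simplified stand-ins for AbstractCompactionStrategy.
    interface Strategy
    {
        int estimatedRemainingTasks();
        Runnable nextBackgroundTask(); // may return null when there is no work
    }

    final class TwoWayWrapper
    {
        private final Strategy repaired;
        private final Strategy unrepaired;

        TwoWayWrapper(Strategy repaired, Strategy unrepaired)
        {
            this.repaired = repaired;
            this.unrepaired = unrepaired;
        }

        // Mirrors getNextBackgroundTask() in the patch: try the side with
        // more estimated work first, then fall back to the other.
        Runnable nextBackgroundTask()
        {
            Strategy first = repaired.estimatedRemainingTasks() > unrepaired.estimatedRemainingTasks()
                             ? repaired : unrepaired;
            Strategy second = (first == repaired) ? unrepaired : repaired;
            Runnable task = first.nextBackgroundTask();
            return task != null ? task : second.nextBackgroundTask();
        }
    }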


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e16f584e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e16f584e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e16f584e

Branch: refs/heads/trunk
Commit: e16f584e61dccd243656269a4f305c7a50b7e433
Parents: 0a0ba84
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Oct 15 10:46:01 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Nov 5 09:35:24 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |   3 +
 .../cassandra/db/CollationController.java       |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  57 +---
 .../compaction/AbstractCompactionStrategy.java  |  14 +
 .../db/compaction/CompactionManager.java        |   5 +-
 .../DateTieredCompactionStrategy.java           |  46 +--
 .../compaction/LeveledCompactionStrategy.java   |  60 +---
 .../db/compaction/LeveledManifest.java          | 110 ++----
 .../SizeTieredCompactionStrategy.java           |  63 ++--
 .../compaction/WrappingCompactionStrategy.java  | 331 +++++++++++++++++++
 .../cassandra/io/sstable/SSTableReader.java     |   3 +-
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../LeveledCompactionStrategyTest.java          |  88 +++--
 14 files changed, 487 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 42cef8c..80f4c8f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Improve compaction of repaired/unrepaired sstables (CASSANDRA-8004)
  * Make cache serializers pluggable (CASSANDRA-8096)
  * Fix issues with CONTAINS (KEY) queries on secondary indexes
    (CASSANDRA-8147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index d986c40..7f0e7cb 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -73,6 +73,7 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
+import org.apache.cassandra.db.compaction.WrappingCompactionStrategy;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
@@ -1296,6 +1297,8 @@ public final class CFMetaData
     {
         className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className;
         Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy");
+        if (strategyClass.equals(WrappingCompactionStrategy.class))
+            throw new ConfigurationException("You can't set WrappingCompactionStrategy as the compaction strategy!");
         if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass))
             throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className));
 

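The guard above keeps the new internal wrapper (added later in this patch)
from being set as a table's compaction strategy. As an illustration, resolving
it by name through the existing helper would now fail (hypothetical usage, not
part of the patch):

    // Hypothetical illustration of the new guard in CFMetaData.
    try
    {
        CFMetaData.createCompactionStrategy("WrappingCompactionStrategy");
        assert false : "expected ConfigurationException";
    }
    catch (ConfigurationException e)
    {
        // "You can't set WrappingCompactionStrategy as the compaction strategy!"
    }
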
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index f9d5daa..1b22e70 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -150,7 +150,7 @@ public class CollationController
             // "hoist up" the requested data into a more recent sstable
             if (sstablesIterated > cfs.getMinimumCompactionThreshold()
                 && !cfs.isAutoCompactionDisabled()
-                && cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy)
+                && cfs.getCompactionStrategy().shouldDefragment())
             {
                 Tracing.trace("Defragmenting requested data");
                 Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.getKey(), returnCF.cloneMe());

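The hard-coded instanceof check is replaced by a polymorphic hook; in this
patch only SizeTieredCompactionStrategy returns true (see its diff below). Any
strategy could opt in the same way (illustrative fragment, not part of the
patch):

    // Illustrative fragment of an AbstractCompactionStrategy subclass:
    // opting into read-path defragmentation via the new hook instead of
    // CollationController special-casing a concrete strategy class.
    @Override
    public boolean shouldDefragment()
    {
        return true;
    }
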
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0541608..0fa50bb 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -127,7 +127,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     /* These are locally held copies to be changed from the config during runtime */
     private volatile DefaultInteger minCompactionThreshold;
     private volatile DefaultInteger maxCompactionThreshold;
-    private volatile AbstractCompactionStrategy compactionStrategy;
+    private final WrappingCompactionStrategy compactionStrategyWrapper;
 
     public final Directories directories;
 
@@ -146,7 +146,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             for (ColumnFamilyStore cfs : concatWithIndexes())
                 cfs.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold());
 
-        maybeReloadCompactionStrategy();
+        compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata);
 
         scheduleFlush();
 
@@ -158,22 +158,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             switchMemtable();
     }
 
-    private void maybeReloadCompactionStrategy()
-    {
-        // Check if there is a need for reloading
-        if (metadata.compactionStrategyClass.equals(compactionStrategy.getClass()) && metadata.compactionStrategyOptions.equals(compactionStrategy.options))
-            return;
-
-        // synchronize vs runWithCompactionsDisabled calling pause/resume.  otherwise, letting old compactions
-        // finish should be harmless and possibly useful.
-        synchronized (this)
-        {
-            compactionStrategy.shutdown();
-            compactionStrategy = metadata.createCompactionStrategyInstance(this);
-            compactionStrategy.startup();
-        }
-    }
-
     void scheduleFlush()
     {
         int period = metadata.getMemtableFlushPeriod();
@@ -213,7 +197,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         try
         {
             metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass);
-            maybeReloadCompactionStrategy();
+            compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata);
         }
         catch (ConfigurationException e)
         {
@@ -297,13 +281,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             CacheService.instance.keyCache.loadSaved(this);
 
         // compaction strategy should be created after the CFS has been prepared
-        this.compactionStrategy = metadata.createCompactionStrategyInstance(this);
-        this.compactionStrategy.startup();
+        this.compactionStrategyWrapper = new WrappingCompactionStrategy(this);
 
         if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
         {
             logger.warn("Disabling compaction strategy by setting compaction thresholds to 0 is deprecated, set the compaction option 'enabled' to 'false' instead.");
-            this.compactionStrategy.disable();
+            this.compactionStrategyWrapper.disable();
         }
 
         // create the private ColumnFamilyStores for the secondary column indexes
@@ -367,7 +350,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logger.warn("Failed unregistering mbean: {}", mbeanName, e);
         }
 
-        compactionStrategy.shutdown();
+        compactionStrategyWrapper.shutdown();
 
         SystemKeyspace.removeTruncationRecord(metadata.cfId);
         data.unreferenceSSTables();
@@ -1370,7 +1353,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
-        compactionStrategy.replaceFlushed(memtable, sstable);
+        compactionStrategyWrapper.replaceFlushed(memtable, sstable);
     }
 
     public boolean isValid()
@@ -1810,7 +1793,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             public List<SSTableReader> apply(DataTracker.View view)
             {
-                return compactionStrategy.filterSSTablesForReads(view.intervalTree.search(key));
+                return compactionStrategyWrapper.filterSSTablesForReads(view.intervalTree.search(key));
             }
         };
     }
@@ -1825,7 +1808,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             public List<SSTableReader> apply(DataTracker.View view)
             {
-                return compactionStrategy.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
+                return compactionStrategyWrapper.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
             }
         };
     }
@@ -2587,7 +2570,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         // we don't use CompactionStrategy.pause since we don't want users flipping that on and off
         // during runWithCompactionsDisabled
-        this.compactionStrategy.disable();
+        this.compactionStrategyWrapper.disable();
     }
 
     public void enableAutoCompaction()
@@ -2602,7 +2585,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     @VisibleForTesting
     public void enableAutoCompaction(boolean waitForFutures)
     {
-        this.compactionStrategy.enable();
+        this.compactionStrategyWrapper.enable();
         List<Future<?>> futures = CompactionManager.instance.submitBackground(this);
         if (waitForFutures)
             FBUtilities.waitOnFutures(futures);
@@ -2610,7 +2593,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public boolean isAutoCompactionDisabled()
     {
-        return !this.compactionStrategy.isEnabled();
+        return !this.compactionStrategyWrapper.isEnabled();
     }
 
     /*
@@ -2624,8 +2607,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public AbstractCompactionStrategy getCompactionStrategy()
     {
-        assert compactionStrategy != null : "No compaction strategy set yet";
-        return compactionStrategy;
+        return compactionStrategyWrapper;
     }
 
     public void setCompactionThresholds(int minThreshold, int maxThreshold)
@@ -2634,10 +2616,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         minCompactionThreshold.set(minThreshold);
         maxCompactionThreshold.set(maxThreshold);
-
-        // this is called as part of CompactionStrategy constructor; avoid circular dependency by checking for null
-        if (compactionStrategy != null)
-            CompactionManager.instance.submitBackground(this);
+        CompactionManager.instance.submitBackground(this);
     }
 
     public int getMinimumCompactionThreshold()
@@ -2725,16 +2704,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public int getUnleveledSSTables()
     {
-        return this.compactionStrategy instanceof LeveledCompactionStrategy
-               ? ((LeveledCompactionStrategy) this.compactionStrategy).getLevelSize(0)
-               : 0;
+        return this.compactionStrategyWrapper.getUnleveledSSTables();
     }
 
     public int[] getSSTableCountPerLevel()
     {
-        return compactionStrategy instanceof LeveledCompactionStrategy
-               ? ((LeveledCompactionStrategy) compactionStrategy).getAllLevelSize()
-               : null;
+        return compactionStrategyWrapper.getSSTableCountPerLevel();
     }
 
     public static class ViewFragment

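Since getCompactionStrategy() now returns the wrapper, callers that need a
concrete per-set strategy go through getWrappedStrategies(), as the updated
tests at the end of this patch do. Illustrative usage, given a
ColumnFamilyStore cfs in scope (the wrapper returns the repaired strategy at
index 0 and the unrepaired one at index 1):

    // Illustrative: fetching the unrepaired-side LCS instance through the
    // wrapper, mirroring the updated LeveledCompactionStrategyTest below.
    WrappingCompactionStrategy wrapper =
        (WrappingCompactionStrategy) cfs.getCompactionStrategy();
    LeveledCompactionStrategy unrepaired =
        (LeveledCompactionStrategy) wrapper.getWrappedStrategies().get(1);
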
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 288c475..bf136b9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -290,6 +290,20 @@ public abstract class AbstractCompactionStrategy
         return new ScannerList(scanners);
     }
 
+    public boolean shouldDefragment()
+    {
+        return false;
+    }
+
+    public String getName()
+    {
+        return getClass().getSimpleName();
+    }
+
+    public abstract void addSSTable(SSTableReader added);
+
+    public abstract void removeSSTable(SSTableReader sstable);
+
     public static class ScannerList implements AutoCloseable
     {
         public final List<ICompactionScanner> scanners;

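Concrete strategies must now track their own sstable membership; the wrapper
feeds them through these hooks as sstables are added, removed, or change
repair status. The minimal pattern, used by the size-tiered and date-tiered
strategies later in this patch, is a plain set (fragment of a strategy
subclass; LCS instead forwards to its LeveledManifest):

    // Fragment of an AbstractCompactionStrategy subclass implementing the
    // two new hooks with a private set, as STCS and DTCS do in this patch.
    private final Set<SSTableReader> sstables = new HashSet<>();

    @Override
    public void addSSTable(SSTableReader added)
    {
        sstables.add(added);
    }

    @Override
    public void removeSSTable(SSTableReader sstable)
    {
        sstables.remove(sstable);
    }
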
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 84c3cb5..272b533 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -179,7 +179,7 @@ public class CompactionManager implements CompactionManagerMBean
         logger.debug("Scheduling a background task check for {}.{} with {}",
                      cfs.keyspace.getName(),
                      cfs.name,
-                     cfs.getCompactionStrategy().getClass().getSimpleName());
+                     cfs.getCompactionStrategy().getName());
         List<Future<?>> futures = new ArrayList<Future<?>>();
 
         // we must schedule it at least once, otherwise compaction will stop for a CF until next flush
@@ -998,8 +998,7 @@ public class CompactionManager implements CompactionManagerMBean
             SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, false);
 
-            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
+            try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(new HashSet<>(Collections.singleton(sstable)));
                  CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
             {
                 repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index 8c997ed..7c1b514 100644
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@ -35,8 +35,9 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(DateTieredCompactionStrategy.class);
 
-    protected DateTieredCompactionStrategyOptions options;
+    private final DateTieredCompactionStrategyOptions options;
     protected volatile int estimatedRemainingTasks;
+    private final Set<SSTableReader> sstables = new HashSet<>();
 
     public DateTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
@@ -75,36 +76,12 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
 
         int base = cfs.getMinimumCompactionThreshold();
         long now = getNow();
-        Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+        Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables));
 
-        Set<SSTableReader> repairedCandidates = new HashSet<>();
-        Set<SSTableReader> unRepairedCandidates = new HashSet<>();
-        for (SSTableReader sstable : candidates)
-        {
-            if (sstable.isRepaired())
-            {
-                repairedCandidates.add(sstable);
-            }
-            else
-            {
-                unRepairedCandidates.add(sstable);
-            }
-        }
-
-
-        List<SSTableReader> mostInterestingRepaired = getCompactionCandidates(repairedCandidates, now, base);
-        List<SSTableReader> mostInterestingUnrepaired = getCompactionCandidates(unRepairedCandidates, now, base);
-        if (mostInterestingRepaired != null && mostInterestingUnrepaired != null)
-        {
-            return mostInterestingRepaired.size() > mostInterestingUnrepaired.size() ? mostInterestingRepaired : mostInterestingUnrepaired;
-        }
-        else if (mostInterestingRepaired != null)
-        {
-            return mostInterestingRepaired;
-        }
-        else if (mostInterestingUnrepaired != null)
+        List<SSTableReader> mostInteresting = getCompactionCandidates(candidates, now, base);
+        if (mostInteresting != null)
         {
-            return mostInterestingUnrepaired;
+            return mostInteresting;
         }
 
         // if there is no sstable to compact in standard way, try compacting single sstable whose droppable tombstone
@@ -185,8 +162,17 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy
             sstableMinTimestampPairs.add(Pair.create(sstable, sstable.getMinTimestamp()));
         return sstableMinTimestampPairs;
     }
+    @Override
+    public void addSSTable(SSTableReader sstable)
+    {
+        sstables.add(sstable);
+    }
 
-
+    @Override
+    public void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
     /**
      * A target time span used for bucketing SSTables based on timestamps.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 7f2d881..a560234 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -33,13 +33,8 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.notifications.INotification;
-import org.apache.cassandra.notifications.INotificationConsumer;
-import org.apache.cassandra.notifications.SSTableAddedNotification;
-import org.apache.cassandra.notifications.SSTableListChangedNotification;
-import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 
-public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
+public class LeveledCompactionStrategy extends AbstractCompactionStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
     private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";
@@ -71,26 +66,10 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         }
         maxSSTableSizeInMB = configuredMaxSSTableSize;
 
-        manifest = LeveledManifest.create(cfs, this.maxSSTableSizeInMB, cfs.getSSTables(), localOptions);
+        manifest = new LeveledManifest(cfs, this.maxSSTableSizeInMB, localOptions);
         logger.debug("Created {}", manifest);
     }
 
-    @Override
-    public void startup()
-    {
-        super.startup();
-        cfs.getDataTracker().subscribe(this);
-        logger.debug("{} subscribed to the data tracker.", this);
-    }
-
-    @Override
-    public void shutdown()
-    {
-        super.shutdown();
-        cfs.getDataTracker().unsubscribe(this);
-        logger.debug("{} unsubscribed from the data tracker.", this);
-    }
-
     public int getLevelSize(int i)
     {
         return manifest.getLevelSize(i);
@@ -175,24 +154,6 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return manifest.getEstimatedTasks();
     }
 
-    public void handleNotification(INotification notification, Object sender)
-    {
-        if (notification instanceof SSTableAddedNotification)
-        {
-            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
-            manifest.add(flushedNotification.added);
-        }
-        else if (notification instanceof SSTableListChangedNotification)
-        {
-            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
-            manifest.replace(listChangedNotification.removed, listChangedNotification.added);
-        }
-        else if (notification instanceof SSTableRepairStatusChanged)
-        {
-            manifest.repairStatusChanged(((SSTableRepairStatusChanged) notification).sstable);
-        }
-    }
-
     public long getMaxSSTableBytes()
     {
         return maxSSTableSizeInMB * 1024L * 1024L;
@@ -203,10 +164,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
         {
-            if (manifest.hasRepairedData() && !sstable.isRepaired())
-                byLevel.get(0).add(sstable);
-            else
-                byLevel.get(sstable.getSSTableLevel()).add(sstable);
+            byLevel.get(sstable.getSSTableLevel()).add(sstable);
         }
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
@@ -247,6 +205,18 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         return new ScannerList(scanners);
     }
 
+    @Override
+    public void addSSTable(SSTableReader added)
+    {
+        manifest.add(added);
+    }
+
+    @Override
+    public void removeSSTable(SSTableReader sstable)
+    {
+        manifest.remove(sstable);
+    }
+
     // Lazily creates SSTableBoundedScanner for sstable that are assumed to be from the
     // same level (e.g. non overlapping) - see #4142
     private static class LeveledScanner extends AbstractIterator<OnDiskAtomIterator> implements ICompactionScanner

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a4d2115..4b26d23 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -62,18 +62,14 @@ public class LeveledManifest
     private final ColumnFamilyStore cfs;
     @VisibleForTesting
     protected final List<SSTableReader>[] generations;
-    @VisibleForTesting
-    protected final List<SSTableReader> unrepairedL0;
     private final RowPosition[] lastCompactedKeys;
     private final int maxSSTableSizeInBytes;
     private final SizeTieredCompactionStrategyOptions options;
-    private boolean hasRepairedData = false;
     private final int [] compactionCounter;
 
-    private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
+    LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
     {
         this.cfs = cfs;
-        this.hasRepairedData = cfs.getRepairedSSTables().size() > 0;
         this.maxSSTableSizeInBytes = maxSSTableSizeInMB * 1024 * 1024;
         this.options = options;
 
@@ -88,7 +84,6 @@ public class LeveledManifest
             generations[i] = new ArrayList<>();
             lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
         }
-        unrepairedL0 = new ArrayList<>();
         compactionCounter = new int[n];
     }
 
@@ -115,73 +110,39 @@ public class LeveledManifest
 
     public synchronized void add(SSTableReader reader)
     {
-        if (!hasRepairedData && reader.isRepaired())
-        {
-            // this is the first repaired sstable we get - we need to
-            // rebuild the entire manifest, unrepaired data should be
-            // in unrepairedL0. Note that we keep the sstable level in
-            // the sstable metadata since we are likely to be able to
-            // re-add it at a good level later (during anticompaction
-            // for example).
-            hasRepairedData = true;
-            rebuildManifestAfterFirstRepair();
-        }
-
         int level = reader.getSSTableLevel();
-        if (hasRepairedData && !reader.isRepaired())
+
+        assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
+        logDistribution();
+        if (canAddSSTable(reader))
         {
-            logger.debug("Adding unrepaired {} to unrepaired L0", reader);
-            unrepairedL0.add(reader);
+            // adding the sstable does not cause overlap in the level
+            logger.debug("Adding {} to L{}", reader, level);
+            generations[level].add(reader);
         }
         else
         {
-            assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
-            logDistribution();
-            if (canAddSSTable(reader))
+            // this can happen if:
+            // * a compaction has promoted an overlapping sstable to the given level, or
+            //   was also supposed to add an sstable at the given level.
+            // * we are moving sstables from unrepaired to repaired and the sstable
+            //   would cause overlap
+            //
+            // The add(..):ed sstable will be sent to level 0
+            try
             {
-                // adding the sstable does not cause overlap in the level
-                logger.debug("Adding {} to L{}", reader, level);
-                generations[level].add(reader);
+                reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
+                reader.reloadSSTableMetadata();
             }
-            else
+            catch (IOException e)
             {
-                // this can happen if:
-                // * a compaction has promoted an overlapping sstable to the given level, or
-                // * we promote a non-repaired sstable to repaired at level > 0, but an ongoing compaction
-                //   was also supposed to add an sstable at the given level.
-                //
-                // The add(..):ed sstable will be sent to level 0
-                try
-                {
-                    reader.descriptor.getMetadataSerializer().mutateLevel(reader.descriptor, 0);
-                    reader.reloadSSTableMetadata();
-                }
-                catch (IOException e)
-                {
-                    logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
-                }
-                generations[0].add(reader);
+                logger.error("Could not change sstable level - adding it at level 0 anyway, we will find it at restart.", e);
             }
+            generations[0].add(reader);
         }
-
     }
 
 
-    /**
-     * Since we run standard LCS when we have no repaired data
-     * we need to move all sstables from the leveling
-     * to unrepairedL0.
-     */
-    private void rebuildManifestAfterFirstRepair()
-    {
-        for (int i = 0; i < getAllLevelSize().length; i++)
-        {
-            List<SSTableReader> oldLevel = generations[i];
-            generations[i] = new ArrayList<>();
-            for (SSTableReader sstable : oldLevel)
-                add(sstable);
-        }
-    }
 
     public synchronized void replace(Collection<SSTableReader> removed, Collection<SSTableReader> added)
     {
@@ -216,7 +177,7 @@ public class LeveledManifest
     {
         SSTableReader previous = null;
         Collections.sort(generations[level], SSTableReader.sstableComparator);
-        List<SSTableReader> outOfOrderSSTables = new ArrayList<SSTableReader>();
+        List<SSTableReader> outOfOrderSSTables = new ArrayList<>();
         for (SSTableReader current : generations[level])
         {
             if (previous != null && current.first.compareTo(previous.last) <= 0)
@@ -279,15 +240,6 @@ public class LeveledManifest
         }
     }
 
-    public synchronized void repairStatusChanged(Collection<SSTableReader> sstables)
-    {
-        for(SSTableReader sstable : sstables)
-        {
-            remove(sstable);
-            add(sstable);
-        }
-    }
-
     private String toString(Collection<SSTableReader> sstables)
     {
         StringBuilder builder = new StringBuilder();
@@ -320,18 +272,6 @@ public class LeveledManifest
      */
     public synchronized CompactionCandidate getCompactionCandidates()
     {
-        // if we don't have any repaired data, continue as usual
-        if (hasRepairedData)
-        {
-            Collection<SSTableReader> unrepairedMostInterresting = getSSTablesForSTCS(unrepairedL0);
-            if (!unrepairedMostInterresting.isEmpty())
-            {
-                logger.info("Unrepaired data is most interresting, compacting {} sstables with STCS", unrepairedMostInterresting.size());
-                for (SSTableReader reader : unrepairedMostInterresting)
-                    assert !reader.isRepaired();
-                return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE);
-            }
-        }
         // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
         // compacts the level with the highest score. But this falls apart spectacularly once you
         // get behind.  Consider this set of levels:
@@ -519,7 +459,6 @@ public class LeveledManifest
         int level = reader.getSSTableLevel();
         assert level >= 0 : reader + " not present in manifest: "+level;
         generations[level].remove(reader);
-        unrepairedL0.remove(reader);
         return level;
     }
 
@@ -759,11 +698,6 @@ public class LeveledManifest
 
     }
 
-    public boolean hasRepairedData()
-    {
-        return hasRepairedData;
-    }
-
     public static class CompactionCandidate
     {
         public final Collection<SSTableReader> sstables;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 461c5e1..b72737a 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -31,7 +31,6 @@ import org.apache.cassandra.cql3.statements.CFPropDefs;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.Pair;
 
 public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
@@ -62,6 +61,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     protected SizeTieredCompactionStrategyOptions options;
     protected volatile int estimatedRemainingTasks;
+    private final Set<SSTableReader> sstables = new HashSet<>();
 
     public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
@@ -79,17 +79,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         int minThreshold = cfs.getMinimumCompactionThreshold();
         int maxThreshold = cfs.getMaximumCompactionThreshold();
 
-        Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+        Iterable<SSTableReader> candidates = filterSuspectSSTables(Sets.intersection(cfs.getUncompactingSSTables(), sstables));
         candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
-        Pair<Set<SSTableReader>,Set<SSTableReader>> repairedUnrepaired = splitInRepairedAndUnrepaired(candidates);
-        if (repairedUnrepaired.left.size() > repairedUnrepaired.right.size())
-        {
-            candidates = repairedUnrepaired.left;
-        }
-        else
-        {
-            candidates = repairedUnrepaired.right;
-        }
 
         List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
         logger.debug("Compaction buckets are {}", buckets);
@@ -113,20 +104,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return Collections.singletonList(sstablesWithTombstones.get(0));
     }
 
-    private static Pair<Set<SSTableReader>, Set<SSTableReader>> splitInRepairedAndUnrepaired(Iterable<SSTableReader> candidates)
-    {
-        Set<SSTableReader> repaired = new HashSet<>();
-        Set<SSTableReader> unRepaired = new HashSet<>();
-        for(SSTableReader candidate : candidates)
-        {
-            if (!candidate.isRepaired())
-                unRepaired.add(candidate);
-            else
-                repaired.add(candidate);
-        }
-        return Pair.create(repaired, unRepaired);
-    }
-
     /**
      * Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
      * across all sstables
@@ -276,20 +253,12 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
     public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore)
     {
-        Iterable<SSTableReader> allSSTables = cfs.markAllCompacting();
-        if (allSSTables == null || Iterables.isEmpty(allSSTables))
+        Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
+        if (Iterables.isEmpty(sstables))
             return null;
-        Set<SSTableReader> sstables = Sets.newHashSet(allSSTables);
-        Set<SSTableReader> repaired = new HashSet<>();
-        Set<SSTableReader> unrepaired = new HashSet<>();
-        for (SSTableReader sstable : sstables)
-        {
-            if (sstable.isRepaired())
-                repaired.add(sstable);
-            else
-                unrepaired.add(sstable);
-        }
-        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, repaired, gcBefore, false), new CompactionTask(cfs, unrepaired, gcBefore, false));
+        if (!cfs.getDataTracker().markCompacting(filteredSSTables))
+            return null;
+        return Arrays.<AbstractCompactionTask>asList(new CompactionTask(cfs, filteredSSTables, gcBefore, false));
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
@@ -396,6 +365,24 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return uncheckedOptions;
     }
 
+    @Override
+    public boolean shouldDefragment()
+    {
+        return true;
+    }
+
+    @Override
+    public void addSSTable(SSTableReader added)
+    {
+        sstables.add(added);
+    }
+
+    @Override
+    public void removeSSTable(SSTableReader sstable)
+    {
+        sstables.remove(sstable);
+    }
+
     public String toString()
     {
         return String.format("SizeTieredCompactionStrategy[%s/%s]",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
new file mode 100644
index 0000000..1d713ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.notifications.INotification;
+import org.apache.cassandra.notifications.INotificationConsumer;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableDeletingNotification;
+import org.apache.cassandra.notifications.SSTableListChangedNotification;
+import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+
+public final class WrappingCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
+{
+    private static final Logger logger = LoggerFactory.getLogger(WrappingCompactionStrategy.class);
+    private volatile AbstractCompactionStrategy repaired;
+    private volatile AbstractCompactionStrategy unrepaired;
+    public WrappingCompactionStrategy(ColumnFamilyStore cfs)
+    {
+        super(cfs, cfs.metadata.compactionStrategyOptions);
+        reloadCompactionStrategy(cfs.metadata);
+        cfs.getDataTracker().subscribe(this);
+        logger.debug("{} subscribed to the data tracker.", this);
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+    {
+        if (!isEnabled())
+            return null;
+
+        if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
+        {
+            AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
+            if (repairedTask != null)
+                return repairedTask;
+            return unrepaired.getNextBackgroundTask(gcBefore);
+        }
+        else
+        {
+            AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
+            if (unrepairedTask != null)
+                return unrepairedTask;
+            return repaired.getNextBackgroundTask(gcBefore);
+        }
+
+    }
+
+    @Override
+    public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore)
+    {
+        // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
+        // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
+        // sstables are marked the compactions are re-enabled
+        return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
+        {
+            @Override
+            public Collection<AbstractCompactionTask> call() throws Exception
+            {
+                synchronized (WrappingCompactionStrategy.this)
+                {
+                    Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore);
+                    Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore);
+
+                    if (repairedTasks == null && unrepairedTasks == null)
+                        return null;
+
+                    if (repairedTasks == null)
+                        return unrepairedTasks;
+                    if (unrepairedTasks == null)
+                        return repairedTasks;
+
+                    List<AbstractCompactionTask> tasks = new ArrayList<>();
+                    tasks.addAll(repairedTasks);
+                    tasks.addAll(unrepairedTasks);
+                    return tasks;
+                }
+            }
+        }, false);
+    }
+
+    @Override
+    public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+    {
+        assert !sstables.isEmpty();
+        boolean userDefinedInRepaired = sstables.iterator().next().isRepaired();
+        for (SSTableReader sstable : sstables)
+        {
+            if (userDefinedInRepaired != sstable.isRepaired())
+            {
+                logger.error("You can't mix repaired and unrepaired sstables in a user defined compaction");
+                return null;
+            }
+        }
+        if (userDefinedInRepaired)
+            return repaired.getUserDefinedTask(sstables, gcBefore);
+        else
+            return unrepaired.getUserDefinedTask(sstables, gcBefore);
+    }
+
+    @Override
+    public synchronized int getEstimatedRemainingTasks()
+    {
+        assert repaired.getClass().equals(unrepaired.getClass());
+        return repaired.getEstimatedRemainingTasks() + unrepaired.getEstimatedRemainingTasks();
+    }
+
+    @Override
+    public synchronized long getMaxSSTableBytes()
+    {
+        assert repaired.getClass().equals(unrepaired.getClass());
+        return unrepaired.getMaxSSTableBytes();
+    }
+
+    public synchronized void maybeReloadCompactionStrategy(CFMetaData metadata)
+    {
+        if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
+            && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
+            && repaired.options.equals(metadata.compactionStrategyOptions)
+            && unrepaired.options.equals(metadata.compactionStrategyOptions))
+            return;
+
+        reloadCompactionStrategy(metadata);
+    }
+
+    public synchronized void reloadCompactionStrategy(CFMetaData metadata)
+    {
+        if (repaired != null)
+            repaired.shutdown();
+        if (unrepaired != null)
+            unrepaired.shutdown();
+        repaired = metadata.createCompactionStrategyInstance(cfs);
+        unrepaired = metadata.createCompactionStrategyInstance(cfs);
+        startup();
+    }
+
+    public synchronized int getUnleveledSSTables()
+    {
+        if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy)
+        {
+            return ((LeveledCompactionStrategy)repaired).getLevelSize(0) + ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+        }
+        return 0;
+    }
+
+    public synchronized int[] getSSTableCountPerLevel()
+    {
+        if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy)
+        {
+            int [] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
+            int [] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
+            return sumArrays(repairedCountPerLevel, unrepairedCountPerLevel);
+        }
+        return null;
+    }
+
+    public static int [] sumArrays(int[] a, int [] b)
+    {
+        int [] res = new int[Math.max(a.length, b.length)];
+        for (int i = 0; i < res.length; i++)
+        {
+            if (i < a.length && i < b.length)
+                res[i] = a[i] + b[i];
+            else if (i < a.length)
+                res[i] = a[i];
+            else
+                res[i] = b[i];
+        }
+        return res;
+    }
+
+    @Override
+    public boolean shouldDefragment()
+    {
+        assert repaired.getClass().equals(unrepaired.getClass());
+        return repaired.shouldDefragment();
+    }
+
+    @Override
+    public String getName()
+    {
+        assert repaired.getClass().equals(unrepaired.getClass());
+        return repaired.getName();
+    }
+
+    @Override
+    public void addSSTable(SSTableReader added)
+    {
+        throw new UnsupportedOperationException("Can't add sstables to the wrapping compaction strategy");
+    }
+
+    @Override
+    public void removeSSTable(SSTableReader sstable)
+    {
+        throw new UnsupportedOperationException("Can't remove sstables from the wrapping compaction strategy");
+    }
+
+    public synchronized void handleNotification(INotification notification, Object sender)
+    {
+        if (notification instanceof SSTableAddedNotification)
+        {
+            SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+            if (flushedNotification.added.isRepaired())
+                repaired.addSSTable(flushedNotification.added);
+            else
+                unrepaired.addSSTable(flushedNotification.added);
+        }
+        else if (notification instanceof SSTableListChangedNotification)
+        {
+            SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+            for (SSTableReader sstable : listChangedNotification.removed)
+            {
+                if (sstable.isRepaired())
+                    repaired.removeSSTable(sstable);
+                else
+                    unrepaired.removeSSTable(sstable);
+            }
+            for (SSTableReader sstable : listChangedNotification.added)
+            {
+                if (sstable.isRepaired())
+                    repaired.addSSTable(sstable);
+                else
+                    unrepaired.addSSTable(sstable);
+            }
+        }
+        else if (notification instanceof SSTableRepairStatusChanged)
+        {
+            for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+            {
+                if (sstable.isRepaired())
+                {
+                    unrepaired.removeSSTable(sstable);
+                    repaired.addSSTable(sstable);
+                }
+                else
+                {
+                    repaired.removeSSTable(sstable);
+                    unrepaired.addSSTable(sstable);
+                }
+            }
+        }
+        else if (notification instanceof SSTableDeletingNotification)
+        {
+            SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
+            if (sstable.isRepaired())
+                repaired.removeSSTable(sstable);
+            else
+                unrepaired.removeSSTable(sstable);
+        }
+    }
+
+    @Override
+    public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables)
+    {
+        // todo: union of filtered sstables or intersection?
+        return unrepaired.filterSSTablesForReads(repaired.filterSSTablesForReads(sstables));
+    }
+
+    @Override
+    public synchronized void startup()
+    {
+        super.startup();
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            if (sstable.isRepaired())
+                repaired.addSSTable(sstable);
+            else
+                unrepaired.addSSTable(sstable);
+        }
+        repaired.startup();
+        unrepaired.startup();
+    }
+
+    @Override
+    public synchronized void shutdown()
+    {
+        super.shutdown();
+        repaired.shutdown();
+        unrepaired.shutdown();
+    }
+
+    @Override
+    public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+    {
+        List<SSTableReader> repairedSSTables = new ArrayList<>();
+        List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+        for (SSTableReader sstable : sstables)
+            if (sstable.isRepaired())
+                repairedSSTables.add(sstable);
+            else
+                unrepairedSSTables.add(sstable);
+        ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+        ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+        List<ICompactionScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
+        scanners.addAll(repairedScanners.scanners);
+        scanners.addAll(unrepairedScanners.scanners);
+        return new ScannerList(scanners);
+    }
+
+    public List<AbstractCompactionStrategy> getWrappedStrategies()
+    {
+        return Arrays.asList(repaired, unrepaired);
+    }
+}
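
One detail from the new class worth calling out: getSSTableCountPerLevel()
merges the two manifests' per-level counts element-wise with sumArrays(),
padding with values from the longer array. A quick illustration with made-up
values:

    // Made-up values illustrating WrappingCompactionStrategy.sumArrays():
    int[] repairedCounts   = {2, 10, 0};
    int[] unrepairedCounts = {1, 4};
    int[] combined = WrappingCompactionStrategy.sumArrays(repairedCounts, unrepairedCounts);
    // combined == {3, 14, 0}: positions present in both arrays are summed,
    // the rest are copied from the longer array.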

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 40e708d..8f302f3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -240,6 +240,7 @@ public class SSTableReader extends SSTable
                 try
                 {
                     CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
+                    assert metadata != null : sstable.getFilename();
                     if (cardinality == null)
                         cardinality = metadata.cardinalityEstimator;
                     else
@@ -1633,7 +1634,7 @@ public class SSTableReader extends SSTable
 
         synchronized (replaceLock)
         {
-            assert replacedBy == null;
+            assert replacedBy == null : getFilename();
         }
         return !isCompacted.getAndSet(true);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 979b079..a1ecfab 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -126,9 +126,9 @@ public class CompactionsTest extends SchemaLoader
     public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
     {
         ColumnFamilyStore store = testSingleSSTableCompaction(LeveledCompactionStrategy.class.getCanonicalName());
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) store.getCompactionStrategy();
+        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) store.getCompactionStrategy();
         // tombstone removal compaction should not promote level
-        assert strategy.getLevelSize(0) == 1;
+        assert strategy.getSSTableCountPerLevel()[0] == 1;
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e16f584e/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 65c7b69..ebc6e86 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -35,6 +35,8 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.notifications.SSTableAddedNotification;
+import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 import org.apache.cassandra.repair.RepairJobDesc;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -93,10 +95,10 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         }
 
         waitForLeveling(cfs);
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
         // Checking we're not completely bad at math
-        assert strategy.getLevelSize(1) > 0;
-        assert strategy.getLevelSize(2) > 0;
+        assert strategy.getSSTableCountPerLevel()[1] > 0;
+        assert strategy.getSSTableCountPerLevel()[2] > 0;
 
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
@@ -112,9 +114,9 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
      */
     private void waitForLeveling(ColumnFamilyStore cfs) throws InterruptedException
     {
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
         // L0 is the lowest priority, so when that's done, we know everything is done
-        while (strategy.getLevelSize(0) > 1)
+        while (strategy.getSSTableCountPerLevel()[0] > 1)
             Thread.sleep(100);
     }
 
@@ -138,7 +140,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         }
 
         waitForLeveling(cfs);
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ((WrappingCompactionStrategy) cfs.getCompactionStrategy()).getWrappedStrategies().get(1);
         assert strategy.getLevelSize(1) > 0;
 
         // get LeveledScanner for level 1 sstables
@@ -177,7 +179,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         }
         waitForLeveling(cfs);
         cfs.forceBlockingFlush();
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ((WrappingCompactionStrategy) cfs.getCompactionStrategy()).getWrappedStrategies().get(1);
         cfs.disableAutoCompaction();
 
         while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
@@ -227,61 +229,45 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         while(CompactionManager.instance.isCompacting(Arrays.asList(cfs)))
             Thread.sleep(100);
 
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
-        assertTrue(strategy.getLevelSize(1) > 0);
-        assertTrue(strategy.getLevelSize(2) > 0);
+        WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
+        List<AbstractCompactionStrategy> strategies = strategy.getWrappedStrategies();
+        LeveledCompactionStrategy repaired = (LeveledCompactionStrategy) strategies.get(0);
+        LeveledCompactionStrategy unrepaired = (LeveledCompactionStrategy) strategies.get(1);
+        assertEquals(0, repaired.manifest.getLevelCount());
+        assertEquals(2, unrepaired.manifest.getLevelCount());
+        assertTrue(strategy.getSSTableCountPerLevel()[1] > 0);
+        assertTrue(strategy.getSSTableCountPerLevel()[2] > 0);
 
         for (SSTableReader sstable : cfs.getSSTables())
-        {
             assertFalse(sstable.isRepaired());
-        }
+
         int sstableCount = 0;
-        for (List<SSTableReader> level : strategy.manifest.generations)
+        for (List<SSTableReader> level : unrepaired.manifest.generations)
             sstableCount += level.size();
-
+        // we only have unrepaired sstables:
         assertEquals(sstableCount, cfs.getSSTables().size());
 
-        assertFalse(strategy.manifest.hasRepairedData());
-        assertTrue(strategy.manifest.unrepairedL0.size() == 0);
-
-        SSTableReader sstable1 = strategy.manifest.generations[2].get(0);
-        SSTableReader sstable2 = strategy.manifest.generations[1].get(0);
+        SSTableReader sstable1 = unrepaired.manifest.generations[2].get(0);
+        SSTableReader sstable2 = unrepaired.manifest.generations[1].get(0);
 
-        // "repair" an sstable:
-        strategy.manifest.remove(sstable1);
         sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis());
         sstable1.reloadSSTableMetadata();
         assertTrue(sstable1.isRepaired());
 
-        // make sure adding a repaired sstable makes the manifest contain only repaired data;
-        strategy.manifest.add(sstable1);
-        assertTrue(strategy.manifest.hasRepairedData());
-        assertTrue(strategy.manifest.generations[2].contains(sstable1));
-        assertFalse(strategy.manifest.generations[1].contains(sstable2));
-        assertTrue(strategy.manifest.unrepairedL0.contains(sstable2));
-        sstableCount = 0;
-        for (int i = 0; i < strategy.manifest.generations.length; i++)
-        {
-            sstableCount += strategy.manifest.generations[i].size();
-            if (i != 2)
-                assertEquals(strategy.manifest.generations[i].size(), 0);
-            else
-                assertEquals(strategy.manifest.generations[i].size(), 1);
-        }
-        assertEquals(1, sstableCount);
-
-        // make sure adding an unrepaired sstable puts it in unrepairedL0:
-        strategy.manifest.remove(sstable2);
-        strategy.manifest.add(sstable2);
-        assertTrue(strategy.manifest.unrepairedL0.contains(sstable2));
-        assertEquals(strategy.manifest.unrepairedL0.size(), cfs.getSSTables().size() - 1);
-
-        // make sure repairing an sstable takes it away from unrepairedL0 and puts it in the correct level:
-        strategy.manifest.remove(sstable2);
-        sstable2.descriptor.getMetadataSerializer().mutateRepairedAt(sstable2.descriptor, System.currentTimeMillis());
-        sstable2.reloadSSTableMetadata();
-        strategy.manifest.add(sstable2);
-        assertFalse(strategy.manifest.unrepairedL0.contains(sstable2));
-        assertTrue(strategy.manifest.generations[1].contains(sstable2));
+        strategy.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
+
+        int repairedSSTableCount = 0;
+        for (List<SSTableReader> level : repaired.manifest.generations)
+            repairedSSTableCount += level.size();
+        assertEquals(1, repairedSSTableCount);
+        // make sure the repaired sstable ends up in the same level in the repaired manifest:
+        assertTrue(repaired.manifest.generations[2].contains(sstable1));
+        // and that it is gone from unrepaired
+        assertFalse(unrepaired.manifest.generations[2].contains(sstable1));
+
+        unrepaired.removeSSTable(sstable2);
+        strategy.handleNotification(new SSTableAddedNotification(sstable2), this);
+        assertTrue(unrepaired.manifest.getLevel(1).contains(sstable2));
+        assertFalse(repaired.manifest.getLevel(1).contains(sstable2));
     }
 }

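For reference, the sequence the updated test exercises, as a condensed sketch (names mirror the test above; illustrative only, not part of the patch):

    // flipping the on-disk repair flag does not move the sstable between strategies by itself...
    sstable1.descriptor.getMetadataSerializer().mutateRepairedAt(sstable1.descriptor, System.currentTimeMillis());
    sstable1.reloadSSTableMetadata();   // sstable1.isRepaired() now returns true
    // ...the wrapper only rebalances once it is notified of the status change:
    strategy.handleNotification(new SSTableRepairStatusChanged(Arrays.asList(sstable1)), this);
    // the sstable leaves the unrepaired manifest and lands at the same level in the repaired one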

[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
	src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
	src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
	test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
	test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af44d1a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af44d1a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af44d1a7

Branch: refs/heads/trunk
Commit: af44d1a7c3baad2b5ce006b9c0f5249c38cb0504
Parents: e369ff6 e16f584
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Nov 5 09:47:48 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Nov 5 09:47:48 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |   3 +
 .../cassandra/db/CollationController.java       |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  57 +---
 .../compaction/AbstractCompactionStrategy.java  |  14 +
 .../db/compaction/CompactionManager.java        |   2 +-
 .../DateTieredCompactionStrategy.java           |  46 +--
 .../compaction/LeveledCompactionStrategy.java   |  60 +---
 .../db/compaction/LeveledManifest.java          | 111 ++-----
 .../SizeTieredCompactionStrategy.java           |  63 ++--
 .../compaction/WrappingCompactionStrategy.java  | 331 +++++++++++++++++++
 .../io/sstable/format/SSTableReader.java        |   3 +-
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../LeveledCompactionStrategyTest.java          |  88 +++--
 14 files changed, 486 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 817cbcf,80f4c8f..6ba76f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,5 +1,38 @@@
 +3.0
 + * Mark sstables as repaired after full repair (CASSANDRA-7586) 
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * improve concurrency of repair (CASSANDRA-6455)
 +
 +
  2.1.2
+  * Improve compaction of repaired/unrepaired sstables (CASSANDRA-8004)
   * Make cache serializers pluggable (CASSANDRA-8096)
   * Fix issues with CONTAINS (KEY) queries on secondary indexes
     (CASSANDRA-8147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 7a1f883,a560234..3ac16ab
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@@ -44,13 -32,9 +44,8 @@@ import org.apache.cassandra.db.columnit
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.notifications.INotification;
- import org.apache.cassandra.notifications.INotificationConsumer;
- import org.apache.cassandra.notifications.SSTableAddedNotification;
- import org.apache.cassandra.notifications.SSTableListChangedNotification;
- import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 -import org.apache.cassandra.io.sstable.SSTableReader;
  
- public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
+ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
  {
      private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
      private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 74be143,4b26d23..a4e420f
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@@ -320,31 -272,6 +272,18 @@@ public class LeveledManifes
       */
      public synchronized CompactionCandidate getCompactionCandidates()
      {
-         // if we don't have any repaired data, continue as usual
-         if (hasRepairedData)
-         {
-             Collection<SSTableReader> unrepairedMostInterresting = getSSTablesForSTCS(unrepairedL0);
-             if (!unrepairedMostInterresting.isEmpty())
-             {
-                 logger.info("Unrepaired data is most interresting, compacting {} sstables with STCS", unrepairedMostInterresting.size());
-                 for (SSTableReader reader : unrepairedMostInterresting)
-                     assert !reader.isRepaired();
-                 return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE);
-             }
-         }
- 
 +        // during bootstrap we only do size tiering in L0 to make sure
 +        // the streamed files can be placed in their original levels
 +        if (StorageService.instance.isBootstrapMode())
 +        {
 +            List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
 +            if (!mostInteresting.isEmpty())
 +            {
 +                logger.info("Bootstrapping - doing STCS in L0");
 +                return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE);
 +            }
 +            return null;
 +        }
          // LevelDB gives each level a score of how much data it contains vs its ideal amount, and
          // compacts the level with the highest score. But this falls apart spectacularly once you
          // get behind.  Consider this set of levels:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0abb68d,b72737a..fb6b060
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -24,8 -24,6 +24,7 @@@ import com.google.common.annotations.Vi
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Lists;
  import com.google.common.collect.Sets;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
- import org.apache.cassandra.io.sstable.format.big.BigTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 0000000,1d713ef..32e63bb
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@@ -1,0 -1,331 +1,331 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.List;
+ import java.util.concurrent.Callable;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.dht.Range;
+ import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.notifications.INotification;
+ import org.apache.cassandra.notifications.INotificationConsumer;
+ import org.apache.cassandra.notifications.SSTableAddedNotification;
+ import org.apache.cassandra.notifications.SSTableDeletingNotification;
+ import org.apache.cassandra.notifications.SSTableListChangedNotification;
+ import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+ 
+ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
+ {
+     private static final Logger logger = LoggerFactory.getLogger(WrappingCompactionStrategy.class);
+     private volatile AbstractCompactionStrategy repaired;
+     private volatile AbstractCompactionStrategy unrepaired;
+     public WrappingCompactionStrategy(ColumnFamilyStore cfs)
+     {
+         super(cfs, cfs.metadata.compactionStrategyOptions);
+         reloadCompactionStrategy(cfs.metadata);
+         cfs.getDataTracker().subscribe(this);
+         logger.debug("{} subscribed to the data tracker.", this);
+     }
+ 
+     @Override
+     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+     {
+         if (!isEnabled())
+             return null;
+ 
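+         // prefer the side with the larger estimated backlog, but fall back to the
+         // other side if it has no task to hand out, so neither side is starved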
+         if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
+         {
+             AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
+             if (repairedTask != null)
+                 return repairedTask;
+             return unrepaired.getNextBackgroundTask(gcBefore);
+         }
+         else
+         {
+             AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
+             if (unrepairedTask != null)
+                 return unrepairedTask;
+             return repaired.getNextBackgroundTask(gcBefore);
+         }
+ 
+     }
+ 
+     @Override
+     public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore)
+     {
+         // runWithCompactionsDisabled cancels active compactions and disables them, then we are able
+         // to make the repaired/unrepaired strategies mark their own sstables as compacting. Once the
+         // sstables are marked the compactions are re-enabled
+         return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
+         {
+             @Override
+             public Collection<AbstractCompactionTask> call() throws Exception
+             {
+                 synchronized (WrappingCompactionStrategy.this)
+                 {
+                     Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore);
+                     Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore);
+ 
+                     if (repairedTasks == null && unrepairedTasks == null)
+                         return null;
+ 
+                     if (repairedTasks == null)
+                         return unrepairedTasks;
+                     if (unrepairedTasks == null)
+                         return repairedTasks;
+ 
+                     List<AbstractCompactionTask> tasks = new ArrayList<>();
+                     tasks.addAll(repairedTasks);
+                     tasks.addAll(unrepairedTasks);
+                     return tasks;
+                 }
+             }
+         }, false);
+     }
+ 
+     @Override
+     public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
+     {
+         assert !sstables.isEmpty();
+         boolean userDefinedInRepaired = sstables.iterator().next().isRepaired();
+         for (SSTableReader sstable : sstables)
+         {
+             if (userDefinedInRepaired != sstable.isRepaired())
+             {
+                 logger.error("You can't mix repaired and unrepaired sstables in a user defined compaction");
+                 return null;
+             }
+         }
+         if (userDefinedInRepaired)
+             return repaired.getUserDefinedTask(sstables, gcBefore);
+         else
+             return unrepaired.getUserDefinedTask(sstables, gcBefore);
+     }
+ 
+     @Override
+     public synchronized int getEstimatedRemainingTasks()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return repaired.getEstimatedRemainingTasks() + unrepaired.getEstimatedRemainingTasks();
+     }
+ 
+     @Override
+     public synchronized long getMaxSSTableBytes()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return unrepaired.getMaxSSTableBytes();
+     }
+ 
+     public synchronized void maybeReloadCompactionStrategy(CFMetaData metadata)
+     {
+         if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
+             && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
+             && repaired.options.equals(metadata.compactionStrategyOptions)
+             && unrepaired.options.equals(metadata.compactionStrategyOptions))
+             return;
+ 
+         reloadCompactionStrategy(metadata);
+     }
+ 
+     public synchronized void reloadCompactionStrategy(CFMetaData metadata)
+     {
+         if (repaired != null)
+             repaired.shutdown();
+         if (unrepaired != null)
+             unrepaired.shutdown();
+         repaired = metadata.createCompactionStrategyInstance(cfs);
+         unrepaired = metadata.createCompactionStrategyInstance(cfs);
+         startup();
+     }
+ 
+     public synchronized int getUnleveledSSTables()
+     {
+         if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy)
+         {
+             return ((LeveledCompactionStrategy)repaired).getLevelSize(0) + ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+         }
+         return 0;
+     }
+ 
+     public synchronized int[] getSSTableCountPerLevel()
+     {
+         if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired instanceof LeveledCompactionStrategy)
+         {
+             int [] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
+             int [] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
+             return sumArrays(repairedCountPerLevel, unrepairedCountPerLevel);
+         }
+         return null;
+     }
+ 
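+     // Elementwise sum of two per-level count arrays of (possibly) different lengths,
+     // e.g. sumArrays({4, 2}, {1, 0, 3}) returns {5, 2, 3}.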
+     public static int [] sumArrays(int[] a, int [] b)
+     {
+         int [] res = new int[Math.max(a.length, b.length)];
+         for (int i = 0; i < res.length; i++)
+         {
+             if (i < a.length && i < b.length)
+                 res[i] = a[i] + b[i];
+             else if (i < a.length)
+                 res[i] = a[i];
+             else
+                 res[i] = b[i];
+         }
+         return res;
+     }
+ 
+     @Override
+     public boolean shouldDefragment()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return repaired.shouldDefragment();
+     }
+ 
+     @Override
+     public String getName()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return repaired.getName();
+     }
+ 
+     @Override
+     public void addSSTable(SSTableReader added)
+     {
+         throw new UnsupportedOperationException("Can't add sstables to the wrapping compaction strategy");
+     }
+ 
+     @Override
+     public void removeSSTable(SSTableReader sstable)
+     {
+         throw new UnsupportedOperationException("Can't remove sstables from the wrapping compaction strategy");
+     }
+ 
+     public synchronized void handleNotification(INotification notification, Object sender)
+     {
+         if (notification instanceof SSTableAddedNotification)
+         {
+             SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+             if (flushedNotification.added.isRepaired())
+                 repaired.addSSTable(flushedNotification.added);
+             else
+                 unrepaired.addSSTable(flushedNotification.added);
+         }
+         else if (notification instanceof SSTableListChangedNotification)
+         {
+             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
+             for (SSTableReader sstable : listChangedNotification.removed)
+             {
+                 if (sstable.isRepaired())
+                     repaired.removeSSTable(sstable);
+                 else
+                     unrepaired.removeSSTable(sstable);
+             }
+             for (SSTableReader sstable : listChangedNotification.added)
+             {
+                 if (sstable.isRepaired())
+                     repaired.addSSTable(sstable);
+                 else
+                     unrepaired.addSSTable(sstable);
+             }
+         }
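+         // a repair status change moves the sstable from one wrapped strategy to the other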
+         else if (notification instanceof SSTableRepairStatusChanged)
+         {
+             for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+             {
+                 if (sstable.isRepaired())
+                 {
+                     unrepaired.removeSSTable(sstable);
+                     repaired.addSSTable(sstable);
+                 }
+                 else
+                 {
+                     repaired.removeSSTable(sstable);
+                     unrepaired.addSSTable(sstable);
+                 }
+             }
+         }
+         else if (notification instanceof SSTableDeletingNotification)
+         {
+             SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
+             if (sstable.isRepaired())
+                 repaired.removeSSTable(sstable);
+             else
+                 unrepaired.removeSSTable(sstable);
+         }
+     }
+ 
+     @Override
+     public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables)
+     {
+         // todo: union of filtered sstables or intersection?
+         return unrepaired.filterSSTablesForReads(repaired.filterSSTablesForReads(sstables));
+     }
+ 
+     @Override
+     public synchronized void startup()
+     {
+         super.startup();
+         for (SSTableReader sstable : cfs.getSSTables())
+         {
+             if (sstable.isRepaired())
+                 repaired.addSSTable(sstable);
+             else
+                 unrepaired.addSSTable(sstable);
+         }
+         repaired.startup();
+         unrepaired.startup();
+     }
+ 
+     @Override
+     public synchronized void shutdown()
+     {
+         super.shutdown();
+         repaired.shutdown();
+         unrepaired.shutdown();
+     }
+ 
+     @Override
+     public synchronized ScannerList getScanners(Collection<SSTableReader> sstables, Range<Token> range)
+     {
+         List<SSTableReader> repairedSSTables = new ArrayList<>();
+         List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+         for (SSTableReader sstable : sstables)
+             if (sstable.isRepaired())
+                 repairedSSTables.add(sstable);
+             else
+                 unrepairedSSTables.add(sstable);
+         ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+         ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+         List<ICompactionScanner> scanners = new ArrayList<>(repairedScanners.scanners.size() + unrepairedScanners.scanners.size());
+         scanners.addAll(repairedScanners.scanners);
+         scanners.addAll(unrepairedScanners.scanners);
+         return new ScannerList(scanners);
+     }
+ 
+     public List<AbstractCompactionStrategy> getWrappedStrategies()
+     {
+         return Arrays.asList(repaired, unrepaired);
+     }
+ }

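A minimal usage sketch of the new wrapper (illustrative only, not part of the patch; assumes `cfs` is a ColumnFamilyStore configured with LeveledCompactionStrategy):

    WrappingCompactionStrategy wrapper = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
    List<AbstractCompactionStrategy> strategies = wrapper.getWrappedStrategies();
    AbstractCompactionStrategy repaired = strategies.get(0);    // index 0: repaired data
    AbstractCompactionStrategy unrepaired = strategies.get(1);  // index 1: unrepaired data
    int[] perLevel = wrapper.getSSTableCountPerLevel();         // null unless both sides are LCS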

[2/3] Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 7d4b8f3,0000000..6164883
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@@ -1,1887 -1,0 +1,1888 @@@
 +package org.apache.cassandra.io.sstable.format;
 +
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 +import com.clearspring.analytics.stream.cardinality.ICardinality;
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Predicate;
 +import com.google.common.collect.Iterators;
 +import com.google.common.collect.Ordering;
 +import com.google.common.primitives.Longs;
 +import com.google.common.util.concurrent.RateLimiter;
 +import org.apache.cassandra.cache.CachingOptions;
 +import org.apache.cassandra.cache.InstrumentingCache;
 +import org.apache.cassandra.cache.KeyCacheKey;
 +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 +import org.apache.cassandra.config.*;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 +import org.apache.cassandra.db.commitlog.ReplayPosition;
 +import org.apache.cassandra.db.compaction.ICompactionScanner;
 +import org.apache.cassandra.db.composites.CellName;
 +import org.apache.cassandra.db.filter.ColumnSlice;
 +import org.apache.cassandra.db.index.SecondaryIndex;
 +import org.apache.cassandra.dht.*;
 +import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
 +import org.apache.cassandra.io.compress.CompressedThrottledReader;
 +import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.*;
 +import org.apache.cassandra.io.sstable.metadata.*;
 +import org.apache.cassandra.io.util.*;
 +import org.apache.cassandra.metrics.RestorableMeter;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.CacheService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.*;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicLong;
 +
 +import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 +
 +
 +/**
 + * SSTableReaders are open()ed by Keyspace.onStart; after that they are created by SSTableWriter.renameAndOpen.
 + * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
 + */
 +public abstract class SSTableReader extends SSTable
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
 +
 +    private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 +    private static final RateLimiter meterSyncThrottle = RateLimiter.create(100.0);
 +
 +    public static final Comparator<SSTableReader> maxTimestampComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            long ts1 = o1.getMaxTimestamp();
 +            long ts2 = o2.getMaxTimestamp();
 +            return (ts1 > ts2 ? -1 : (ts1 == ts2 ? 0 : 1));
 +        }
 +    };
 +
 +    public static final Comparator<SSTableReader> sstableComparator = new Comparator<SSTableReader>()
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return o1.first.compareTo(o2.first);
 +        }
 +    };
 +
 +    public static final Ordering<SSTableReader> sstableOrdering = Ordering.from(sstableComparator);
 +
 +    /**
 +     * maxDataAge is a timestamp in local server time (e.g. System.currentTimeMillis()) which represents an upper bound
 +     * on the newest piece of data stored in the sstable. In other words, this sstable does not contain items created
 +     * later than maxDataAge.
 +     *
 +     * The field is not serialized to disk, so relying on it for more than what truncate does is not advised.
 +     *
 +     * When a new sstable is flushed, maxDataAge is set to the time of creation.
 +     * When an sstable is created by compaction, maxDataAge is set to the max of all merged sstables.
 +     *
 +     * The age is in milliseconds since epoch and is local to this host.
 +     */
 +    public final long maxDataAge;
 +
 +    public enum OpenReason
 +    {
 +        NORMAL,
 +        EARLY,
 +        METADATA_CHANGE
 +    }
 +
 +    public final OpenReason openReason;
 +
 +    // indexfile and datafile: might be null before a call to load()
 +    protected SegmentedFile ifile;
 +    protected SegmentedFile dfile;
 +
 +    protected IndexSummary indexSummary;
 +    protected IFilter bf;
 +
 +    protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 +
 +    protected InstrumentingCache<KeyCacheKey, RowIndexEntry> keyCache;
 +
 +    protected final BloomFilterTracker bloomFilterTracker = new BloomFilterTracker();
 +
 +    protected final AtomicInteger references = new AtomicInteger(1);
 +    // technically isCompacted is not necessary since it should never be unreferenced unless it is also compacted,
 +    // but it seems like a good extra layer of protection against reference counting bugs to not delete data based on that alone
 +    protected final AtomicBoolean isCompacted = new AtomicBoolean(false);
 +    protected final AtomicBoolean isSuspect = new AtomicBoolean(false);
 +
 +    // not final since we need to be able to change level on a file.
 +    protected volatile StatsMetadata sstableMetadata;
 +
 +    protected final AtomicLong keyCacheHit = new AtomicLong(0);
 +    protected final AtomicLong keyCacheRequest = new AtomicLong(0);
 +
 +    /**
 +     * To support replacing this SSTableReader with another object that represents the same underlying sstable, but with different associated resources,
 +     * we build a linked-list chain of replacements, synchronised via a shared object so that maintaining the list across multiple threads stays simple.
 +     * On close we check whether any of the closeable resources differ from the links on either side of us; any that appear in neither of the adjacent links (if any) are closed.
 +     * Once we've made this decision we remove ourselves from the linked list, so that anybody behind/ahead will compare only against still-open resources.
 +     */
 +    protected Object replaceLock = new Object();
 +    protected SSTableReader replacedBy;
 +    private SSTableReader replaces;
 +    private SSTableDeletingTask deletingTask;
 +    private Runnable runOnClose;
 +
 +    @VisibleForTesting
 +    public RestorableMeter readMeter;
 +    protected ScheduledFuture readMeterSyncFuture;
 +
 +    /**
 +     * Calculate approximate key count.
 +     * If cardinality estimator is available on all given sstables, then this method use them to estimate
 +     * key count.
 +     * If not, then this uses index summaries.
 +     *
 +     * @param sstables SSTables to calculate key count
 +     * @return estimated key count
 +     */
 +    public static long getApproximateKeyCount(Collection<SSTableReader> sstables)
 +    {
 +        long count = -1;
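 +        // -1 is the sentinel for "no cardinality estimate available"; see the fallback below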
 +
 +        // check if cardinality estimator is available for all SSTables
 +        boolean cardinalityAvailable = !sstables.isEmpty() && Iterators.all(sstables.iterator(), new Predicate<SSTableReader>()
 +        {
 +            public boolean apply(SSTableReader sstable)
 +            {
 +                return sstable.descriptor.version.hasNewStatsFile();
 +            }
 +        });
 +
 +        // if it is, load them to estimate key count
 +        if (cardinalityAvailable)
 +        {
 +            boolean failed = false;
 +            ICardinality cardinality = null;
 +            for (SSTableReader sstable : sstables)
 +            {
 +                try
 +                {
 +                    CompactionMetadata metadata = (CompactionMetadata) sstable.descriptor.getMetadataSerializer().deserialize(sstable.descriptor, MetadataType.COMPACTION);
++                    assert metadata != null : sstable.getFilename();
 +                    if (cardinality == null)
 +                        cardinality = metadata.cardinalityEstimator;
 +                    else
 +                        cardinality = cardinality.merge(metadata.cardinalityEstimator);
 +                }
 +                catch (IOException e)
 +                {
 +                    logger.warn("Reading cardinality from Statistics.db failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +                catch (CardinalityMergeException e)
 +                {
 +                    logger.warn("Cardinality merge failed.", e);
 +                    failed = true;
 +                    break;
 +                }
 +            }
 +            if (cardinality != null && !failed)
 +                count = cardinality.cardinality();
 +        }
 +
 +        // if something went wrong above or cardinality is not available, calculate using index summary
 +        if (count < 0)
 +        {
 +            for (SSTableReader sstable : sstables)
 +                count += sstable.estimatedKeys();
 +        }
 +        return count;
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor) throws IOException
 +    {
 +        CFMetaData metadata;
 +        if (descriptor.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            int i = descriptor.cfname.indexOf(SECONDARY_INDEX_NAME_SEPARATOR);
 +            String parentName = descriptor.cfname.substring(0, i);
 +            CFMetaData parent = Schema.instance.getCFMetaData(descriptor.ksname, parentName);
 +            ColumnDefinition def = parent.getColumnDefinitionForIndex(descriptor.cfname.substring(i + 1));
 +            metadata = CFMetaData.newIndexMetadata(parent, def, SecondaryIndex.getIndexComparator(parent, def));
 +        }
 +        else
 +        {
 +            metadata = Schema.instance.getCFMetaData(descriptor.ksname, descriptor.cfname);
 +        }
 +        return open(descriptor, metadata);
 +    }
 +
 +    public static SSTableReader open(Descriptor desc, CFMetaData metadata) throws IOException
 +    {
 +        IPartitioner p = desc.cfname.contains(SECONDARY_INDEX_NAME_SEPARATOR)
 +                ? new LocalPartitioner(metadata.getKeyValidator())
 +                : StorageService.getPartitioner();
 +        return open(desc, componentsFor(desc), metadata, p);
 +    }
 +
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        return open(descriptor, components, metadata, partitioner, true);
 +    }
 +
 +    public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException
 +    {
 +        return open(descriptor, components, metadata, StorageService.getPartitioner(), false);
 +    }
 +
 +    /**
 +     * Open SSTable reader to be used in batch mode(such as sstableloader).
 +     *
 +     * @param descriptor
 +     * @param components
 +     * @param metadata
 +     * @param partitioner
 +     * @return opened SSTableReader
 +     * @throws IOException
 +     */
 +    public static SSTableReader openForBatch(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // special implementation of load to use non-pooled SegmentedFile builders
 +        SegmentedFile.Builder ibuilder = new BufferedSegmentedFile.Builder();
 +        SegmentedFile.Builder dbuilder = sstable.compression
 +                ? new CompressedSegmentedFile.Builder(null)
 +                : new BufferedSegmentedFile.Builder();
 +        if (!sstable.loadSummary(ibuilder, dbuilder))
 +            sstable.buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +        sstable.ifile = ibuilder.complete(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        sstable.dfile = dbuilder.complete(sstable.descriptor.filenameFor(Component.DATA));
 +        sstable.bf = FilterFactory.AlwaysPresent;
 +
 +        return sstable;
 +    }
 +
 +    private static SSTableReader open(Descriptor descriptor,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      boolean validate) throws IOException
 +    {
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable " + descriptor;
 +        assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
 +                EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner, so you will need to edit that to match your old partitioner if upgrading.",
 +                    descriptor, validationMetadata.partitioner, partitionerName));
 +            System.exit(1);
 +        }
 +
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
 +        SSTableReader sstable = internalOpen(descriptor, components, metadata, partitioner, System.currentTimeMillis(),
 +                statsMetadata, OpenReason.NORMAL);
 +
 +        // load index and filter
 +        long start = System.nanoTime();
 +        sstable.load(validationMetadata);
 +        logger.debug("INDEX LOAD TIME for {}: {} ms.", descriptor, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 +
 +        if (validate)
 +            sstable.validate();
 +
 +        if (sstable.getKeyCache() != null)
 +            logger.debug("key cache contains {}/{} keys", sstable.getKeyCache().size(), sstable.getKeyCache().getCapacity());
 +
 +        return sstable;
 +    }
 +
 +    public static void logOpenException(Descriptor descriptor, IOException e)
 +    {
 +        if (e instanceof FileNotFoundException)
 +            logger.error("Missing sstable component in {}; skipped because of {}", descriptor, e.getMessage());
 +        else
 +            logger.error("Corrupt sstable {}; skipped", descriptor, e);
 +    }
 +
 +    public static Collection<SSTableReader> openAll(Set<Map.Entry<Descriptor, Set<Component>>> entries,
 +                                                    final CFMetaData metadata,
 +                                                    final IPartitioner partitioner)
 +    {
 +        final Collection<SSTableReader> sstables = new LinkedBlockingQueue<>();
 +
 +        ExecutorService executor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("SSTableBatchOpen", FBUtilities.getAvailableProcessors());
 +        for (final Map.Entry<Descriptor, Set<Component>> entry : entries)
 +        {
 +            Runnable runnable = new Runnable()
 +            {
 +                public void run()
 +                {
 +                    SSTableReader sstable;
 +                    try
 +                    {
 +                        sstable = open(entry.getKey(), entry.getValue(), metadata, partitioner);
 +                    }
 +                    catch (IOException ex)
 +                    {
 +                        logger.error("Corrupt sstable {}; skipped", entry, ex);
 +                        return;
 +                    }
 +                    sstables.add(sstable);
 +                }
 +            };
 +            executor.submit(runnable);
 +        }
 +
 +        executor.shutdown();
 +        try
 +        {
 +            executor.awaitTermination(7, TimeUnit.DAYS);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new AssertionError(e);
 +        }
 +
 +        return sstables;
 +
 +    }
 +
 +    /**
 +     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
 +     */
 +    public static SSTableReader internalOpen(Descriptor desc,
 +                                      Set<Component> components,
 +                                      CFMetaData metadata,
 +                                      IPartitioner partitioner,
 +                                      SegmentedFile ifile,
 +                                      SegmentedFile dfile,
 +                                      IndexSummary isummary,
 +                                      IFilter bf,
 +                                      long maxDataAge,
 +                                      StatsMetadata sstableMetadata,
 +                                      OpenReason openReason)
 +    {
 +        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null && sstableMetadata != null;
 +
 +        SSTableReader reader = internalOpen(desc, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +
 +        reader.bf = bf;
 +        reader.ifile = ifile;
 +        reader.dfile = dfile;
 +        reader.indexSummary = isummary;
 +
 +        return reader;
 +    }
 +
 +
 +    private static SSTableReader internalOpen(final Descriptor descriptor,
 +                                            Set<Component> components,
 +                                            CFMetaData metadata,
 +                                            IPartitioner partitioner,
 +                                            Long maxDataAge,
 +                                            StatsMetadata sstableMetadata,
 +                                            OpenReason openReason)
 +    {
 +        Factory readerFactory = descriptor.getFormat().getReaderFactory();
 +
 +        return readerFactory.open(descriptor, components, metadata, partitioner, maxDataAge, sstableMetadata, openReason);
 +    }
 +
 +    protected SSTableReader(final Descriptor desc,
 +                            Set<Component> components,
 +                            CFMetaData metadata,
 +                            IPartitioner partitioner,
 +                            long maxDataAge,
 +                            StatsMetadata sstableMetadata,
 +                            OpenReason openReason)
 +    {
 +        super(desc, components, metadata, partitioner);
 +        this.sstableMetadata = sstableMetadata;
 +        this.maxDataAge = maxDataAge;
 +        this.openReason = openReason;
 +
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
 +
 +        deletingTask = new SSTableDeletingTask(this);
 +
 +        // Don't track read rates for tables in the system keyspace.  Also don't track reads for special operations (like early open)
 +        // this is to avoid overflowing the executor queue (see CASSANDRA-8066)
 +        if (Keyspace.SYSTEM_KS.equals(desc.ksname) || openReason != OpenReason.NORMAL)
 +        {
 +            readMeter = null;
 +            readMeterSyncFuture = null;
 +            return;
 +        }
 +
 +        readMeter = SystemKeyspace.getSSTableReadMeter(desc.ksname, desc.cfname, desc.generation);
 +        // sync the average read rate to system.sstable_activity every five minutes, starting one minute from now
 +        readMeterSyncFuture = syncExecutor.scheduleAtFixedRate(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (!isCompacted.get())
 +                {
 +                    meterSyncThrottle.acquire();
 +                    SystemKeyspace.persistSSTableReadMeter(desc.ksname, desc.cfname, desc.generation, readMeter);
 +                }
 +            }
 +        }, 1, 5, TimeUnit.MINUTES);
 +    }
 +
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sum += sstable.onDiskLength();
 +        }
 +        return sum;
 +    }
 +
 +    private void tidy(boolean release)
 +    {
 +        if (readMeterSyncFuture != null)
 +            readMeterSyncFuture.cancel(false);
 +
 +        if (references.get() != 0)
 +        {
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +        }
 +
 +        synchronized (replaceLock)
 +        {
 +            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFiles = false;
 +
 +            if (replacedBy != null)
 +            {
 +                closeBf = replacedBy.bf != bf;
 +                closeSummary = replacedBy.indexSummary != indexSummary;
 +                closeFiles = replacedBy.dfile != dfile;
 +                // if the replacement sstablereader uses a different path, clean up our paths
 +                deleteFiles = !dfile.path.equals(replacedBy.dfile.path);
 +            }
 +
 +            if (replaces != null)
 +            {
 +                closeBf &= replaces.bf != bf;
 +                closeSummary &= replaces.indexSummary != indexSummary;
 +                closeFiles &= replaces.dfile != dfile;
 +                deleteFiles &= !dfile.path.equals(replaces.dfile.path);
 +            }
 +
 +            boolean deleteAll = false;
 +            if (release && isCompacted.get())
 +            {
 +                assert replacedBy == null;
 +                if (replaces != null)
 +                {
 +                    replaces.replacedBy = null;
 +                    replaces.deletingTask = deletingTask;
 +                    replaces.markObsolete();
 +                }
 +                else
 +                {
 +                    deleteAll = true;
 +                }
 +            }
 +            else
 +            {
 +                if (replaces != null)
 +                    replaces.replacedBy = replacedBy;
 +                if (replacedBy != null)
 +                    replacedBy.replaces = replaces;
 +            }
 +
 +            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFiles, deleteAll);
 +        }
 +    }
 +
 +    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
 +    {
 +        if (references.get() != 0)
 +            throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
 +
 +        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
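 +        // issue a read barrier so in-flight reads finish before resources are closed or deleted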
 +        final OpOrder.Barrier barrier;
 +        if (cfs != null)
 +        {
 +            barrier = cfs.readOrdering.newBarrier();
 +            barrier.issue();
 +        }
 +        else
 +            barrier = null;
 +
 +        StorageService.tasks.execute(new Runnable()
 +        {
 +            public void run()
 +            {
 +                if (barrier != null)
 +                    barrier.await();
 +                if (closeBf)
 +                    bf.close();
 +                if (closeSummary)
 +                    indexSummary.close();
 +                if (closeFiles)
 +                {
 +                    ifile.cleanup();
 +                    dfile.cleanup();
 +                }
 +                if (runOnClose != null)
 +                    runOnClose.run();
 +                if (deleteAll)
 +                {
 +                    /**
 +                     * Do the OS a favour and suggest (using an fadvise call) that we
 +                     * don't want to see pages of this SSTable in memory anymore.
 +                     *
 +                     * NOTE: We can't use madvise in java because it requires the address of
 +                     * the mapping, so instead we always open a file and run fadvise(fd, 0, 0) on it
 +                     */
 +                    dropPageCache();
 +                    deletingTask.run();
 +                }
 +                else if (deleteFiles)
 +                {
 +                    FileUtils.deleteWithConfirm(new File(dfile.path));
 +                    FileUtils.deleteWithConfirm(new File(ifile.path));
 +                }
 +            }
 +        });
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
 +
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
 +
 +    public String getFilename()
 +    {
 +        return dfile.path;
 +    }
 +
 +    public String getIndexFilename()
 +    {
 +        return ifile.path;
 +    }
 +
 +    public void setTrackedBy(DataTracker tracker)
 +    {
 +        deletingTask.setTracker(tracker);
 +        // under normal operation we can do this at any time, but SSTR is also used outside C* proper,
 +        // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set up the cache
 +        // here when we know we're being wired into the rest of the server infrastructure.
 +        keyCache = CacheService.instance.keyCache;
 +    }
 +
 +    private void load(ValidationMetadata validation) throws IOException
 +    {
 +        if (metadata.getBloomFilterFpChance() == 1.0)
 +        {
 +            // bf is disabled.
 +            load(false, true);
 +            bf = FilterFactory.AlwaysPresent;
 +        }
 +        else if (!components.contains(Component.FILTER) || validation == null)
 +        {
 +            // bf is enabled, but filter component is missing.
 +            load(true, true);
 +        }
 +        else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance())
 +        {
 +            // bf is enabled, but the fp chance recorded in the sstable metadata differs from the currently configured value.
 +            load(true, true);
 +        }
 +        else
 +        {
 +            // bf is enabled and fp chance matches the currently configured value.
 +            load(false, true);
 +            loadBloomFilter();
 +        }
 +    }
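
The four branches reduce to a small decision: skip the filter entirely when the configured fp chance is 1.0, rebuild when the FILTER component (or its validation metadata) is missing or the recorded fp chance no longer matches the configuration, and otherwise deserialize the on-disk filter. A sketch of that decision in isolation (the enum and method names are illustrative, not part of SSTableReader):

    public class BloomFilterLoadDecision
    {
        enum Action { DISABLED, REBUILD, DESERIALIZE }

        // validationFpChance is null when the FILTER component or validation
        // metadata is missing (a simplification of the branches above).
        static Action decide(double configuredFpChance, Double validationFpChance)
        {
            if (configuredFpChance == 1.0)
                return Action.DISABLED;       // bf disabled entirely
            if (validationFpChance == null)
                return Action.REBUILD;        // component or metadata missing
            if (validationFpChance != configuredFpChance)
                return Action.REBUILD;        // fp chance changed since the sstable was written
            return Action.DESERIALIZE;        // on-disk filter still valid
        }

        public static void main(String[] args)
        {
            System.out.println(decide(1.0, 0.01));   // DISABLED
            System.out.println(decide(0.01, null));  // REBUILD
            System.out.println(decide(0.01, 0.1));   // REBUILD
            System.out.println(decide(0.01, 0.01));  // DESERIALIZE
        }
    }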
 +
 +    /**
 +     * Load bloom filter from Filter.db file.
 +     *
 +     * @throws IOException
 +     */
 +    private void loadBloomFilter() throws IOException
 +    {
 +        DataInputStream stream = null;
 +        try
 +        {
 +            stream = new DataInputStream(new BufferedInputStream(new FileInputStream(descriptor.filenameFor(Component.FILTER))));
 +            bf = FilterFactory.deserialize(stream, true);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(stream);
 +        }
 +    }
 +
 +    /**
 +     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
 +     * @param saveSummaryIfCreated for bulk loading purposes: if the summary was absent and needed to be built, you can
 +     *                             avoid persisting it to disk by setting this to false
 +     */
 +    private void load(boolean recreateBloomFilter, boolean saveSummaryIfCreated) throws IOException
 +    {
 +        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +        SegmentedFile.Builder dbuilder = compression
 +                ? SegmentedFile.getCompressedBuilder()
 +                : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        if (recreateBloomFilter || !summaryLoaded)
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +
 +        ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +        dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
 +            saveSummary(ibuilder, dbuilder);
 +    }
 +
 +    /**
 +     * Build index summary (and optionally bloom filter) by reading through the Index.db file.
 +     *
 +     * @param recreateBloomFilter true if recreate bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if the index summary is already loaded and does not need to be rebuilt
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                    ? histogramCount
 +                    : estimateRowsFromIndex(primaryIndex); // the statistics component is optional, so fall back to scanning the index
 +
 +            if (recreateBloomFilter)
 +                bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true);
 +
 +            IndexSummaryBuilder summaryBuilder = null;
 +            if (!summaryLoaded)
 +                summaryBuilder = new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel);
 +
 +            long indexPosition;
 +            RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata);
 +
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                RowIndexEntry indexEntry = rowIndexSerializer.deserialize(primaryIndex, descriptor.version);
 +                DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                if (first == null)
 +                    first = decoratedKey;
 +                last = decoratedKey;
 +
 +                if (recreateBloomFilter)
 +                    bf.add(decoratedKey.getKey());
 +
 +                // if summary was already read from disk we don't want to re-populate it using primary index
 +                if (!summaryLoaded)
 +                {
 +                    summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                    ibuilder.addPotentialBoundary(indexPosition);
 +                    dbuilder.addPotentialBoundary(indexEntry.position);
 +                }
 +            }
 +
 +            if (!summaryLoaded)
 +                indexSummary = summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
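
buildSummary makes one sequential pass over the primary index, offering each key to the summary builder, which at full sampling keeps roughly every min_index_interval-th entry. A toy sketch of that sampling pass, assuming plain sorted strings instead of DecoratedKeys:

    import java.util.ArrayList;
    import java.util.List;

    public class SummarySamplingSketch
    {
        // Keep every intervalth key, always including the first: a simplified
        // stand-in for IndexSummaryBuilder.maybeAddEntry at full sampling.
        static List<String> sample(List<String> sortedKeys, int interval)
        {
            List<String> summary = new ArrayList<>();
            for (int i = 0; i < sortedKeys.size(); i++)
                if (i % interval == 0)
                    summary.add(sortedKeys.get(i));
            return summary;
        }

        public static void main(String[] args)
        {
            List<String> keys = new ArrayList<>();
            for (int i = 0; i < 10; i++)
                keys.add(String.format("key%02d", i));
            // interval 4 keeps key00, key04, key08 -> roughly 1/4 of the entries
            System.out.println(sample(keys, 4));
        }
    }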
 +
 +    /**
 +     * Load index summary from Summary.db file if it exists.
 +     *
 +     * If the loaded index summary has a different index interval from the current value in the schema,
 +     * the Summary.db file will be deleted and this returns false so the summary is rebuilt.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel(), metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(), e.getMessage());
 +            // corrupted; delete it and fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel());
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // the write may have left a partial file; delete it so the summary is rebuilt on the next load.
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +            replacedBy = replacement;
 +            replacement.replaces = this;
 +            replacement.replaceLock = replaceLock;
 +        }
 +    }
 +
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            if (newStart.compareTo(this.first) > 0)
 +            {
 +                if (newStart.compareTo(this.last) > 0)
 +                {
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, 0);
 +                            CLibrary.trySkipCache(ifile.path, 0, 0);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +                else
 +                {
 +                    final long dataStart = getPosition(newStart, Operator.GE).position;
 +                    final long indexStart = getIndexScanPosition(newStart);
 +                    this.runOnClose = new Runnable()
 +                    {
 +                        public void run()
 +                        {
 +                            CLibrary.trySkipCache(dfile.path, 0, dataStart);
 +                            CLibrary.trySkipCache(ifile.path, 0, indexStart);
 +                            runOnClose.run();
 +                        }
 +                    };
 +                }
 +            }
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, indexSummary.readOnlyClone(), bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.last.compareTo(newStart) > 0 ? newStart : this.last;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
 +    {
 +        synchronized (replaceLock)
 +        {
 +            assert replacedBy == null;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval() != minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel, minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +                SegmentedFile.Builder dbuilder = compression
 +                        ? SegmentedFile.getCompressedBuilder()
 +                        : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same index summary sampling level and " +
 +                        "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            SSTableReader replacement = internalOpen(descriptor, components, metadata, partitioner, ifile, dfile, newSummary, bf, maxDataAge, sstableMetadata,
 +                    openReason == OpenReason.EARLY ? openReason : OpenReason.METADATA_CHANGE);
 +            replacement.readMeterSyncFuture = this.readMeterSyncFuture;
 +            replacement.readMeter = this.readMeter;
 +            replacement.first = this.first;
 +            replacement.last = this.last;
 +            setReplacedBy(replacement);
 +            return replacement;
 +        }
 +    }
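
Worth restating the decision just made: the summary is re-read from the primary index only when information is needed that the in-memory summary no longer has (upsampling, a changed min_index_interval, or an effective interval pushed above max_index_interval); pure downsampling can be done in memory. A sketch of that decision table, with hypothetical parameter names:

    public class ResampleDecision
    {
        enum Action { REBUILD_FROM_INDEX, DOWNSAMPLE_IN_MEMORY, NO_OP }

        static Action decide(int requestedLevel, int currentLevel,
                             int currentMinInterval, int configuredMinInterval,
                             double effectiveInterval, int configuredMaxInterval)
        {
            // Upsampling, a changed min_index_interval, or an effective interval
            // above max_index_interval all force a re-read of the primary index.
            if (requestedLevel > currentLevel
                    || currentMinInterval != configuredMinInterval
                    || effectiveInterval > configuredMaxInterval)
                return Action.REBUILD_FROM_INDEX;
            // Strictly lower level: the existing summary is a superset, so we
            // can thin it out without touching disk.
            if (requestedLevel < currentLevel)
                return Action.DOWNSAMPLE_IN_MEMORY;
            return Action.NO_OP; // the real code treats this case as an assertion error
        }

        public static void main(String[] args)
        {
            System.out.println(decide(128, 64, 128, 128, 128.0, 2048)); // REBUILD_FROM_INDEX
            System.out.println(decide(64, 128, 128, 128, 128.0, 2048)); // DOWNSAMPLE_IN_MEMORY
        }
    }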
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel);
 +
 +            long indexPosition;
 +            while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +            {
 +                summaryBuilder.maybeAddEntry(partitioner.decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)), indexPosition);
 +                RowIndexEntry.Serializer.skip(primaryIndex);
 +            }
 +
 +            return summaryBuilder.build(partitioner);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +    }
 +
 +    public int getIndexSummarySamplingLevel()
 +    {
 +        return indexSummary.getSamplingLevel();
 +    }
 +
 +    public long getIndexSummaryOffHeapSize()
 +    {
 +        return indexSummary.getOffHeapSize();
 +    }
 +
 +    public int getMinIndexInterval()
 +    {
 +        return indexSummary.getMinIndexInterval();
 +    }
 +
 +    public double getEffectiveIndexInterval()
 +    {
 +        return indexSummary.getEffectiveIndexInterval();
 +    }
 +
 +    public void releaseSummary() throws IOException
 +    {
 +        indexSummary.close();
 +        indexSummary = null;
 +    }
 +
 +    private void validate()
 +    {
 +        if (this.first.compareTo(this.last) > 0)
 +            throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last));
 +    }
 +
 +    /**
 +     * Gets the position in the index file to start scanning to find the given key (at most indexInterval keys away,
 +     * modulo downsampling of the index summary).
 +     */
 +    public long getIndexScanPosition(RowPosition key)
 +    {
 +        return getIndexScanPositionFromBinarySearchResult(indexSummary.binarySearch(key), indexSummary);
 +    }
 +
 +    protected static long getIndexScanPositionFromBinarySearchResult(int binarySearchResult, IndexSummary referencedIndexSummary)
 +    {
 +        if (binarySearchResult == -1)
 +            return -1;
 +        else
 +            return referencedIndexSummary.getPosition(getIndexSummaryIndexFromBinarySearchResult(binarySearchResult));
 +    }
 +
 +    protected static int getIndexSummaryIndexFromBinarySearchResult(int binarySearchResult)
 +    {
 +        if (binarySearchResult < 0)
 +        {
 +            // binary search gives us the first index _greater_ than the key searched for,
 +            // i.e., its insertion position
 +            int greaterThan = (binarySearchResult + 1) * -1;
 +            if (greaterThan == 0)
 +                return -1;
 +            return greaterThan - 1;
 +        }
 +        else
 +        {
 +            return binarySearchResult;
 +        }
 +    }
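
This decoding follows the java.util.Arrays/Collections binarySearch contract: a miss returns -(insertionPoint) - 1, so (result + 1) * -1 recovers the insertion point, and the entry at insertionPoint - 1 is the greatest summary key less than the probe. A runnable illustration:

    import java.util.Arrays;

    public class BinarySearchDecodeSketch
    {
        // Same decoding as getIndexSummaryIndexFromBinarySearchResult: map a
        // binarySearch result to the index of the greatest element <= key,
        // or -1 if every element is greater.
        static int decode(int binarySearchResult)
        {
            if (binarySearchResult < 0)
            {
                int insertionPoint = (binarySearchResult + 1) * -1;
                return insertionPoint == 0 ? -1 : insertionPoint - 1;
            }
            return binarySearchResult;
        }

        public static void main(String[] args)
        {
            int[] summaryKeys = { 10, 20, 30 };
            System.out.println(decode(Arrays.binarySearch(summaryKeys, 20))); // 1 (exact hit)
            System.out.println(decode(Arrays.binarySearch(summaryKeys, 25))); // 1 (start scanning from 20)
            System.out.println(decode(Arrays.binarySearch(summaryKeys, 5)));  // -1 (before the first entry)
        }
    }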
 +
 +    /**
 +     * Returns the compression metadata for this sstable.
 +     * @throws IllegalStateException if the sstable is not compressed
 +     */
 +    public CompressionMetadata getCompressionMetadata()
 +    {
 +        if (!compression)
 +            throw new IllegalStateException(this + " is not compressed");
 +
 +        CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();
 +
 +        //We need the parent cf metadata
 +        String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
 +        cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));
 +
 +        return cmd;
 +    }
 +
 +    /**
 +     * For testing purposes only.
 +     */
 +    public void forceFilterFailures()
 +    {
 +        bf = FilterFactory.AlwaysPresent;
 +    }
 +
 +    public IFilter getBloomFilter()
 +    {
 +        return bf;
 +    }
 +
 +    public long getBloomFilterSerializedSize()
 +    {
 +        return bf.serializedSize();
 +    }
 +
 +    /**
 +     * @return An estimate of the number of keys in this SSTable based on the index summary.
 +     */
 +    public long estimatedKeys()
 +    {
 +        return indexSummary.getEstimatedKeyCount();
 +    }
 +
 +    /**
 +     * @param ranges
 +     * @return An estimate of the number of keys for given ranges in this SSTable.
 +     */
 +    public long estimatedKeysForRanges(Collection<Range<Token>> ranges)
 +    {
 +        long sampleKeyCount = 0;
 +        List<Pair<Integer, Integer>> sampleIndexes = getSampleIndexesForRanges(indexSummary, ranges);
 +        for (Pair<Integer, Integer> sampleIndexRange : sampleIndexes)
 +            sampleKeyCount += (sampleIndexRange.right - sampleIndexRange.left + 1);
 +
 +        // adjust for the current sampling level: (BSL / SL) * index_interval_at_full_sampling
 +        long estimatedKeys = sampleKeyCount * (Downsampling.BASE_SAMPLING_LEVEL * indexSummary.getMinIndexInterval()) / indexSummary.getSamplingLevel();
 +        return Math.max(1, estimatedKeys);
 +    }
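
To make the scaling concrete: at sampling level SL, the summary keeps SL/BASE_SAMPLING_LEVEL of its full-sampling entries, so each remaining entry stands for (BASE_SAMPLING_LEVEL / SL) * min_index_interval keys. A small arithmetic sketch (the constants are assumed for illustration, not read from a live summary):

    public class EstimatedKeysSketch
    {
        public static void main(String[] args)
        {
            int baseSamplingLevel = 128; // assumed value of Downsampling.BASE_SAMPLING_LEVEL
            int minIndexInterval = 128;  // schema min_index_interval
            int samplingLevel = 64;      // summary downsampled to half density
            long sampleKeyCount = 10;    // summary entries falling inside the ranges

            // Each summary entry represents (BSL / SL) * min_index_interval keys.
            long estimatedKeys = sampleKeyCount
                    * ((long) baseSamplingLevel * minIndexInterval) / samplingLevel;
            System.out.println(estimatedKeys); // 10 * 256 = 2560
        }
    }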
 +
 +    /**
 +     * Returns the number of entries in the IndexSummary.  At full sampling, this is approximately 1/INDEX_INTERVALth of
 +     * the keys in this SSTable.
 +     */
 +    public int getIndexSummarySize()
 +    {
 +        return indexSummary.size();
 +    }
 +
 +    /**
 +     * Returns the approximate number of entries the IndexSummary would contain if it were at full sampling.
 +     */
 +    public int getMaxIndexSummarySize()
 +    {
 +        return indexSummary.getMaxNumberOfEntries();
 +    }
 +
 +    /**
 +     * Returns the key for the index summary entry at `index`.
 +     */
 +    public byte[] getIndexSummaryKey(int index)
 +    {
 +        return indexSummary.getKey(index);
 +    }
 +
 +    private static List<Pair<Integer,Integer>> getSampleIndexesForRanges(IndexSummary summary, Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Integer,Integer>> positions = new ArrayList<>();
 +
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            RowPosition leftPosition = range.left.maxKeyBound();
 +            RowPosition rightPosition = range.right.maxKeyBound();
 +
 +            int left = summary.binarySearch(leftPosition);
 +            if (left < 0)
 +                left = (left + 1) * -1;
 +            else
 +                // left bounds are start-exclusive
 +                left = left + 1;
 +            if (left == summary.size())
 +                // left is past the end of the sampling
 +                continue;
 +
 +            int right = Range.isWrapAround(range.left, range.right)
 +                    ? summary.size() - 1
 +                    : summary.binarySearch(rightPosition);
 +            if (right < 0)
 +            {
 +                // ranges are end-inclusive, so we use the index before the one binarySearch gives us,
 +                // since that will be the last index we return
 +                right = (right + 1) * -1;
 +                if (right == 0)
 +                    // Means the first key is already strictly greater than the right bound
 +                    continue;
 +                right--;
 +            }
 +
 +            if (left > right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public Iterable<DecoratedKey> getKeySamples(final Range<Token> range)
 +    {
 +        final List<Pair<Integer, Integer>> indexRanges = getSampleIndexesForRanges(indexSummary, Collections.singletonList(range));
 +
 +        if (indexRanges.isEmpty())
 +            return Collections.emptyList();
 +
 +        return new Iterable<DecoratedKey>()
 +        {
 +            public Iterator<DecoratedKey> iterator()
 +            {
 +                return new Iterator<DecoratedKey>()
 +                {
 +                    private Iterator<Pair<Integer, Integer>> rangeIter = indexRanges.iterator();
 +                    private Pair<Integer, Integer> current;
 +                    private int idx;
 +
 +                    public boolean hasNext()
 +                    {
 +                        if (current == null || idx > current.right)
 +                        {
 +                            if (rangeIter.hasNext())
 +                            {
 +                                current = rangeIter.next();
 +                                idx = current.left;
 +                                return true;
 +                            }
 +                            return false;
 +                        }
 +
 +                        return true;
 +                    }
 +
 +                    public DecoratedKey next()
 +                    {
 +                        byte[] bytes = indexSummary.getKey(idx++);
 +                        return partitioner.decorateKey(ByteBuffer.wrap(bytes));
 +                    }
 +
 +                    public void remove()
 +                    {
 +                        throw new UnsupportedOperationException();
 +                    }
 +                };
 +            }
 +        };
 +    }
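
The anonymous iterator above just flattens a list of inclusive [left, right] summary-index pairs into a single stream of indexes. The same bookkeeping in isolation, with int arrays standing in for the summary entries:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class RangeFlatteningSketch
    {
        // Flatten inclusive [left, right] pairs into one stream of indexes,
        // mirroring the hasNext()/next() bookkeeping in getKeySamples.
        static Iterator<Integer> flatten(List<int[]> ranges)
        {
            return new Iterator<Integer>()
            {
                private final Iterator<int[]> rangeIter = ranges.iterator();
                private int[] current;
                private int idx;

                public boolean hasNext()
                {
                    if (current == null || idx > current[1])
                    {
                        if (!rangeIter.hasNext())
                            return false;
                        current = rangeIter.next();
                        idx = current[0];
                    }
                    return true;
                }

                public Integer next()
                {
                    return idx++;
                }
            };
        }

        public static void main(String[] args)
        {
            Iterator<Integer> it = flatten(Arrays.asList(new int[]{ 0, 2 }, new int[]{ 5, 6 }));
            while (it.hasNext())
                System.out.print(it.next() + " "); // 0 1 2 5 6
        }
    }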
 +
 +    /**
 +     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
 +     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
 +     */
 +    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range<Token>> ranges)
 +    {
 +        // use the index to determine a minimal section for each range
 +        List<Pair<Long,Long>> positions = new ArrayList<>();
 +        for (Range<Token> range : Range.normalize(ranges))
 +        {
 +            AbstractBounds<RowPosition> keyRange = range.toRowBounds();
 +            RowIndexEntry idxLeft = getPosition(keyRange.left, Operator.GT);
 +            long left = idxLeft == null ? -1 : idxLeft.position;
 +            if (left == -1)
 +                // left is past the end of the file
 +                continue;
 +            RowIndexEntry idxRight = getPosition(keyRange.right, Operator.GT);
 +            long right = idxRight == null ? -1 : idxRight.position;
 +            if (right == -1 || Range.isWrapAround(range.left, range.right))
 +                // right is past the end of the file, or it wraps
 +                right = uncompressedLength();
 +            if (left == right)
 +                // empty range
 +                continue;
 +            positions.add(Pair.create(left, right));
 +        }
 +        return positions;
 +    }
 +
 +    public void invalidateCacheKey(DecoratedKey key)
 +    {
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        keyCache.remove(cacheKey);
 +    }
 +
 +    public void cacheKey(DecoratedKey key, RowIndexEntry info)
 +    {
 +        CachingOptions caching = metadata.getCaching();
 +
 +        if (!caching.keyCache.isEnabled()
 +                || keyCache == null
 +                || keyCache.getCapacity() == 0)
 +        {
 +            return;
 +        }
 +
 +        KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey());
 +        logger.trace("Adding cache entry for {} -> {}", cacheKey, info);
 +        keyCache.put(cacheKey, info);
 +    }
 +
 +    public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
 +    {
 +        return getCachedPosition(new KeyCacheKey(metadata.cfId, descriptor, key.getKey()), updateStats);
 +    }
 +
 +    protected RowIndexEntry getCachedPosition(KeyCacheKey unifiedKey, boolean updateStats)
 +    {
 +        if (keyCache != null && keyCache.getCapacity() > 0) {
 +            if (updateStats)
 +            {
 +                RowIndexEntry cachedEntry = keyCache.get(unifiedKey);
 +                keyCacheRequest.incrementAndGet();
 +                if (cachedEntry != null)
 +                    keyCacheHit.incrementAndGet();
 +                return cachedEntry;
 +            }
 +            else
 +            {
 +                return keyCache.getInternal(unifiedKey);
 +            }
 +        }
 +        return null;
 +    }
 +
 +    /**
 +     * Get position updating key cache and stats.
 +     * @see #getPosition(org.apache.cassandra.db.RowPosition, SSTableReader.Operator, boolean)
 +     */
 +    public RowIndexEntry getPosition(RowPosition key, Operator op)
 +    {
 +        return getPosition(key, op, true);
 +    }
 +
 +    /**
 +     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
 +     * allow key selection by token bounds, but only if op != EQ
 +     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
 +     * @param updateCacheAndStats true if updating stats and cache
 +     * @return The index entry corresponding to the key, or null if the key is not present
 +     */
 +    public abstract RowIndexEntry getPosition(RowPosition key, Operator op, boolean updateCacheAndStats);
 +
 +    // Corresponds to a by-name column query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, SortedSet<CellName> columns);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, SortedSet<CellName> columns, RowIndexEntry indexEntry);
 +
 +    // Corresponds to a slice query
 +    public abstract OnDiskAtomIterator iterator(DecoratedKey key, ColumnSlice[] slices, boolean reverse);
 +    public abstract OnDiskAtomIterator iterator(FileDataInput file, DecoratedKey key, ColumnSlice[] slices, boolean reversed, RowIndexEntry indexEntry);
 +
 +    /**
 +     * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists.
 +     */
 +    public DecoratedKey firstKeyBeyond(RowPosition token)
 +    {
 +        long sampledPosition = getIndexScanPosition(token);
 +        if (sampledPosition == -1)
 +            sampledPosition = 0;
 +
 +        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    DecoratedKey indexDecoratedKey = partitioner.decorateKey(indexKey);
 +                    if (indexDecoratedKey.compareTo(token) > 0)
 +                        return indexDecoratedKey;
 +
 +                    RowIndexEntry.Serializer.skip(in);
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return null;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the data for this SSTable. For
 +     * compressed files, this is not the same thing as the on disk size (see
 +     * onDiskLength())
 +     */
 +    public long uncompressedLength()
 +    {
 +        return dfile.length;
 +    }
 +
 +    /**
 +     * @return The length in bytes of the on disk size for this SSTable. For
 +     * compressed files, this is not the same thing as the data length (see
 +     * uncompressedLength())
 +     */
 +    public long onDiskLength()
 +    {
 +        return dfile.onDiskLength;
 +    }
 +
 +    public boolean acquireReference()
 +    {
 +        while (true)
 +        {
 +            int n = references.get();
 +            if (n <= 0)
 +                return false;
 +            if (references.compareAndSet(n, n + 1))
 +                return true;
 +        }
 +    }
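
acquireReference is the classic compare-and-swap increment that refuses to resurrect a dead object: once the count has hit zero, the reader can never be re-acquired, because tidy() may already be running. A standalone sketch of the loop:

    import java.util.concurrent.atomic.AtomicInteger;

    public class RefCountSketch
    {
        // Starts at 1, like an SSTableReader owned by its tracker.
        private final AtomicInteger references = new AtomicInteger(1);

        public boolean acquire()
        {
            while (true)
            {
                int n = references.get();
                if (n <= 0)
                    return false; // already released for good; never revive
                if (references.compareAndSet(n, n + 1))
                    return true;  // CAS won; our reference is safely counted
            }
        }

        public void release()
        {
            if (references.decrementAndGet() == 0)
                System.out.println("last reference gone: tidy and delete");
        }

        public static void main(String[] args)
        {
            RefCountSketch ref = new RefCountSketch();
            System.out.println(ref.acquire()); // true
            ref.release();
            ref.release();                     // count hits 0, resources tidied
            System.out.println(ref.acquire()); // false: cannot be re-acquired
        }
    }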
 +
 +    @VisibleForTesting
 +    public int referenceCount()
 +    {
 +        return references.get();
 +    }
 +
 +    /**
 +     * Release reference to this SSTableReader.
 +     * Release reference to this SSTableReader.
 +     * If nothing else is referring to this SSTable and it is marked as compacted,
 +     */
 +    public void releaseReference()
 +    {
 +        if (references.decrementAndGet() == 0)
 +            tidy(true);
 +        assert references.get() >= 0 : "Reference counter " +  references.get() + " for " + dfile.path;
 +    }
 +
 +    /**
 +     * Mark the sstable as obsolete, i.e., compacted into newer sstables.
 +     *
 +     * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
 +     * except for threads holding a reference.
 +     *
 +     * @return true if this is the first time the file was marked obsolete.  Calling this
 +     * multiple times is usually buggy (see exceptions in DataTracker.unmarkCompacting and removeOldSSTablesSize).
 +     */
 +    public boolean markObsolete()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} compacted", getFilename());
 +
 +        synchronized (replaceLock)
 +        {
-             assert replacedBy == null;
++            assert replacedBy == null : getFilename();
 +        }
 +        return !isCompacted.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedCompacted()
 +    {
 +        return isCompacted.get();
 +    }
 +
 +    public void markSuspect()
 +    {
 +        if (logger.isDebugEnabled())
 +            logger.debug("Marking {} as a suspect for blacklisting.", getFilename());
 +
 +        isSuspect.getAndSet(true);
 +    }
 +
 +    public boolean isMarkedSuspect()
 +    {
 +        return isSuspect.get();
 +    }
 +
 +
 +    /**
 +     * Direct I/O SSTableScanner
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner()
 +    {
 +        return getScanner((RateLimiter) null);
 +    }
 +
 +    public ICompactionScanner getScanner(RateLimiter limiter)
 +    {
 +        return getScanner(DataRange.allData(partitioner), limiter);
 +    }
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner(DataRange dataRange)
 +    {
 +        return getScanner(dataRange, null);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined range of tokens.
 +     *
 +     * @param range the range of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
 +    {
 +        if (range == null)
 +            return getScanner(limiter);
 +        return getScanner(Collections.singletonList(range), limiter);
 +    }
 +
 +    /**
 +     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
 +     *
 +     * @param ranges the ranges of keys to cover
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ICompactionScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter);
 +
 +    /**
 +     *
 +     * @param dataRange filter to use when reading the columns
 +     * @return A Scanner for seeking over the rows of the SSTable.
 +     */
 +    public abstract ICompactionScanner getScanner(DataRange dataRange, RateLimiter limiter);
 +
 +
 +
 +    public FileDataInput getFileDataInput(long position)
 +    {
 +        return dfile.getSegment(position);
 +    }
 +
 +    /**
 +     * Tests if the sstable contains data newer than the given age param (in local System.currentTimeMillis() time).
 +     * This works in conjunction with maxDataAge, which is an upper bound on the creation time of data in this sstable.
 +     * @param age The age to compare the maxDataAge of this sstable to, measured in milliseconds since epoch on this host
 +     * @return True iff this sstable contains data that's newer than the given age parameter.
 +     */
 +    public boolean newSince(long age)
 +    {
 +        return maxDataAge > age;
 +    }
 +
 +    public void createLinks(String snapshotDirectoryPath)
 +    {
 +        for (Component component : components)
 +        {
 +            File sourceFile = new File(descriptor.filenameFor(component));
 +            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
 +            FileUtils.createHardLink(sourceFile, targetLink);
 +        }
 +    }
 +
 +    public boolean isRepaired()
 +    {
 +        return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
 +    }
 +
 +    public SSTableReader getCurrentReplacement()
 +    {
 +        synchronized (replaceLock)
 +        {
 +            SSTableReader cur = this, next = replacedBy;
 +            while (next != null)
 +            {
 +                cur = next;
 +                next = next.replacedBy;
 +            }
 +            return cur;
 +        }
 +    }
 +
 +    /**
 +     * TODO: Move someplace reusable
 +     */
 +    public abstract static class Operator
 +    {
 +        public static final Operator EQ = new Equals();
 +        public static final Operator GE = new GreaterThanOrEqualTo();
 +        public static final Operator GT = new GreaterThan();
 +
 +        /**
 +         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
 +         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
 +         */
 +        public abstract int apply(int comparison);
 +
 +        final static class Equals extends Operator
 +        {
 +            public int apply(int comparison) { return -comparison; }
 +        }
 +
 +        final static class GreaterThanOrEqualTo extends Operator
 +        {
 +            public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
 +        }
 +
 +        final static class GreaterThan extends Operator
 +        {
 +            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
 +        }
 +    }
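
The apply contract (0 = match, > 0 = might match further forward, < 0 = cannot match forward) turns a sorted scan into a generic search. A sketch that drives the three operators, copied from the definitions above, over a sorted int array (firstMatch and its names are illustrative):

    public class OperatorScanSketch
    {
        interface Operator { int apply(int comparison); }

        // Same arithmetic as the Equals/GreaterThanOrEqualTo/GreaterThan classes above.
        static final Operator EQ = c -> -c;
        static final Operator GE = c -> c >= 0 ? 0 : -c;
        static final Operator GT = c -> c > 0 ? 0 : 1;

        // Scan sorted candidates for the first one the operator accepts:
        // apply == 0 means match, > 0 means keep scanning forward,
        // < 0 means no candidate further right can match either.
        static Integer firstMatch(int[] sortedCandidates, int target, Operator op)
        {
            for (int candidate : sortedCandidates)
            {
                int result = op.apply(Integer.compare(candidate, target));
                if (result == 0)
                    return candidate;
                if (result < 0)
                    return null;
            }
            return null;
        }

        public static void main(String[] args)
        {
            int[] keys = { 10, 20, 30 };
            System.out.println(firstMatch(keys, 20, EQ)); // 20
            System.out.println(firstMatch(keys, 25, EQ)); // null (30 > 25 stops the scan)
            System.out.println(firstMatch(keys, 25, GE)); // 30
            System.out.println(firstMatch(keys, 20, GT)); // 30
        }
    }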
 +
 +    public long getBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getFalsePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterFalsePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentFalsePositiveCount();
 +    }
 +
 +    public long getBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getTruePositiveCount();
 +    }
 +
 +    public long getRecentBloomFilterTruePositiveCount()
 +    {
 +        return bloomFilterTracker.getRecentTruePositiveCount();
 +    }
 +
 +    public InstrumentingCache<KeyCacheKey, RowIndexEntry> getKeyCache()
 +    {
 +        return keyCache;
 +    }
 +
 +    public EstimatedHistogram getEstimatedRowSize()
 +    {
 +        return sstableMetadata.estimatedRowSize;
 +    }
 +
 +    public EstimatedHistogram getEstimatedColumnCount()
 +    {
 +        return sstableMetadata.estimatedColumnCount;
 +    }
 +
 +    public double getEstimatedDroppableTombstoneRatio(int gcBefore)
 +    {
 +        return sstableMetadata.getEstimatedDroppableTombstoneRatio(gcBefore);
 +    }
 +
 +    public double getDroppableTombstonesBefore(int gcBefore)
 +    {
 +        return sstableMetadata.getDroppableTombstonesBefore(gcBefore);
 +    }
 +
 +    public double getCompressionRatio()
 +    {
 +        return sstableMetadata.compressionRatio;
 +    }
 +
 +    public ReplayPosition getReplayPosition()
 +    {
 +        return sstableMetadata.replayPosition;
 +    }
 +
 +    public long getMinTimestamp()
 +    {
 +        return sstableMetadata.minTimestamp;
 +    }
 +
 +    public long getMaxTimestamp()
 +    {
 +        return sstableMetadata.maxTimestamp;
 +    }
 +
 +    public Set<Integer> getAncestors()
 +    {
 +        try
 +        {
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.COMPACTION);
 +            return compactionMetadata.ancestors;
 +        }
 +        catch (IOException e)
 +        {
 +            SSTableReader.logOpenException(descriptor, e);
 +            return Collections.emptySet();
 +        }
 +    }
 +
 +    public int getSSTableLevel()
 +    {
 +        return sstableMetadata.sstableLevel;
 +    }
 +
 +    /**
 +     * Reloads the sstable metadata from disk.
 +     *
 +     * Called after level is changed on sstable, for example if the sstable is dropped to L0
 +     *
 +     * Might be possible to remove in future versions
 +     *
 +     * @throws IOException
 +     */
 +    public void reloadSSTableMetadata() throws IOException
 +    {
 +        this.sstableMetadata = (StatsMetadata) descriptor.getMetadataSerializer().deserialize(descriptor, MetadataType.STATS);
 +    }
 +
 +    public StatsMetadata getSSTableMetadata()
 +    {
 +        return sstableMetadata;
 +    }
 +
 +    public RandomAccessReader openDataReader(RateLimiter limiter)
 +    {
 +        assert limiter != null;
 +        return compression
 +                ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(), limiter)
 +                : ThrottledReader.open(new File(getFilename()), limiter);
 +    }
 +
 +    public RandomAccessReader openDataReader()
 +    {
 +        return compression
 +                ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
 +                : RandomAccessReader.open(new File(getFilename()));
 +    }
 +
 +    public RandomAccessReader openIndexReader()
 +    {
 +        return RandomAccessReader.open(new File(getIndexFilename()));
 +    }
 +
 +    /**
 +     * @param component component to get the timestamp for.
 +     * @return last modified time for the given component, or 0 if the component does not exist or an IO error occurs.
 +     */
 +    public long getCreationTimeFor(Component component)
 +    {
 +        return new File(descriptor.filenameFor(component)).lastModified();
 +    }
 +
 +    /**
 +     * @return Number of key cache hits
 +     */
 +    public long getKeyCacheHit()
 +    {
 +        return keyCacheHit.get();
 +    }
 +
 +    /**
 +     * @return Number of key cache requests
 +     */
 +    public long getKeyCacheRequest()
 +    {
 +        return keyCacheRequest.get();
 +    }
 +
 +    /**
 +     * @param sstables
 +     * @return true if all desired references were acquired.  Otherwise, it will unreference any partial acquisition, and return false.
 +     */
 +    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
 +    {
 +        SSTableReader failed = null;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (!sstable.acquireReference())
 +            {
 +                failed = sstable;
 +                break;
 +            }
 +        }
 +
 +        if (failed == null)
 +            return true;
 +
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable == failed)
 +                break;
 +            sstable.releaseReference();
 +        }
 +        return false;
 +    }
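
The rollback path above releases exactly the references acquired before the failure, relying on the iterable yielding the same prefix order twice. A standalone sketch of that all-or-nothing pattern, with a hypothetical Ref class:

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class AcquireAllSketch
    {
        static class Ref
        {
            final AtomicInteger count;
            Ref(int initial) { count = new AtomicInteger(initial); }
            boolean acquire()
            {
                while (true)
                {
                    int n = count.get();
                    if (n <= 0)
                        return false;
                    if (count.compareAndSet(n, n + 1))
                        return true;
                }
            }
            void release() { count.decrementAndGet(); }
        }

        // All-or-nothing: on the first failure, release exactly the refs we
        // acquired before it (everything preceding 'failed' in iteration order).
        static boolean acquireAll(List<Ref> refs)
        {
            Ref failed = null;
            for (Ref ref : refs)
            {
                if (!ref.acquire())
                {
                    failed = ref;
                    break;
                }
            }
            if (failed == null)
                return true;
            for (Ref ref : refs)
            {
                if (ref == failed)
                    break;
                ref.release();
            }
            return false;
        }

        public static void main(String[] args)
        {
            Ref live = new Ref(1), dead = new Ref(0);
            System.out.println(acquireAll(Arrays.asList(live, dead))); // false
            System.out.println(live.count.get());                      // back to 1: rolled back
        }
    }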
 +
 +    public static void releaseReferences(Iterable<SSTableReader> sstables)
 +    {
 +        for (SSTableReader sstable : sstables)
 +        {
 +            sstable.releaseReference();
 +        }
 +    }
 +
 +    private void dropPageCache()
 +    {
 +        dropPageCache(dfile.path);
 +        dropPageCache(ifile.path);
 +    }
 +
 +    private void dropPageCache(String filePath)
 +    {
 +        RandomAccessFile file = null;
 +
 +        try
 +        {
 +            file = new RandomAccessFile(filePath, "r");
 +
 +            int fd = CLibrary.getfd(file.getFD());
 +
 +            if (fd > 0)
 +            {
 +                if (logger.isDebugEnabled())
 +                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 +
 +                CLibrary.trySkipCache(fd, 0, 0);
 +            }
 +        }
 +        catch (IOException e)
 +        {
 +            // we don't care if cache cleanup fails
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(file);
 +        }
 +    }
 +
 +    /**
 +     * Increment the total row read count and read rate for this SSTable.  This should not be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
 +     */
 +    public void incrementReadCount()
 +    {
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    protected class EmptyCompactionScanner implements ICompactionScanner
 +    {
 +        private final String filename;
 +
 +        public EmptyCompactionScanner(String filename)
 +        {
 +            this.filename = filename;
 +        }
 +
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        public String getBackingFiles()
 +        {
 +            return filename;
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            return false;
 +        }
 +
 +        public OnDiskAtomIterator next()
 +        {
 +            return null;
 +        }
 +
 +        public void close() throws IOException { }
 +
 +        public void remove() { }
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
 +        {
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
 +        }
 +    }
 +
 +    public static abstract class Factory
 +    {
 +        public abstract SSTableReader open(final Descriptor descriptor,
 +                                           Set<Component> components,
 +                                           CFMetaData metadata,
 +                                           IPartitioner partitioner,
 +                                           Long maxDataAge,
 +                                           StatsMetadata sstableMetadata,
 +                                           OpenReason openReason);
 +
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 3cd2ea8,ebc6e86..d49d8bb
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@@ -36,15 -31,12 +36,17 @@@ import org.junit.runner.RunWith
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.db.*;
 +import org.apache.cassandra.config.KSMetaData;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.Mutation;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.locator.SimpleStrategy;
+ import org.apache.cassandra.notifications.SSTableAddedNotification;
+ import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
  import org.apache.cassandra.repair.RepairJobDesc;
  import org.apache.cassandra.repair.Validator;
  import org.apache.cassandra.service.ActiveRepairService;
@@@ -167,16 -95,16 +169,16 @@@ public class LeveledCompactionStrategyT
          }
  
          waitForLeveling(cfs);
-         LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+         WrappingCompactionStrategy strategy = (WrappingCompactionStrategy) cfs.getCompactionStrategy();
          // Checking we're not completely bad at math
-         assert strategy.getLevelSize(1) > 0;
-         assert strategy.getLevelSize(2) > 0;
+         assert strategy.getSSTableCountPerLevel()[1] > 0;
+         assert strategy.getSSTableCountPerLevel()[2] > 0;
  
          Range<Token> range = new Range<>(Util.token(""), Util.token(""));
 -        int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
 +        int gcBefore = keyspace.getColumnFamilyStore(CF_STANDARDDLEVELED).gcBefore(System.currentTimeMillis());
          UUID parentRepSession = UUID.randomUUID();
 -        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range));
 -        RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), ksname, cfname, range);
 +        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range), false);
 +        RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), KEYSPACE1, CF_STANDARDDLEVELED, range);
          Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
          CompactionManager.instance.submitValidation(cfs, validator).get();
      }