You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/30 16:18:31 UTC

[03/10] git commit: Compact hottest sstables first and optionally omit coldest patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109

Compact hottest sstables first and optionally omit coldest
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37285304
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37285304
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37285304

Branch: refs/heads/trunk
Commit: 37285304ee484122410c977399024f2af132753c
Parents: 5eddf18
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:25:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../SizeTieredCompactionStrategy.java           | 149 +++++++++++---
 .../SizeTieredCompactionStrategyOptions.java    |  53 ++---
 .../cassandra/io/sstable/SSTableReader.java     |   4 +-
 .../SizeTieredCompactionStrategyTest.java       | 192 ++++++++++++++++++-
 5 files changed, 341 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bf7f21..4815c1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.0.3
+ * Compact hottest sstables first and optionally omit coldest from
+   compaction entirely (CASSANDRA-6109)
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)
  * cqlsh: fix LIST USERS output (CASSANDRA-6242)
  * Add IRequestSink interface (CASSANDRA-6248)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index cee5f97..5115860 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -20,8 +20,9 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.Map.Entry;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
+import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +55,10 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         int minThreshold = cfs.getMinimumCompactionThreshold();
         int maxThreshold = cfs.getMaximumCompactionThreshold();
 
-        Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
-        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), options.bucketHigh, options.bucketLow, options.minSSTableSize);
+        Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+        candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+
+        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
         logger.debug("Compaction buckets are {}", buckets);
         updateEstimatedCompactionsByTasks(buckets);
         List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
@@ -77,34 +80,88 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         return Collections.singletonList(sstablesWithTombstones.get(0));
     }
 
+    /**
+     * Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
+     * across all sstables
+     * @param sstables all sstables to consider
+     * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+     * @return a list of sstables with the coldest sstables excluded until the reads they represent reaches coldReadsToOmit
+     */
+    @VisibleForTesting
+    static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+    {
+        // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+        Collections.sort(sstables, new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                int comparison = Double.compare(hotness(o1), hotness(o2));
+                if (comparison != 0)
+                    return comparison;
+
+                return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+            }
+        });
+
+        // calculate the total reads/sec across all sstables
+        double totalReads = 0.0;
+        for (SSTableReader sstr : sstables)
+            if (sstr.readMeter != null)
+                totalReads += sstr.readMeter.twoHourRate();
+
+        // if this is a system table with no read meters or we don't have any read rates yet, just return them all
+        if (totalReads == 0.0)
+            return sstables;
+
+        // iteratively ignore the coldest sstables until ignoring one more would put us over the coldReadsToOmit threshold
+        double maxColdReads = coldReadsToOmit * totalReads;
+
+        double totalColdReads = 0.0;
+        int cutoffIndex = 0;
+        while (cutoffIndex < sstables.size())
+        {
+            double reads = sstables.get(cutoffIndex).readMeter.twoHourRate();
+            if (totalColdReads + reads > maxColdReads)
+                break;
+
+            totalColdReads += reads;
+            cutoffIndex++;
+        }
+
+        return sstables.subList(cutoffIndex, sstables.size());
+    }
+
+    /**
+     * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
+     * @param minThreshold minimum number of sstables in a bucket to qualify as interesting
+     * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this)
+     * @return a bucket (list) of sstables to compact
+     */
     public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
     {
-        // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold entries
-        List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
+        // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables
+        final List<Pair<List<SSTableReader>, Double>> prunedBucketsAndHotness = new ArrayList<>(buckets.size());
         for (List<SSTableReader> bucket : buckets)
         {
-            if (bucket.size() < minThreshold)
-                continue;
-
-            Collections.sort(bucket, new Comparator<SSTableReader>()
-            {
-                public int compare(SSTableReader o1, SSTableReader o2)
-                {
-                    return o1.descriptor.generation - o2.descriptor.generation;
-                }
-            });
-            List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
-            prunedBuckets.add(prunedBucket);
+            Pair<List<SSTableReader>, Double> bucketAndHotness = trimToThresholdWithHotness(bucket, maxThreshold);
+            if (bucketAndHotness != null && bucketAndHotness.left.size() >= minThreshold)
+                prunedBucketsAndHotness.add(bucketAndHotness);
         }
-        if (prunedBuckets.isEmpty())
+        if (prunedBucketsAndHotness.isEmpty())
             return Collections.emptyList();
 
-        // prefer compacting buckets with smallest average size; that will yield the fastest improvement for read performance
-        return Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>()
+        // prefer compacting the hottest bucket
+        Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, new Comparator<Pair<List<SSTableReader>, Double>>()
         {
-            public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
+            public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
             {
-                return Longs.compare(avgSize(o1), avgSize(o2));
+                int comparison = Double.compare(o1.right, o2.right);
+                if (comparison != 0)
+                    return comparison;
+
+                // break ties by compacting the smallest sstables first (this will probably only happen for
+                // system tables and new/unread sstables)
+                return Long.compare(avgSize(o1.left), avgSize(o2.left));
             }
 
             private long avgSize(List<SSTableReader> sstables)
@@ -115,6 +172,44 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
                 return n / sstables.size();
             }
         });
+
+        return hottest.left;
+    }
+
+    /**
+     * Returns a (bucket, hotness) pair or null if there were not enough sstables in the bucket to meet minThreshold.
+     * If there are more than maxThreshold sstables, the coldest sstables will be trimmed to meet the threshold.
+     **/
+    @VisibleForTesting
+    static Pair<List<SSTableReader>, Double> trimToThresholdWithHotness(List<SSTableReader> bucket, int maxThreshold)
+    {
+        // sort by sstable hotness (descending)
+        Collections.sort(bucket, new Comparator<SSTableReader>()
+        {
+            public int compare(SSTableReader o1, SSTableReader o2)
+            {
+                return -1 * Double.compare(hotness(o1), hotness(o2));
+            }
+        });
+
+        // and then trim the coldest sstables off the end to meet the maxThreshold
+        List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+
+        // bucket hotness is the sum of the hotness of all sstable members
+        double bucketHotness = 0.0;
+        for (SSTableReader sstr : prunedBucket)
+            bucketHotness += hotness(sstr);
+
+        return Pair.create(prunedBucket, bucketHotness);
+    }
+
+    /**
+     * Returns the reads per second per key for this sstable, or 0.0 if the sstable has no read meter
+     */
+    private static double hotness(SSTableReader sstr)
+    {
+        // system tables don't have read meters, just use 0.0 for the hotness
+        return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
     }
 
     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
@@ -124,13 +219,13 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 
         while (true)
         {
-            List<SSTableReader> smallestBucket = getNextBackgroundSSTables(gcBefore);
+            List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
 
-            if (smallestBucket.isEmpty())
+            if (hottestBucket.isEmpty())
                 return null;
 
-            if (cfs.getDataTracker().markCompacting(smallestBucket))
-                return new CompactionTask(cfs, smallestBucket, gcBefore);
+            if (cfs.getDataTracker().markCompacting(hottestBucket))
+                return new CompactionTask(cfs, hottestBucket, gcBefore);
         }
     }
 
@@ -253,4 +348,4 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             cfs.getMinimumCompactionThreshold(),
             cfs.getMaximumCompactionThreshold());
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index d7c9075..711ec6e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -26,31 +26,48 @@ public final class SizeTieredCompactionStrategyOptions
     protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
     protected static final double DEFAULT_BUCKET_LOW = 0.5;
     protected static final double DEFAULT_BUCKET_HIGH = 1.5;
+    protected static final double DEFAULT_COLD_READS_TO_OMIT = 0.0;
     protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
     protected static final String BUCKET_LOW_KEY = "bucket_low";
     protected static final String BUCKET_HIGH_KEY = "bucket_high";
+    protected static final String MAX_COLD_READS_RATIO_KEY = "max_cold_reads_ratio";
 
     protected long minSSTableSize;
     protected double bucketLow;
     protected double bucketHigh;
+    protected double coldReadsToOmit;
 
     public SizeTieredCompactionStrategyOptions(Map<String, String> options)
     {
-
         String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
         minSSTableSize = optionValue == null ? DEFAULT_MIN_SSTABLE_SIZE : Long.parseLong(optionValue);
         optionValue = options.get(BUCKET_LOW_KEY);
         bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
         optionValue = options.get(BUCKET_HIGH_KEY);
         bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
+        optionValue = options.get(MAX_COLD_READS_RATIO_KEY);
+        coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT : Double.parseDouble(optionValue);
     }
 
     public SizeTieredCompactionStrategyOptions()
     {
-
         minSSTableSize = DEFAULT_MIN_SSTABLE_SIZE;
         bucketLow = DEFAULT_BUCKET_LOW;
         bucketHigh = DEFAULT_BUCKET_HIGH;
+        coldReadsToOmit = DEFAULT_COLD_READS_TO_OMIT;
+    }
+
+    private static double parseDouble(Map<String, String> options, String key, double defaultValue) throws ConfigurationException
+    {
+        String optionValue = options.get(key);
+        try
+        {
+            return optionValue == null ? defaultValue : Double.parseDouble(optionValue);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException(String.format("%s is not a parsable float for %s", optionValue, key), e);
+        }
     }
 
     public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
@@ -69,36 +86,26 @@ public final class SizeTieredCompactionStrategyOptions
             throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, MIN_SSTABLE_SIZE_KEY), e);
         }
 
-        double bucketLow, bucketHigh;
-        optionValue = options.get(BUCKET_LOW_KEY);
-        try
-        {
-            bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
-        }
-        catch (NumberFormatException e)
-        {
-            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_LOW), e);
-        }
-
-        optionValue = options.get(BUCKET_HIGH_KEY);
-        try
-        {
-            bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
-        }
-        catch (NumberFormatException e)
+        double bucketLow = parseDouble(options, BUCKET_LOW_KEY, DEFAULT_BUCKET_LOW);
+        double bucketHigh = parseDouble(options, BUCKET_HIGH_KEY, DEFAULT_BUCKET_HIGH);
+        if (bucketHigh <= bucketLow)
         {
-            throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_HIGH), e);
+            throw new ConfigurationException(String.format("%s value (%s) is less than or equal to the %s value (%s)",
+                                                           BUCKET_HIGH_KEY, bucketHigh, BUCKET_LOW_KEY, bucketLow));
         }
 
-        if (bucketHigh <= bucketLow)
+        double maxColdReadsRatio = parseDouble(options, MAX_COLD_READS_RATIO_KEY, DEFAULT_COLD_READS_TO_OMIT);
+        if (maxColdReadsRatio < 0.0 || maxColdReadsRatio > 1.0)
         {
-            throw new ConfigurationException(String.format("Bucket high value (%s) is less than or equal bucket low value (%s)", bucketHigh, bucketLow));
+            throw new ConfigurationException(String.format("%s value (%s) should be between between 0.0 and 1.0",
+                                                           MAX_COLD_READS_RATIO_KEY, optionValue));
         }
 
         uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
         uncheckedOptions.remove(BUCKET_LOW_KEY);
         uncheckedOptions.remove(BUCKET_HIGH_KEY);
+        uncheckedOptions.remove(MAX_COLD_READS_RATIO_KEY);
 
         return uncheckedOptions;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 9837f4c..c961d44 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
@@ -103,7 +104,8 @@ public class SSTableReader extends SSTable implements Closeable
     private final AtomicLong keyCacheHit = new AtomicLong(0);
     private final AtomicLong keyCacheRequest = new AtomicLong(0);
 
-    public final RestorableMeter readMeter;
+    @VisibleForTesting
+    public RestorableMeter readMeter;
 
     public static long getApproximateKeyCount(Iterable<SSTableReader> sstables, CFMetaData metadata)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 89604c5..5e79bd8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -17,17 +17,80 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
+import java.util.*;
 
 import org.junit.Test;
 
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
-import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.getBuckets;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.mostInterestingBucket;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.trimToThresholdWithHotness;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.filterColdSSTables;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.validateOptions;
 
-public class SizeTieredCompactionStrategyTest
+import static org.junit.Assert.*;
+
+public class SizeTieredCompactionStrategyTest extends SchemaLoader
 {
+
+    @Test
+    public void testOptionsValidation() throws ConfigurationException
+    {
+        Map<String, String> options = new HashMap<>();
+        options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.35");
+        options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+        options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, "1.5");
+        options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "10000");
+        Map<String, String> unvalidated = validateOptions(options);
+        assertTrue(unvalidated.isEmpty());
+
+        try
+        {
+            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "-0.5");
+            validateOptions(options);
+            fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+        }
+        catch (ConfigurationException e) {}
+
+        try
+        {
+            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "10.0");
+            validateOptions(options);
+            fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.25");
+        }
+
+        try
+        {
+            options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "1000.0");
+            validateOptions(options);
+            fail("bucket_low greater than bucket_high should be rejected");
+        }
+        catch (ConfigurationException e)
+        {
+            options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+        }
+
+        options.put("bad_option", "1.0");
+        unvalidated = validateOptions(options);
+        assertTrue(unvalidated.containsKey("bad_option"));
+    }
+
     @Test
     public void testGetBuckets()
     {
@@ -39,7 +102,7 @@ public class SizeTieredCompactionStrategyTest
             pairs.add(pair);
         }
 
-        List<List<String>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+        List<List<String>> buckets = getBuckets(pairs, 1.5, 0.5, 2);
         assertEquals(3, buckets.size());
 
         for (List<String> bucket : buckets)
@@ -59,7 +122,7 @@ public class SizeTieredCompactionStrategyTest
             pairs.add(pair);
         }
 
-        buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+        buckets = getBuckets(pairs, 1.5, 0.5, 2);
         assertEquals(2, buckets.size());
 
         for (List<String> bucket : buckets)
@@ -80,7 +143,120 @@ public class SizeTieredCompactionStrategyTest
             pairs.add(pair);
         }
 
-        buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 10);
+        buckets = getBuckets(pairs, 1.5, 0.5, 10);
         assertEquals(1, buckets.size());
     }
-}
+
+    @Test
+    public void testPrepBucket() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "Standard1";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 3 sstables
+        int numSSTables = 3;
+        for (int r = 0; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+        Pair<List<SSTableReader>, Double> bucket;
+
+        List<SSTableReader> interestingBucket = mostInterestingBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32);
+        assertTrue("nothing should be returned when all buckets are below the min threshold", interestingBucket.isEmpty());
+
+        sstrs.get(0).readMeter = new RestorableMeter(100.0, 100.0);
+        sstrs.get(1).readMeter = new RestorableMeter(200.0, 200.0);
+        sstrs.get(2).readMeter = new RestorableMeter(300.0, 300.0);
+
+        long estimatedKeys = sstrs.get(0).estimatedKeys();
+
+        // if we have more than the max threshold, the coldest should be dropped
+        bucket = trimToThresholdWithHotness(sstrs, 2);
+        assertEquals("one bucket should have been dropped", 2, bucket.left.size());
+        double expectedBucketHotness = (200.0 + 300.0) / estimatedKeys;
+        assertEquals(String.format("bucket hotness (%f) should be close to %f", bucket.right, expectedBucketHotness),
+                     expectedBucketHotness, bucket.right, 1.0);
+    }
+
+    @Test
+    public void testFilterColdSSTables() throws Exception
+    {
+        String ksname = "Keyspace1";
+        String cfname = "Standard1";
+        Keyspace keyspace = Keyspace.open(ksname);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+        // create 10 sstables
+        int numSSTables = 10;
+        for (int r = 0; r < numSSTables; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            RowMutation rm = new RowMutation(ksname, key.key);
+            rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+        cfs.forceBlockingFlush();
+
+        List<SSTableReader> filtered;
+        List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = null;
+        filtered = filterColdSSTables(sstrs, 0.05);
+        assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
+
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(0.0, 0.0);
+        filtered = filterColdSSTables(sstrs, 0.05);
+        assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
+
+        // leave all read rates at 0 besides one
+        sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0);
+        filtered = filterColdSSTables(sstrs, 0.05);
+        assertEquals("there should only be one hot sstable", 1, filtered.size());
+        assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5);
+
+        // the total read rate is 100, and we'll set a threshold of 2.5%, so two of the sstables with read
+        // rate 1.0 should be ignored, but not the third
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(0.0, 0.0);
+        sstrs.get(0).readMeter = new RestorableMeter(97.0, 97.0);
+        sstrs.get(1).readMeter = new RestorableMeter(1.0, 1.0);
+        sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0);
+        sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0);
+
+        filtered = filterColdSSTables(sstrs, 0.025);
+        assertEquals(2, filtered.size());
+        assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + filtered.get(1).readMeter.twoHourRate(), 0.5);
+
+        // make sure a threshold of 0.0 doesn't result in any sstables being filtered
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(1.0, 1.0);
+        filtered = filterColdSSTables(sstrs, 0.0);
+        assertEquals(sstrs.size(), filtered.size());
+
+        // just for fun, set a threshold where all sstables are considered cold
+        for (SSTableReader sstr : sstrs)
+            sstr.readMeter = new RestorableMeter(1.0, 1.0);
+        filtered = filterColdSSTables(sstrs, 1.0);
+        assertTrue(filtered.isEmpty());
+    }
+}
\ No newline at end of file