You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/29 21:41:37 UTC
[08/11] git commit: Compact hottest sstables first and optionally
omit coldest patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109
Compact hottest sstables first and optionally omit coldest
patch by Tyler Hobbs; reviewed by jbellis for CASSANDRA-6109
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37285304
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37285304
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37285304
Branch: refs/heads/cassandra-2.0
Commit: 37285304ee484122410c977399024f2af132753c
Parents: 5eddf18
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:25:39 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:41:17 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../SizeTieredCompactionStrategy.java | 149 +++++++++++---
.../SizeTieredCompactionStrategyOptions.java | 53 ++---
.../cassandra/io/sstable/SSTableReader.java | 4 +-
.../SizeTieredCompactionStrategyTest.java | 192 ++++++++++++++++++-
5 files changed, 341 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7bf7f21..4815c1c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
2.0.3
+ * Compact hottest sstables first and optionally omit coldest from
+ compaction entirely (CASSANDRA-6109)
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
* cqlsh: fix LIST USERS output (CASSANDRA-6242)
* Add IRequestSink interface (CASSANDRA-6248)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index cee5f97..5115860 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -20,8 +20,9 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.Map.Entry;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
-import com.google.common.primitives.Longs;
+import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +55,10 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
int minThreshold = cfs.getMinimumCompactionThreshold();
int maxThreshold = cfs.getMaximumCompactionThreshold();
- Set<SSTableReader> candidates = cfs.getUncompactingSSTables();
- List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(filterSuspectSSTables(candidates)), options.bucketHigh, options.bucketLow, options.minSSTableSize);
+ Iterable<SSTableReader> candidates = filterSuspectSSTables(cfs.getUncompactingSSTables());
+ candidates = filterColdSSTables(Lists.newArrayList(candidates), options.coldReadsToOmit);
+
+ List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), options.bucketHigh, options.bucketLow, options.minSSTableSize);
logger.debug("Compaction buckets are {}", buckets);
updateEstimatedCompactionsByTasks(buckets);
List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
@@ -77,34 +80,88 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return Collections.singletonList(sstablesWithTombstones.get(0));
}
+ /**
+ * Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
+ * across all sstables
+ * @param sstables all sstables to consider
+ * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+ * @return a list of sstables with the coldest sstables excluded until the reads they represent reaches coldReadsToOmit
+ */
+ @VisibleForTesting
+ static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+ {
+ // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+ Collections.sort(sstables, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ int comparison = Double.compare(hotness(o1), hotness(o2));
+ if (comparison != 0)
+ return comparison;
+
+ return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+ }
+ });
+
+ // calculate the total reads/sec across all sstables
+ double totalReads = 0.0;
+ for (SSTableReader sstr : sstables)
+ if (sstr.readMeter != null)
+ totalReads += sstr.readMeter.twoHourRate();
+
+ // if this is a system table with no read meters or we don't have any read rates yet, just return them all
+ if (totalReads == 0.0)
+ return sstables;
+
+ // iteratively ignore the coldest sstables until ignoring one more would put us over the coldReadsToOmit threshold
+ double maxColdReads = coldReadsToOmit * totalReads;
+
+ double totalColdReads = 0.0;
+ int cutoffIndex = 0;
+ while (cutoffIndex < sstables.size())
+ {
+ double reads = sstables.get(cutoffIndex).readMeter.twoHourRate();
+ if (totalColdReads + reads > maxColdReads)
+ break;
+
+ totalColdReads += reads;
+ cutoffIndex++;
+ }
+
+ return sstables.subList(cutoffIndex, sstables.size());
+ }
+
+ /**
+ * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
+ * @param minThreshold minimum number of sstables in a bucket to qualify as interesting
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this)
+ * @return a bucket (list) of sstables to compact
+ */
public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
{
- // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold entries
- List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
+ // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables
+ final List<Pair<List<SSTableReader>, Double>> prunedBucketsAndHotness = new ArrayList<>(buckets.size());
for (List<SSTableReader> bucket : buckets)
{
- if (bucket.size() < minThreshold)
- continue;
-
- Collections.sort(bucket, new Comparator<SSTableReader>()
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- return o1.descriptor.generation - o2.descriptor.generation;
- }
- });
- List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
- prunedBuckets.add(prunedBucket);
+ Pair<List<SSTableReader>, Double> bucketAndHotness = trimToThresholdWithHotness(bucket, maxThreshold);
+ if (bucketAndHotness != null && bucketAndHotness.left.size() >= minThreshold)
+ prunedBucketsAndHotness.add(bucketAndHotness);
}
- if (prunedBuckets.isEmpty())
+ if (prunedBucketsAndHotness.isEmpty())
return Collections.emptyList();
- // prefer compacting buckets with smallest average size; that will yield the fastest improvement for read performance
- return Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>()
+ // prefer compacting the hottest bucket
+ Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, new Comparator<Pair<List<SSTableReader>, Double>>()
{
- public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
+ public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
{
- return Longs.compare(avgSize(o1), avgSize(o2));
+ int comparison = Double.compare(o1.right, o2.right);
+ if (comparison != 0)
+ return comparison;
+
+ // break ties by compacting the smallest sstables first (this will probably only happen for
+ // system tables and new/unread sstables)
+ return Long.compare(avgSize(o1.left), avgSize(o2.left));
}
private long avgSize(List<SSTableReader> sstables)
@@ -115,6 +172,44 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
return n / sstables.size();
}
});
+
+ return hottest.left;
+ }
+
+ /**
+ * Returns a (bucket, hotness) pair or null if there were not enough sstables in the bucket to meet minThreshold.
+ * If there are more than maxThreshold sstables, the coldest sstables will be trimmed to meet the threshold.
+ **/
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, Double> trimToThresholdWithHotness(List<SSTableReader> bucket, int maxThreshold)
+ {
+ // sort by sstable hotness (descending)
+ Collections.sort(bucket, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return -1 * Double.compare(hotness(o1), hotness(o2));
+ }
+ });
+
+ // and then trim the coldest sstables off the end to meet the maxThreshold
+ List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+
+ // bucket hotness is the sum of the hotness of all sstable members
+ double bucketHotness = 0.0;
+ for (SSTableReader sstr : prunedBucket)
+ bucketHotness += hotness(sstr);
+
+ return Pair.create(prunedBucket, bucketHotness);
+ }
+
+ /**
+ * Returns the reads per second per key for this sstable, or 0.0 if the sstable has no read meter
+ */
+ private static double hotness(SSTableReader sstr)
+ {
+ // system tables don't have read meters, just use 0.0 for the hotness
+ return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
}
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
@@ -124,13 +219,13 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
while (true)
{
- List<SSTableReader> smallestBucket = getNextBackgroundSSTables(gcBefore);
+ List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);
- if (smallestBucket.isEmpty())
+ if (hottestBucket.isEmpty())
return null;
- if (cfs.getDataTracker().markCompacting(smallestBucket))
- return new CompactionTask(cfs, smallestBucket, gcBefore);
+ if (cfs.getDataTracker().markCompacting(hottestBucket))
+ return new CompactionTask(cfs, hottestBucket, gcBefore);
}
}
@@ -253,4 +348,4 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
cfs.getMinimumCompactionThreshold(),
cfs.getMaximumCompactionThreshold());
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
index d7c9075..711ec6e 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyOptions.java
@@ -26,31 +26,48 @@ public final class SizeTieredCompactionStrategyOptions
protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
protected static final double DEFAULT_BUCKET_LOW = 0.5;
protected static final double DEFAULT_BUCKET_HIGH = 1.5;
+ protected static final double DEFAULT_COLD_READS_TO_OMIT = 0.0;
protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
protected static final String BUCKET_LOW_KEY = "bucket_low";
protected static final String BUCKET_HIGH_KEY = "bucket_high";
+ protected static final String MAX_COLD_READS_RATIO_KEY = "max_cold_reads_ratio";
protected long minSSTableSize;
protected double bucketLow;
protected double bucketHigh;
+ protected double coldReadsToOmit;
public SizeTieredCompactionStrategyOptions(Map<String, String> options)
{
-
String optionValue = options.get(MIN_SSTABLE_SIZE_KEY);
minSSTableSize = optionValue == null ? DEFAULT_MIN_SSTABLE_SIZE : Long.parseLong(optionValue);
optionValue = options.get(BUCKET_LOW_KEY);
bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
optionValue = options.get(BUCKET_HIGH_KEY);
bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
+ optionValue = options.get(MAX_COLD_READS_RATIO_KEY);
+ coldReadsToOmit = optionValue == null ? DEFAULT_COLD_READS_TO_OMIT : Double.parseDouble(optionValue);
}
public SizeTieredCompactionStrategyOptions()
{
-
minSSTableSize = DEFAULT_MIN_SSTABLE_SIZE;
bucketLow = DEFAULT_BUCKET_LOW;
bucketHigh = DEFAULT_BUCKET_HIGH;
+ coldReadsToOmit = DEFAULT_COLD_READS_TO_OMIT;
+ }
+
+ private static double parseDouble(Map<String, String> options, String key, double defaultValue) throws ConfigurationException
+ {
+ String optionValue = options.get(key);
+ try
+ {
+ return optionValue == null ? defaultValue : Double.parseDouble(optionValue);
+ }
+ catch (NumberFormatException e)
+ {
+ throw new ConfigurationException(String.format("%s is not a parsable float for %s", optionValue, key), e);
+ }
}
public static Map<String, String> validateOptions(Map<String, String> options, Map<String, String> uncheckedOptions) throws ConfigurationException
@@ -69,36 +86,26 @@ public final class SizeTieredCompactionStrategyOptions
throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, MIN_SSTABLE_SIZE_KEY), e);
}
- double bucketLow, bucketHigh;
- optionValue = options.get(BUCKET_LOW_KEY);
- try
- {
- bucketLow = optionValue == null ? DEFAULT_BUCKET_LOW : Double.parseDouble(optionValue);
- }
- catch (NumberFormatException e)
- {
- throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_LOW), e);
- }
-
- optionValue = options.get(BUCKET_HIGH_KEY);
- try
- {
- bucketHigh = optionValue == null ? DEFAULT_BUCKET_HIGH : Double.parseDouble(optionValue);
- }
- catch (NumberFormatException e)
+ double bucketLow = parseDouble(options, BUCKET_LOW_KEY, DEFAULT_BUCKET_LOW);
+ double bucketHigh = parseDouble(options, BUCKET_HIGH_KEY, DEFAULT_BUCKET_HIGH);
+ if (bucketHigh <= bucketLow)
{
- throw new ConfigurationException(String.format("%s is not a parsable int (base10) for %s", optionValue, DEFAULT_BUCKET_HIGH), e);
+ throw new ConfigurationException(String.format("%s value (%s) is less than or equal to the %s value (%s)",
+ BUCKET_HIGH_KEY, bucketHigh, BUCKET_LOW_KEY, bucketLow));
}
- if (bucketHigh <= bucketLow)
+ double maxColdReadsRatio = parseDouble(options, MAX_COLD_READS_RATIO_KEY, DEFAULT_COLD_READS_TO_OMIT);
+ if (maxColdReadsRatio < 0.0 || maxColdReadsRatio > 1.0)
{
- throw new ConfigurationException(String.format("Bucket high value (%s) is less than or equal bucket low value (%s)", bucketHigh, bucketLow));
+ throw new ConfigurationException(String.format("%s value (%s) should be between between 0.0 and 1.0",
+ MAX_COLD_READS_RATIO_KEY, optionValue));
}
uncheckedOptions.remove(MIN_SSTABLE_SIZE_KEY);
uncheckedOptions.remove(BUCKET_LOW_KEY);
uncheckedOptions.remove(BUCKET_HIGH_KEY);
+ uncheckedOptions.remove(MAX_COLD_READS_RATIO_KEY);
return uncheckedOptions;
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 9837f4c..c961d44 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
@@ -103,7 +104,8 @@ public class SSTableReader extends SSTable implements Closeable
private final AtomicLong keyCacheHit = new AtomicLong(0);
private final AtomicLong keyCacheRequest = new AtomicLong(0);
- public final RestorableMeter readMeter;
+ @VisibleForTesting
+ public RestorableMeter readMeter;
public static long getApproximateKeyCount(Iterable<SSTableReader> sstables, CFMetaData metadata)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/37285304/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
index 89604c5..5e79bd8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
@@ -17,17 +17,80 @@
*/
package org.apache.cassandra.db.compaction;
-import java.util.ArrayList;
-import java.util.List;
+import java.nio.ByteBuffer;
+import java.util.*;
import org.junit.Test;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.metrics.RestorableMeter;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Pair;
-import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.getBuckets;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.mostInterestingBucket;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.trimToThresholdWithHotness;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.filterColdSSTables;
+import static org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy.validateOptions;
-public class SizeTieredCompactionStrategyTest
+import static org.junit.Assert.*;
+
+public class SizeTieredCompactionStrategyTest extends SchemaLoader
{
+
+ @Test
+ public void testOptionsValidation() throws ConfigurationException
+ {
+ Map<String, String> options = new HashMap<>();
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.35");
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_HIGH_KEY, "1.5");
+ options.put(SizeTieredCompactionStrategyOptions.MIN_SSTABLE_SIZE_KEY, "10000");
+ Map<String, String> unvalidated = validateOptions(options);
+ assertTrue(unvalidated.isEmpty());
+
+ try
+ {
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "-0.5");
+ validateOptions(options);
+ fail(String.format("Negative %s should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+ }
+ catch (ConfigurationException e) {}
+
+ try
+ {
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "10.0");
+ validateOptions(options);
+ fail(String.format("%s > 1.0 should be rejected", SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY));
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(SizeTieredCompactionStrategyOptions.MAX_COLD_READS_RATIO_KEY, "0.25");
+ }
+
+ try
+ {
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "1000.0");
+ validateOptions(options);
+ fail("bucket_low greater than bucket_high should be rejected");
+ }
+ catch (ConfigurationException e)
+ {
+ options.put(SizeTieredCompactionStrategyOptions.BUCKET_LOW_KEY, "0.5");
+ }
+
+ options.put("bad_option", "1.0");
+ unvalidated = validateOptions(options);
+ assertTrue(unvalidated.containsKey("bad_option"));
+ }
+
@Test
public void testGetBuckets()
{
@@ -39,7 +102,7 @@ public class SizeTieredCompactionStrategyTest
pairs.add(pair);
}
- List<List<String>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+ List<List<String>> buckets = getBuckets(pairs, 1.5, 0.5, 2);
assertEquals(3, buckets.size());
for (List<String> bucket : buckets)
@@ -59,7 +122,7 @@ public class SizeTieredCompactionStrategyTest
pairs.add(pair);
}
- buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 2);
+ buckets = getBuckets(pairs, 1.5, 0.5, 2);
assertEquals(2, buckets.size());
for (List<String> bucket : buckets)
@@ -80,7 +143,120 @@ public class SizeTieredCompactionStrategyTest
pairs.add(pair);
}
- buckets = SizeTieredCompactionStrategy.getBuckets(pairs, 1.5, 0.5, 10);
+ buckets = getBuckets(pairs, 1.5, 0.5, 10);
assertEquals(1, buckets.size());
}
-}
+
+ @Test
+ public void testPrepBucket() throws Exception
+ {
+ String ksname = "Keyspace1";
+ String cfname = "Standard1";
+ Keyspace keyspace = Keyspace.open(ksname);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 3 sstables
+ int numSSTables = 3;
+ for (int r = 0; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ RowMutation rm = new RowMutation(ksname, key.key);
+ rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+ cfs.forceBlockingFlush();
+
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+ Pair<List<SSTableReader>, Double> bucket;
+
+ List<SSTableReader> interestingBucket = mostInterestingBucket(Collections.singletonList(sstrs.subList(0, 2)), 4, 32);
+ assertTrue("nothing should be returned when all buckets are below the min threshold", interestingBucket.isEmpty());
+
+ sstrs.get(0).readMeter = new RestorableMeter(100.0, 100.0);
+ sstrs.get(1).readMeter = new RestorableMeter(200.0, 200.0);
+ sstrs.get(2).readMeter = new RestorableMeter(300.0, 300.0);
+
+ long estimatedKeys = sstrs.get(0).estimatedKeys();
+
+ // if we have more than the max threshold, the coldest should be dropped
+ bucket = trimToThresholdWithHotness(sstrs, 2);
+ assertEquals("one bucket should have been dropped", 2, bucket.left.size());
+ double expectedBucketHotness = (200.0 + 300.0) / estimatedKeys;
+ assertEquals(String.format("bucket hotness (%f) should be close to %f", bucket.right, expectedBucketHotness),
+ expectedBucketHotness, bucket.right, 1.0);
+ }
+
+ @Test
+ public void testFilterColdSSTables() throws Exception
+ {
+ String ksname = "Keyspace1";
+ String cfname = "Standard1";
+ Keyspace keyspace = Keyspace.open(ksname);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfname);
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+
+ ByteBuffer value = ByteBuffer.wrap(new byte[100]);
+
+ // create 10 sstables
+ int numSSTables = 10;
+ for (int r = 0; r < numSSTables; r++)
+ {
+ DecoratedKey key = Util.dk(String.valueOf(r));
+ RowMutation rm = new RowMutation(ksname, key.key);
+ rm.add(cfname, ByteBufferUtil.bytes("column"), value, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+ }
+ cfs.forceBlockingFlush();
+
+ List<SSTableReader> filtered;
+ List<SSTableReader> sstrs = new ArrayList<>(cfs.getSSTables());
+
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = null;
+ filtered = filterColdSSTables(sstrs, 0.05);
+ assertEquals("when there are no read meters, no sstables should be filtered", sstrs.size(), filtered.size());
+
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(0.0, 0.0);
+ filtered = filterColdSSTables(sstrs, 0.05);
+ assertEquals("when all read meters are zero, no sstables should be filtered", sstrs.size(), filtered.size());
+
+ // leave all read rates at 0 besides one
+ sstrs.get(0).readMeter = new RestorableMeter(1000.0, 1000.0);
+ filtered = filterColdSSTables(sstrs, 0.05);
+ assertEquals("there should only be one hot sstable", 1, filtered.size());
+ assertEquals(1000.0, filtered.get(0).readMeter.twoHourRate(), 0.5);
+
+ // the total read rate is 100, and we'll set a threshold of 2.5%, so two of the sstables with read
+ // rate 1.0 should be ignored, but not the third
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(0.0, 0.0);
+ sstrs.get(0).readMeter = new RestorableMeter(97.0, 97.0);
+ sstrs.get(1).readMeter = new RestorableMeter(1.0, 1.0);
+ sstrs.get(2).readMeter = new RestorableMeter(1.0, 1.0);
+ sstrs.get(3).readMeter = new RestorableMeter(1.0, 1.0);
+
+ filtered = filterColdSSTables(sstrs, 0.025);
+ assertEquals(2, filtered.size());
+ assertEquals(98.0, filtered.get(0).readMeter.twoHourRate() + filtered.get(1).readMeter.twoHourRate(), 0.5);
+
+ // make sure a threshold of 0.0 doesn't result in any sstables being filtered
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(1.0, 1.0);
+ filtered = filterColdSSTables(sstrs, 0.0);
+ assertEquals(sstrs.size(), filtered.size());
+
+ // just for fun, set a threshold where all sstables are considered cold
+ for (SSTableReader sstr : sstrs)
+ sstr.readMeter = new RestorableMeter(1.0, 1.0);
+ filtered = filterColdSSTables(sstrs, 1.0);
+ assertTrue(filtered.isEmpty());
+ }
+}
\ No newline at end of file