You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/29 21:41:31 UTC
[02/11] git commit: merge from 2.0
merge from 2.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2c6a5c00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2c6a5c00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2c6a5c00
Branch: refs/heads/trunk
Commit: 2c6a5c000f19a07909f4cecef3f14e33457cdfec
Parents: e7ff361 786672e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:31:10 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:31:10 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../SizeTieredCompactionStrategy.java | 153 ++++++++++++---
.../SizeTieredCompactionStrategyOptions.java | 53 ++---
.../cassandra/io/sstable/SSTableReader.java | 4 +-
.../SizeTieredCompactionStrategyTest.java | 192 ++++++++++++++++++-
5 files changed, 341 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6a5c00/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c1ddf20,4815c1c..8cdbbe1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,6 +1,15 @@@
+2.1
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+
+
2.0.3
+ * Compact hottest sstables first and optionally omit coldest from
+ compaction entirely (CASSANDRA-6109)
* Fix modifying column_metadata from thrift (CASSANDRA-6182)
* cqlsh: fix LIST USERS output (CASSANDRA-6242)
* Add IRequestSink interface (CASSANDRA-6248)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6a5c00/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 61f5668,5115860..f883597
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -34,31 -35,7 +35,29 @@@ import org.apache.cassandra.utils.Pair
public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
{
private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);
-
- private static Comparator<SSTableReader> generationComparator = new Comparator<SSTableReader>()
- {
- public int compare(SSTableReader o1, SSTableReader o2)
- {
- return o1.descriptor.generation - o2.descriptor.generation;
- }
- };
-
- private static Comparator<List<SSTableReader>> avgBucketSizeComparator = new Comparator<List<SSTableReader>>()
+
++ private static final Comparator<Pair<List<SSTableReader>,Double>> bucketsByHotnessComparator = new Comparator<Pair<List<SSTableReader>, Double>>()
+ {
- public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
++ public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
+ {
- return Longs.compare(avgSize(o1), avgSize(o2));
++ int comparison = Double.compare(o1.right, o2.right);
++ if (comparison != 0)
++ return comparison;
++
++ // break ties by compacting the smallest sstables first (this will probably only happen for
++ // system tables and new/unread sstables)
++ return Long.compare(avgSize(o1.left), avgSize(o2.left));
+ }
+
+ private long avgSize(List<SSTableReader> sstables)
+ {
+ long n = 0;
+ for (SSTableReader sstable : sstables)
+ n += sstable.bytesOnDisk();
+ return n / sstables.size();
+ }
+ };
+
protected SizeTieredCompactionStrategyOptions options;
protected volatile int estimatedRemainingTasks;
@@@ -101,24 -80,136 +102,114 @@@
return Collections.singletonList(sstablesWithTombstones.get(0));
}
+ /**
+ * Removes as many of the coldest sstables as possible while retaining at least a fraction
+ * (1 - coldReadsToOmit) of the total reads/sec across all sstables
+ * @param sstables all sstables to consider
+ * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+ * @return a list of sstables with the coldest sstables excluded until the reads they represent reach the coldReadsToOmit proportion of total reads
+ */
+ @VisibleForTesting
+ static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+ {
+ // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+ Collections.sort(sstables, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ int comparison = Double.compare(hotness(o1), hotness(o2));
+ if (comparison != 0)
+ return comparison;
+
+ return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+ }
+ });
+
+ // calculate the total reads/sec across all sstables
+ double totalReads = 0.0;
+ for (SSTableReader sstr : sstables)
+ if (sstr.readMeter != null)
+ totalReads += sstr.readMeter.twoHourRate();
+
+ // if this is a system table with no read meters or we don't have any read rates yet, just return them all
+ if (totalReads == 0.0)
+ return sstables;
+
+ // iteratively ignore the coldest sstables until ignoring one more would put us over the coldReadsToOmit threshold
+ double maxColdReads = coldReadsToOmit * totalReads;
+
+ double totalColdReads = 0.0;
+ int cutoffIndex = 0;
+ while (cutoffIndex < sstables.size())
+ {
+ double reads = sstables.get(cutoffIndex).readMeter.twoHourRate();
+ if (totalColdReads + reads > maxColdReads)
+ break;
+
+ totalColdReads += reads;
+ cutoffIndex++;
+ }
+
+ return sstables.subList(cutoffIndex, sstables.size());
+ }
+
+ /**
+ * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
+ * @param minThreshold minimum number of sstables in a bucket to qualify as interesting
+ * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this)
+ * @return a bucket (list) of sstables to compact
+ */
public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
{
- // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold entries
- List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
+ // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables
+ final List<Pair<List<SSTableReader>, Double>> prunedBucketsAndHotness = new ArrayList<>(buckets.size());
for (List<SSTableReader> bucket : buckets)
{
- if (bucket.size() < minThreshold)
- continue;
-
- Collections.sort(bucket, generationComparator);
- List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
- prunedBuckets.add(prunedBucket);
+ Pair<List<SSTableReader>, Double> bucketAndHotness = trimToThresholdWithHotness(bucket, maxThreshold);
+ if (bucketAndHotness != null && bucketAndHotness.left.size() >= minThreshold)
+ prunedBucketsAndHotness.add(bucketAndHotness);
}
- if (prunedBuckets.isEmpty())
+ if (prunedBucketsAndHotness.isEmpty())
return Collections.emptyList();
-
- // prefer compacting buckets with smallest average size; that will yield the fastest improvement for read performance
- return Collections.min(prunedBuckets, avgBucketSizeComparator);
+
- // prefer compacting the hottest bucket
- Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, new Comparator<Pair<List<SSTableReader>, Double>>()
- {
- public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
- {
- int comparison = Double.compare(o1.right, o2.right);
- if (comparison != 0)
- return comparison;
-
- // break ties by compacting the smallest sstables first (this will probably only happen for
- // system tables and new/unread sstables)
- return Long.compare(avgSize(o1.left), avgSize(o2.left));
- }
-
- private long avgSize(List<SSTableReader> sstables)
- {
- long n = 0;
- for (SSTableReader sstable : sstables)
- n += sstable.bytesOnDisk();
- return n / sstables.size();
- }
- });
-
++ Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, bucketsByHotnessComparator);
+ return hottest.left;
+ }
+
+ /**
+ * Returns a (bucket, hotness) pair for the given bucket; minThreshold filtering is left to the caller.
+ * If there are more than maxThreshold sstables, the coldest sstables will be trimmed to meet the threshold.
+ **/
+ @VisibleForTesting
+ static Pair<List<SSTableReader>, Double> trimToThresholdWithHotness(List<SSTableReader> bucket, int maxThreshold)
+ {
+ // sort by sstable hotness (descending)
+ Collections.sort(bucket, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ return -1 * Double.compare(hotness(o1), hotness(o2));
+ }
+ });
+
+ // and then trim the coldest sstables off the end to meet the maxThreshold
+ List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+
+ // bucket hotness is the sum of the hotness of all sstable members
+ double bucketHotness = 0.0;
+ for (SSTableReader sstr : prunedBucket)
+ bucketHotness += hotness(sstr);
+
+ return Pair.create(prunedBucket, bucketHotness);
+ }
+
+ /**
+ * Returns the reads per second per key for this sstable, or 0.0 if the sstable has no read meter
+ */
+ private static double hotness(SSTableReader sstr)
+ {
+ // system tables don't have read meters, just use 0.0 for the hotness
+ return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
}
public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6a5c00/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------