You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/10/29 21:41:31 UTC

[02/11] git commit: merge from 2.0

merge from 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2c6a5c00
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2c6a5c00
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2c6a5c00

Branch: refs/heads/trunk
Commit: 2c6a5c000f19a07909f4cecef3f14e33457cdfec
Parents: e7ff361 786672e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Oct 29 15:31:10 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Oct 29 15:31:10 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../SizeTieredCompactionStrategy.java           | 153 ++++++++++++---
 .../SizeTieredCompactionStrategyOptions.java    |  53 ++---
 .../cassandra/io/sstable/SSTableReader.java     |   4 +-
 .../SizeTieredCompactionStrategyTest.java       | 192 ++++++++++++++++++-
 5 files changed, 341 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6a5c00/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c1ddf20,4815c1c..8cdbbe1
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,6 +1,15 @@@
 +2.1
 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 +
 +
  2.0.3
+  * Compact hottest sstables first and optionally omit coldest from
+    compaction entirely (CASSANDRA-6109)
   * Fix modifying column_metadata from thrift (CASSANDRA-6182)
   * cqlsh: fix LIST USERS output (CASSANDRA-6242)
   * Add IRequestSink interface (CASSANDRA-6248)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6a5c00/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 61f5668,5115860..f883597
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -34,31 -35,7 +35,29 @@@ import org.apache.cassandra.utils.Pair
  public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
  {
      private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);
-     
-     private static Comparator<SSTableReader> generationComparator = new Comparator<SSTableReader>()
-     {
-         public int compare(SSTableReader o1, SSTableReader o2)
-         {
-             return o1.descriptor.generation - o2.descriptor.generation;
-         }
-     };
-     
-     private static Comparator<List<SSTableReader>> avgBucketSizeComparator = new Comparator<List<SSTableReader>>()
+ 
++    private static final Comparator<Pair<List<SSTableReader>,Double>> bucketsByHotnessComparator = new Comparator<Pair<List<SSTableReader>, Double>>()
 +    {
-         public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
++        public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
 +        {
-             return Longs.compare(avgSize(o1), avgSize(o2));
++            int comparison = Double.compare(o1.right, o2.right);
++            if (comparison != 0)
++                return comparison;
++
++            // break ties by compacting the smallest sstables first (this will probably only happen for
++            // system tables and new/unread sstables)
++            return Long.compare(avgSize(o1.left), avgSize(o2.left));
 +        }
 +
 +        private long avgSize(List<SSTableReader> sstables)
 +        {
 +            long n = 0;
 +            for (SSTableReader sstable : sstables)
 +                n += sstable.bytesOnDisk();
 +            return n / sstables.size();
 +        }
 +    };
 +
      protected SizeTieredCompactionStrategyOptions options;
      protected volatile int estimatedRemainingTasks;
  
@@@ -101,24 -80,136 +102,114 @@@
          return Collections.singletonList(sstablesWithTombstones.get(0));
      }
  
+     /**
+      * Removes as many cold sstables as possible while retaining at least 1-coldReadsToOmit of the total reads/sec
+      * across all sstables
+      * @param sstables all sstables to consider
+      * @param coldReadsToOmit the proportion of total reads/sec that will be omitted (0=omit nothing, 1=omit everything)
+      * @return a list of sstables with the coldest sstables excluded until the reads they represent reaches coldReadsToOmit
+      */
+     @VisibleForTesting
+     static List<SSTableReader> filterColdSSTables(List<SSTableReader> sstables, double coldReadsToOmit)
+     {
+         // sort the sstables by hotness (coldest-first), breaking ties with size on disk (mainly for system tables and cold tables)
+         Collections.sort(sstables, new Comparator<SSTableReader>()
+         {
+             public int compare(SSTableReader o1, SSTableReader o2)
+             {
+                 int comparison = Double.compare(hotness(o1), hotness(o2));
+                 if (comparison != 0)
+                     return comparison;
+ 
+                 return Long.compare(o1.bytesOnDisk(), o2.bytesOnDisk());
+             }
+         });
+ 
+         // calculate the total reads/sec across all sstables
+         double totalReads = 0.0;
+         for (SSTableReader sstr : sstables)
+             if (sstr.readMeter != null)
+                 totalReads += sstr.readMeter.twoHourRate();
+ 
+         // if this is a system table with no read meters or we don't have any read rates yet, just return them all
+         if (totalReads == 0.0)
+             return sstables;
+ 
+         // iteratively ignore the coldest sstables until ignoring one more would put us over the coldReadsToOmit threshold
+         double maxColdReads = coldReadsToOmit * totalReads;
+ 
+         double totalColdReads = 0.0;
+         int cutoffIndex = 0;
+         while (cutoffIndex < sstables.size())
+         {
+             double reads = sstables.get(cutoffIndex).readMeter.twoHourRate();
+             if (totalColdReads + reads > maxColdReads)
+                 break;
+ 
+             totalColdReads += reads;
+             cutoffIndex++;
+         }
+ 
+         return sstables.subList(cutoffIndex, sstables.size());
+     }
+ 
+     /**
+      * @param buckets list of buckets from which to return the most interesting, where "interesting" is the total hotness for reads
+      * @param minThreshold minimum number of sstables in a bucket to qualify as interesting
+      * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this)
+      * @return a bucket (list) of sstables to compact
+      */
      public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
      {
-         // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold entries
-         List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
+         // skip buckets containing less than minThreshold sstables, and limit other buckets to maxThreshold sstables
+         final List<Pair<List<SSTableReader>, Double>> prunedBucketsAndHotness = new ArrayList<>(buckets.size());
          for (List<SSTableReader> bucket : buckets)
          {
-             if (bucket.size() < minThreshold)
-                 continue;
- 
-             Collections.sort(bucket, generationComparator);
-             List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
-             prunedBuckets.add(prunedBucket);
+             Pair<List<SSTableReader>, Double> bucketAndHotness = trimToThresholdWithHotness(bucket, maxThreshold);
+             if (bucketAndHotness != null && bucketAndHotness.left.size() >= minThreshold)
+                 prunedBucketsAndHotness.add(bucketAndHotness);
          }
-         if (prunedBuckets.isEmpty())
+         if (prunedBucketsAndHotness.isEmpty())
              return Collections.emptyList();
-         
-         // prefer compacting buckets with smallest average size; that will yield the fastest improvement for read performance
-         return Collections.min(prunedBuckets, avgBucketSizeComparator);
+ 
 -        // prefer compacting the hottest bucket
 -        Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, new Comparator<Pair<List<SSTableReader>, Double>>()
 -        {
 -            public int compare(Pair<List<SSTableReader>, Double> o1, Pair<List<SSTableReader>, Double> o2)
 -            {
 -                int comparison = Double.compare(o1.right, o2.right);
 -                if (comparison != 0)
 -                    return comparison;
 -
 -                // break ties by compacting the smallest sstables first (this will probably only happen for
 -                // system tables and new/unread sstables)
 -                return Long.compare(avgSize(o1.left), avgSize(o2.left));
 -            }
 -
 -            private long avgSize(List<SSTableReader> sstables)
 -            {
 -                long n = 0;
 -                for (SSTableReader sstable : sstables)
 -                    n += sstable.bytesOnDisk();
 -                return n / sstables.size();
 -            }
 -        });
 -
++        Pair<List<SSTableReader>, Double> hottest = Collections.max(prunedBucketsAndHotness, bucketsByHotnessComparator);
+         return hottest.left;
+     }
+ 
+     /**
+      * Returns a (bucket, hotness) pair for the given bucket; this method never returns null, and enforcing minThreshold is left to the caller.
+      * If there are more than maxThreshold sstables, the coldest sstables will be trimmed to meet the threshold.
+      **/
+     @VisibleForTesting
+     static Pair<List<SSTableReader>, Double> trimToThresholdWithHotness(List<SSTableReader> bucket, int maxThreshold)
+     {
+         // sort by sstable hotness (descending)
+         Collections.sort(bucket, new Comparator<SSTableReader>()
+         {
+             public int compare(SSTableReader o1, SSTableReader o2)
+             {
+                 return -1 * Double.compare(hotness(o1), hotness(o2));
+             }
+         });
+ 
+         // and then trim the coldest sstables off the end to meet the maxThreshold
+         List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));
+ 
+         // bucket hotness is the sum of the hotness of all sstable members
+         double bucketHotness = 0.0;
+         for (SSTableReader sstr : prunedBucket)
+             bucketHotness += hotness(sstr);
+ 
+         return Pair.create(prunedBucket, bucketHotness);
+     }
+ 
+     /**
+      * Returns the reads per second per key for this sstable, or 0.0 if the sstable has no read meter
+      */
+     private static double hotness(SSTableReader sstr)
+     {
+         // system tables don't have read meters, just use 0.0 for the hotness
+         return sstr.readMeter == null ? 0.0 : sstr.readMeter.twoHourRate() / sstr.estimatedKeys();
      }
  
      public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2c6a5c00/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------