Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/06/30 16:16:09 UTC

[2/2] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1.0

Merge branch 'cassandra-2.0' into cassandra-2.1.0

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/compaction/LeveledManifest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4cc4acb7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4cc4acb7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4cc4acb7

Branch: refs/heads/cassandra-2.1.0
Commit: 4cc4acb7e5db5e2bd4974311a8164cb27ef55c29
Parents: 82aca6d fc9eeb9
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jun 30 16:13:14 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jun 30 16:13:14 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LeveledManifest.java          | 85 +++++++++++++++++++-
 2 files changed, 83 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cc4acb7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 6464cd8,e88753f..c2e5a00
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -9,36 -4,29 +9,37 @@@ Merged from 2.0
    operations to incorrectly become full QUORUM (CASSANDRA-7345)
   * Properly handle unrecognized opcodes and flags (CASSANDRA-7440)
   * (Hadoop) close CqlRecordWriter clients when finished (CASSANDRA-7459)
 + * Commit disk failure policy (CASSANDRA-7429)
+  * Make sure high level sstables get compacted (CASSANDRA-7414)
  
 -
 -2.0.9
 - * Fix CC#collectTimeOrderedData() tombstone optimisations (CASSANDRA-7394)
 - * Fix assertion error in CL.ANY timeout handling (CASSANDRA-7364)
 - * Handle empty CFs in Memtable#maybeUpdateLiveRatio() (CASSANDRA-7401)
 +2.1.0-rc2
 + * Fix heap size calculation for CompoundSparseCellName and 
 +   CompoundSparseCellName.WithCollection (CASSANDRA-7421)
 + * Allow counter mutations in UNLOGGED batches (CASSANDRA-7351)
 + * Modify reconcile logic to always pick a tombstone over a counter cell
 +   (CASSANDRA-7346)
 + * Avoid incremental compaction on Windows (CASSANDRA-7365)
 + * Fix exception when querying a composite-keyed table with a collection index
 +   (CASSANDRA-7372)
 + * Use node's host id in place of counter ids (CASSANDRA-7366)
   * Fix native protocol CAS batches (CASSANDRA-7337)
 + * Reduce likelihood of contention on local paxos locking (CASSANDRA-7359)
 + * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
 + * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
 + * Don't fail streams on failure detector downs (CASSANDRA-3569)
 + * Add optional keyspace to DROP INDEX statement (CASSANDRA-7314)
 + * Reduce run time for CQL tests (CASSANDRA-7327)
 + * Fix heap size calculation on Windows (CASSANDRA-7352, 7353)
 + * RefCount native frames from netty (CASSANDRA-7245)
 + * Use tarball dir instead of /var for default paths (CASSANDRA-7136)
 + * Remove rows_per_partition_to_cache keyword (CASSANDRA-7193)
 + * Fix schema change response in native protocol v3 (CASSANDRA-7413)
 +Merged from 2.0:
 + * Fix assertion error in CL.ANY timeout handling (CASSANDRA-7364)
   * Add per-CF range read request latency metrics (CASSANDRA-7338)
   * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
 - * Add conditional CREATE/DROP USER support (CASSANDRA-7264)
 - * Swap local and global default read repair chances (CASSANDRA-7320)
 - * Add missing iso8601 patterns for date strings (CASSANDRA-6973)
 - * Support selecting multiple rows in a partition using IN (CASSANDRA-6875)
 - * cqlsh: always emphasize the partition key in DESC output (CASSANDRA-7274)
 - * Copy compaction options to make sure they are reloaded (CASSANDRA-7290)
 - * Add option to do more aggressive tombstone compactions (CASSANDRA-6563)
 - * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288)
 - * Add authentication support to shuffle (CASSANDRA-6484)
 - * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
   * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
   * Fix infinite loop on exception while streaming (CASSANDRA-7330)
 - * Reference sstables before populating key cache (CASSANDRA-7234)
   * Account for range tombstones in min/max column names (CASSANDRA-7235)
   * Improve sub range repair validation (CASSANDRA-7317)
   * Accept subtypes for function results, type casts (CASSANDRA-6766)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4cc4acb7/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a21924b,1118ddc..47eb683
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@@ -34,8 -36,13 +34,9 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.RowPosition;
  import org.apache.cassandra.dht.Bounds;
+ import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.sstable.*;
 -import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
  public class LeveledManifest
@@@ -48,16 -57,19 +49,23 @@@
       * or even OOMing when compacting highly overlapping sstables
       */
      private static final int MAX_COMPACTING_L0 = 32;
+     /**
+      * If we go this many rounds without compacting
+      * the highest level, we start bringing sstables from
+      * that level into lower-level compactions.
+      */
+     private static final int NO_COMPACTION_LIMIT = 25;
  
      private final ColumnFamilyStore cfs;
 -    private final List<SSTableReader>[] generations;
 +    @VisibleForTesting
 +    protected final List<SSTableReader>[] generations;
 +    @VisibleForTesting
 +    protected final List<SSTableReader> unrepairedL0;
      private final RowPosition[] lastCompactedKeys;
      private final int maxSSTableSizeInBytes;
      private final SizeTieredCompactionStrategyOptions options;
 +    private boolean hasRepairedData = false;
+     private final int[] compactionCounter;
  
      private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
      {
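
The compactionCounter added above drives a simple starvation heuristic: every
time candidates are picked for some target level, all other levels' counters
are incremented, and a level whose counter climbs past NO_COMPACTION_LIMIT
(25 rounds) is treated as starved. A minimal sketch of just that bookkeeping,
with hypothetical names, detached from the real LeveledManifest:

    // Hypothetical, simplified illustration of the per-level starvation
    // counter; mirrors the patch's logic but is not the actual Cassandra code.
    class StarvationCounter
    {
        private static final int NO_COMPACTION_LIMIT = 25; // as in the patch

        private final int[] compactionCounter;

        StarvationCounter(int levels)
        {
            compactionCounter = new int[levels];
        }

        // Called once per compaction round: the target level just got
        // compacted, every other level (L0 excluded) has waited one more round.
        void recordCompaction(int targetLevel)
        {
            for (int i = compactionCounter.length - 1; i > 0; i--)
                compactionCounter[i]++;
            compactionCounter[targetLevel] = 0;
        }

        boolean isStarved(int level)
        {
            return compactionCounter[level] > NO_COMPACTION_LIMIT;
        }
    }
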
@@@ -74,10 -85,10 +82,11 @@@
          lastCompactedKeys = new RowPosition[n];
          for (int i = 0; i < generations.length; i++)
          {
 -            generations[i] = new ArrayList<SSTableReader>();
 +            generations[i] = new ArrayList<>();
              lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
          }
 +        unrepairedL0 = new ArrayList<>();
+         compactionCounter = new int[n];
      }
  
      public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@@ -389,16 -309,68 +406,78 @@@
          return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
      }
  
 +    private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
 +    {
 +        Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(sstables);
 +        List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
 +        List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
 +                                                                                    options.bucketHigh,
 +                                                                                    options.bucketLow,
 +                                                                                    options.minSSTableSize);
 +        return SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
 +    }
+     /**
+      * If we do something that makes many levels contain too little data (cleanup, changing the sstable size), we will "never"
+      * compact the high levels.
+      *
+      * This method checks whether we have gone many compaction rounds without doing any high-level compaction; if so,
+      * we start bringing in one sstable from the highest level until that level is either empty or is being compacted.
+      *
+      * @param targetLevel the level the candidates will be compacted into
+      * @param candidates the original sstables to compact
+      * @return the original candidates, possibly with one sstable from a starved high level added
+      */
+     private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
+     {
+         Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
+ 
+         for (int i = generations.length - 1; i > 0; i--)
+             compactionCounter[i]++;
+         compactionCounter[targetLevel] = 0;
+         if (logger.isDebugEnabled())
+         {
+             for (int j = 0; j < compactionCounter.length; j++)
+                 logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
+         }
+ 
+         for (int i = generations.length - 1; i > 0; i--)
+         {
+             if (getLevelSize(i) > 0)
+             {
+                 if (compactionCounter[i] > NO_COMPACTION_LIMIT)
+                 {
+                     // we try to find an sstable that is fully contained within the boundaries we are compacting;
+                     // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2.
+                     // This means that we will not create overlap in L2 if we add an sstable
+                     // contained within 0 -> 33 to the compaction.
+                     RowPosition max = null;
+                     RowPosition min = null;
+                     for (SSTableReader candidate : candidates)
+                     {
+                         if (min == null || candidate.first.compareTo(min) < 0)
+                             min = candidate.first;
+                         if (max == null || candidate.last.compareTo(max) > 0)
+                             max = candidate.last;
+                     }
+                     Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+                     Range<RowPosition> boundaries = new Range<>(min, max);
+                     for (SSTableReader sstable : getLevel(i))
+                     {
+                         Range<RowPosition> r = new Range<>(sstable.first, sstable.last);
+                         if (boundaries.contains(r) && !compacting.contains(sstable))
+                         {
+                             logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
+                             withStarvedCandidate.add(sstable);
+                             return withStarvedCandidate;
+                         }
+                     }
+                 }
+                 return candidates;
+             }
+         }
+ 
+         return candidates;
+     }
  
      public synchronized int getLevelSize(int i)
      {
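
To make the boundary check in getOverlappingStarvedSSTables() concrete: the
candidates' overall min and max keys form the compaction boundaries, and a
starved high-level sstable may be pulled in only if its [first, last] range
lies fully inside those boundaries, since such an sstable cannot introduce
new overlap in the target level. A rough, self-contained sketch, using plain
longs in place of RowPosition and a hand-rolled containment test in place of
org.apache.cassandra.dht.Range.contains():

    // Hypothetical stand-in for an sstable's key span (inclusive bounds).
    final class Interval
    {
        final long first;
        final long last;

        Interval(long first, long last)
        {
            this.first = first;
            this.last = last;
        }

        // True if other lies fully within this interval.
        boolean contains(Interval other)
        {
            return first <= other.first && other.last <= last;
        }
    }

    class BoundariesExample
    {
        public static void main(String[] args)
        {
            // The example from the comment above: compacting 0->30 in L1
            // and 0->12, 12->33 in L2.
            Interval[] candidates = { new Interval(0, 30), new Interval(0, 12), new Interval(12, 33) };

            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            for (Interval c : candidates)
            {
                min = Math.min(min, c.first);
                max = Math.max(max, c.last);
            }
            Interval boundaries = new Interval(min, max); // 0 -> 33

            // An sstable contained in the boundaries can join without
            // creating overlap; one that sticks out past them cannot.
            System.out.println(boundaries.contains(new Interval(5, 20)));  // true
            System.out.println(boundaries.contains(new Interval(25, 40))); // false
        }
    }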