Posted to commits@cassandra.apache.org by ma...@apache.org on 2014/06/30 08:48:10 UTC

[1/3] git commit: Include high-level sstables in lower-level compactions if they have not been compacted for a while.

Repository: cassandra
Updated Branches:
  refs/heads/trunk c027183ea -> 64772865c


Include high-level sstables in lower-level compactions if they have not been compacted for a while.

Patch by marcuse, reviewed by kohlisankalp and yukim for CASSANDRA-7414
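
In short, the patch keeps one counter per level, incremented on every
compaction round and reset for the level actually compacted into; once a
level's counter passes a threshold, one of its sstables is pulled into a
lower-level compaction. A minimal standalone sketch of that bookkeeping
(class and member names here are illustrative only, not Cassandra API; the
real logic is in LeveledManifest.getOverlappingStarvedSSTables in the diff
below):

// Sketch of the starvation counter added in CASSANDRA-7414.
// StarvationTracker and its members are hypothetical names.
public class StarvationTracker
{
    // Mirrors NO_COMPACTION_LIMIT in the patch: rounds a level may sit idle.
    private static final int ROUNDS_BEFORE_STARVED = 25;

    private final int[] compactionCounter;

    public StarvationTracker(int levels)
    {
        compactionCounter = new int[levels];
    }

    // Called once per round; targetLevel is the level being compacted into.
    public void recordRound(int targetLevel)
    {
        for (int i = compactionCounter.length - 1; i > 0; i--)
            compactionCounter[i]++;          // every level above L0 ages one round...
        compactionCounter[targetLevel] = 0;  // ...except the one that just got work
    }

    // True once a level has waited long enough to be force-fed a compaction.
    public boolean isStarved(int level)
    {
        return compactionCounter[level] > ROUNDS_BEFORE_STARVED;
    }

    public static void main(String[] args)
    {
        StarvationTracker tracker = new StarvationTracker(9);
        for (int round = 0; round < 30; round++)
            tracker.recordRound(1);               // every round targets L1 only
        System.out.println(tracker.isStarved(5)); // true: L5 starved for 30 rounds
    }
}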


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc9eeb99
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc9eeb99
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc9eeb99

Branch: refs/heads/trunk
Commit: fc9eeb99f9c1c13335c7df19b725e8b1b80b8cbb
Parents: 536ae7a
Author: Marcus Eriksson <ma...@apache.org>
Authored: Tue Jun 17 09:12:51 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jun 30 08:40:37 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LeveledManifest.java          | 86 +++++++++++++++++++-
 2 files changed, 84 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc9eeb99/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e722392..e88753f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
    operations to incorrectly become full QUORUM (CASSANDRA-7345)
  * Properly handle unrecognized opcodes and flags (CASSANDRA-7440)
  * (Hadoop) close CqlRecordWriter clients when finished (CASSANDRA-7459)
+ * Make sure high-level sstables get compacted (CASSANDRA-7414)
 
 
 2.0.9

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc9eeb99/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a78a867..1118ddc 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.util.FileUtils;
@@ -56,12 +57,19 @@ public class LeveledManifest
      * or even OOMing when compacting highly overlapping sstables
      */
     private static final int MAX_COMPACTING_L0 = 32;
+    /**
+     * If we go this many rounds without compacting the highest
+     * non-empty level, we start pulling sstables from that level
+     * into lower-level compactions
+     */
+    private static final int NO_COMPACTION_LIMIT = 25;
 
     private final ColumnFamilyStore cfs;
     private final List<SSTableReader>[] generations;
     private final RowPosition[] lastCompactedKeys;
     private final int maxSSTableSizeInBytes;
     private final SizeTieredCompactionStrategyOptions options;
+    private final int[] compactionCounter;
 
     private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
     {
@@ -80,6 +88,7 @@ public class LeveledManifest
             generations[i] = new ArrayList<SSTableReader>();
             lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
         }
+        compactionCounter = new int[n];
     }
 
     public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@ -276,10 +285,18 @@ public class LeveledManifest
 
                 // L0 is fine, proceed with this level
                 Collection<SSTableReader> candidates = getCandidatesFor(i);
-                if (logger.isDebugEnabled())
-                    logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
                 if (!candidates.isEmpty())
-                    return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
+                {
+                    int nextLevel = getNextLevel(candidates);
+                    candidates = getOverlappingStarvedSSTables(nextLevel, candidates);
+                    if (logger.isDebugEnabled())
+                        logger.debug("Compaction candidates for L{} are {}", i, toString(candidates));
+                    return new CompactionCandidate(candidates, nextLevel, cfs.getCompactionStrategy().getMaxSSTableBytes());
+                }
+                else
+                {
+                    logger.debug("No compaction candidates for L{}", i);
+                }
             }
         }
 
@@ -292,6 +309,69 @@ public class LeveledManifest
         return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
     }
 
+    /**
+     * If we do something that makes many levels contain too little data (cleanup, change sstable size), we will "never"
+     * compact the high levels.
+     *
+     * This method checks whether we have gone many compaction rounds without doing any high-level compaction; if so,
+     * we start bringing one sstable at a time from the highest such level into lower-level compactions, until that
+     * level is either empty or is being compacted.
+     *
+     * @param targetLevel the level the candidates will be compacted into
+     * @param candidates the original sstables to compact
+     * @return the candidates, possibly extended with one starved high-level sstable
+     */
+    private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
+    {
+        Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
+
+        for (int i = generations.length - 1; i > 0; i--)
+            compactionCounter[i]++;
+        compactionCounter[targetLevel] = 0;
+        if (logger.isDebugEnabled())
+        {
+            for (int j = 0; j < compactionCounter.length; j++)
+                logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
+        }
+
+        for (int i = generations.length - 1; i > 0; i--)
+        {
+            if (getLevelSize(i) > 0)
+            {
+                if (compactionCounter[i] > NO_COMPACTION_LIMIT)
+                {
+                    // we try to find an sstable that is fully contained within the boundaries we are compacting;
+                    // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
+                    // this means that we will not create overlap in L2 if we add an sstable
+                    // contained within 0 -> 33 to the compaction
+                    RowPosition max = null;
+                    RowPosition min = null;
+                    for (SSTableReader candidate : candidates)
+                    {
+                        if (min == null || candidate.first.compareTo(min) < 0)
+                            min = candidate.first;
+                        if (max == null || candidate.last.compareTo(max) > 0)
+                            max = candidate.last;
+                    }
+                    Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+                    Range<RowPosition> boundaries = new Range<>(min, max);
+                    for (SSTableReader sstable : getLevel(i))
+                    {
+                        Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
+                        if (boundaries.contains(r) && !compacting.contains(sstable))
+                        {
+                            logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
+                            withStarvedCandidate.add(sstable);
+                            return withStarvedCandidate;
+                        }
+                    }
+                }
+                return candidates;
+            }
+        }
+
+        return candidates;
+    }
+
     public synchronized int getLevelSize(int i)
     {
         if (i >= generations.length)


[3/3] git commit: Merge branch 'cassandra-2.1' into trunk

Posted by ma...@apache.org.
Merge branch 'cassandra-2.1' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/64772865
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/64772865
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/64772865

Branch: refs/heads/trunk
Commit: 64772865caff5f7ed7dc4e009131846bba17e861
Parents: c027183 73a07da
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jun 30 08:46:18 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jun 30 08:46:18 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LeveledManifest.java          | 86 +++++++++++++++++++-
 2 files changed, 84 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/64772865/CHANGES.txt
----------------------------------------------------------------------


[2/3] git commit: Merge branch 'cassandra-2.0' into cassandra-2.1

Posted by ma...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	src/java/org/apache/cassandra/db/compaction/LeveledManifest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73a07daf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73a07daf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73a07daf

Branch: refs/heads/trunk
Commit: 73a07daf9dc17a6a94d9244a8e03f58d808c5f88
Parents: ef4a07b fc9eeb9
Author: Marcus Eriksson <ma...@apache.org>
Authored: Mon Jun 30 08:45:41 2014 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Mon Jun 30 08:45:41 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/LeveledManifest.java          | 86 +++++++++++++++++++-
 2 files changed, 84 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a07daf/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b30e9a2,e88753f..6aa2e6a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -15,36 -4,29 +15,37 @@@ Merged from 2.0
     operations to incorrectly become full QUORUM (CASSANDRA-7345)
   * Properly handle unrecognized opcodes and flags (CASSANDRA-7440)
   * (Hadoop) close CqlRecordWriter clients when finished (CASSANDRA-7459)
+  * Make sure high-level sstables get compacted (CASSANDRA-7414)
  
  
 -2.0.9
 - * Fix CC#collectTimeOrderedData() tombstone optimisations (CASSANDRA-7394)
 - * Fix assertion error in CL.ANY timeout handling (CASSANDRA-7364)
 - * Handle empty CFs in Memtable#maybeUpdateLiveRatio() (CASSANDRA-7401)
 +2.1.0-rc2
 + * Fix heap size calculation for CompoundSparseCellName and 
 +   CompoundSparseCellName.WithCollection (CASSANDRA-7421)
 + * Allow counter mutations in UNLOGGED batches (CASSANDRA-7351)
 + * Modify reconcile logic to always pick a tombstone over a counter cell
 +   (CASSANDRA-7346)
 + * Avoid incremental compaction on Windows (CASSANDRA-7365)
 + * Fix exception when querying a composite-keyed table with a collection index
 +   (CASSANDRA-7372)
 + * Use node's host id in place of counter ids (CASSANDRA-7366)
   * Fix native protocol CAS batches (CASSANDRA-7337)
 + * Reduce likelihood of contention on local paxos locking (CASSANDRA-7359)
 + * Upgrade to Pig 0.12.1 (CASSANDRA-6556)
 + * Make sure we clear out repair sessions from netstats (CASSANDRA-7329)
 + * Don't fail streams on failure detector downs (CASSANDRA-3569)
 + * Add optional keyspace to DROP INDEX statement (CASSANDRA-7314)
 + * Reduce run time for CQL tests (CASSANDRA-7327)
 + * Fix heap size calculation on Windows (CASSANDRA-7352, 7353)
 + * RefCount native frames from netty (CASSANDRA-7245)
 + * Use tarball dir instead of /var for default paths (CASSANDRA-7136)
 + * Remove rows_per_partition_to_cache keyword (CASSANDRA-7193)
 + * Fix schema change response in native protocol v3 (CASSANDRA-7413)
 +Merged from 2.0:
 + * Fix assertion error in CL.ANY timeout handling (CASSANDRA-7364)
   * Add per-CF range read request latency metrics (CASSANDRA-7338)
   * Fix NPE in StreamTransferTask.createMessageForRetry() (CASSANDRA-7323)
 - * Add conditional CREATE/DROP USER support (CASSANDRA-7264)
 - * Swap local and global default read repair chances (CASSANDRA-7320)
 - * Add missing iso8601 patterns for date strings (CASSANDRA-6973)
 - * Support selecting multiple rows in a partition using IN (CASSANDRA-6875)
 - * cqlsh: always emphasize the partition key in DESC output (CASSANDRA-7274)
 - * Copy compaction options to make sure they are reloaded (CASSANDRA-7290)
 - * Add option to do more aggressive tombstone compactions (CASSANDRA-6563)
 - * Don't try to compact already-compacting files in HHOM (CASSANDRA-7288)
 - * Add authentication support to shuffle (CASSANDRA-6484)
 - * Cqlsh counts non-empty lines for "Blank lines" warning (CASSANDRA-7325)
   * Make StreamSession#closeSession() idempotent (CASSANDRA-7262)
   * Fix infinite loop on exception while streaming (CASSANDRA-7330)
 - * Reference sstables before populating key cache (CASSANDRA-7234)
   * Account for range tombstones in min/max column names (CASSANDRA-7235)
   * Improve sub range repair validation (CASSANDRA-7317)
   * Accept subtypes for function results, type casts (CASSANDRA-6766)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a07daf/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index a21924b,1118ddc..aac57c0
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@@ -34,8 -36,13 +34,9 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.RowPosition;
  import org.apache.cassandra.dht.Bounds;
+ import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.io.sstable.*;
 -import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
  public class LeveledManifest
@@@ -48,16 -57,19 +49,23 @@@
       * or even OOMing when compacting highly overlapping sstables
       */
      private static final int MAX_COMPACTING_L0 = 32;
+     /**
+      * If we go this many rounds without compacting the highest
+      * non-empty level, we start pulling sstables from that level
+      * into lower-level compactions
+      */
+     private static final int NO_COMPACTION_LIMIT = 25;
  
      private final ColumnFamilyStore cfs;
 -    private final List<SSTableReader>[] generations;
 +    @VisibleForTesting
 +    protected final List<SSTableReader>[] generations;
 +    @VisibleForTesting
 +    protected final List<SSTableReader> unrepairedL0;
      private final RowPosition[] lastCompactedKeys;
      private final int maxSSTableSizeInBytes;
      private final SizeTieredCompactionStrategyOptions options;
 +    private boolean hasRepairedData = false;
+     private final int[] compactionCounter;
  
      private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB, SizeTieredCompactionStrategyOptions options)
      {
@@@ -74,10 -85,10 +82,11 @@@
          lastCompactedKeys = new RowPosition[n];
          for (int i = 0; i < generations.length; i++)
          {
 -            generations[i] = new ArrayList<SSTableReader>();
 +            generations[i] = new ArrayList<>();
              lastCompactedKeys[i] = cfs.partitioner.getMinimumToken().minKeyBound();
          }
 +        unrepairedL0 = new ArrayList<>();
+         compactionCounter = new int[n];
      }
  
      public static LeveledManifest create(ColumnFamilyStore cfs, int maxSSTableSize, List<SSTableReader> sstables)
@@@ -389,17 -309,69 +406,80 @@@
          return new CompactionCandidate(candidates, getNextLevel(candidates), cfs.getCompactionStrategy().getMaxSSTableBytes());
      }
  
 +    private List<SSTableReader> getSSTablesForSTCS(Collection<SSTableReader> sstables)
 +    {
 +        Iterable<SSTableReader> candidates = cfs.getDataTracker().getUncompactingSSTables(sstables);
 +        List<Pair<SSTableReader,Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(AbstractCompactionStrategy.filterSuspectSSTables(candidates));
 +        List<List<SSTableReader>> buckets = SizeTieredCompactionStrategy.getBuckets(pairs,
 +                                                                                    options.bucketHigh,
 +                                                                                    options.bucketLow,
 +                                                                                    options.minSSTableSize);
 +        return SizeTieredCompactionStrategy.mostInterestingBucket(buckets, 4, 32);
 +    }
 +
+     /**
+      * If we do something that makes many levels contain too little data (cleanup, change sstable size), we will "never"
+      * compact the high levels.
+      *
+      * This method checks whether we have gone many compaction rounds without doing any high-level compaction; if so,
+      * we start bringing one sstable at a time from the highest such level into lower-level compactions, until that
+      * level is either empty or is being compacted.
+      *
+      * @param targetLevel the level the candidates will be compacted into
+      * @param candidates the original sstables to compact
+      * @return the candidates, possibly extended with one starved high-level sstable
+      */
+     private Collection<SSTableReader> getOverlappingStarvedSSTables(int targetLevel, Collection<SSTableReader> candidates)
+     {
+         Set<SSTableReader> withStarvedCandidate = new HashSet<>(candidates);
+ 
+         for (int i = generations.length - 1; i > 0; i--)
+             compactionCounter[i]++;
+         compactionCounter[targetLevel] = 0;
+         if (logger.isDebugEnabled())
+         {
+             for (int j = 0; j < compactionCounter.length; j++)
+                 logger.debug("CompactionCounter: {}: {}", j, compactionCounter[j]);
+         }
+ 
+         for (int i = generations.length - 1; i > 0; i--)
+         {
+             if (getLevelSize(i) > 0)
+             {
+                 if (compactionCounter[i] > NO_COMPACTION_LIMIT)
+                 {
+                     // we try to find an sstable that is fully contained within the boundaries we are compacting;
+                     // say we are compacting 3 sstables: 0->30 in L1 and 0->12, 12->33 in L2
+                     // this means that we will not create overlap in L2 if we add an sstable
+                     // contained within 0 -> 33 to the compaction
+                     RowPosition max = null;
+                     RowPosition min = null;
+                     for (SSTableReader candidate : candidates)
+                     {
+                         if (min == null || candidate.first.compareTo(min) < 0)
+                             min = candidate.first;
+                         if (max == null || candidate.last.compareTo(max) > 0)
+                             max = candidate.last;
+                     }
+                     Set<SSTableReader> compacting = cfs.getDataTracker().getCompacting();
+                     Range<RowPosition> boundaries = new Range<>(min, max);
+                     for (SSTableReader sstable : getLevel(i))
+                     {
+                         Range<RowPosition> r = new Range<RowPosition>(sstable.first, sstable.last);
+                         if (boundaries.contains(r) && !compacting.contains(sstable))
+                         {
+                             logger.info("Adding high-level (L{}) {} to candidates", sstable.getSSTableLevel(), sstable);
+                             withStarvedCandidate.add(sstable);
+                             return withStarvedCandidate;
+                         }
+                     }
+                 }
+                 return candidates;
+             }
+         }
+ 
+         return candidates;
+     }
+ 
      public synchronized int getLevelSize(int i)
      {
          if (i >= generations.length)