You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/07/13 20:18:00 UTC

git commit: Track tombstone for LCS; patch by yukim reviewed by jbellis for CASSANDRA-4234

Updated Branches:
  refs/heads/trunk 13f8eee99 -> 0091af932


Track tombstone for LCS; patch by yukim reviewed by jbellis for CASSANDRA-4234


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0091af93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0091af93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0091af93

Branch: refs/heads/trunk
Commit: 0091af932c6ef65a0a5917f123fe24398b79c079
Parents: 13f8eee
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jul 13 13:17:51 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Jul 13 13:17:51 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +-
 .../db/compaction/AbstractCompactionStrategy.java  |   44 ++++++++++++
 .../db/compaction/LeveledCompactionStrategy.java   |   45 ++++++++++++-
 .../cassandra/db/compaction/LeveledManifest.java   |    6 ++
 .../cassandra/db/compaction/OperationType.java     |    2 +
 .../compaction/SizeTieredCompactionStrategy.java   |   35 +---------
 .../cassandra/db/compaction/CompactionsTest.java   |   53 ++++++++++++++-
 7 files changed, 148 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03e3fba..fb66fd6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,7 +12,7 @@
  * add inter-node message compression (CASSANDRA-3127)
  * remove COPP (CASSANDRA-2479)
  * Track tombstone expiration and compact when tombstone content is
-   higher than a configurable threshold, default 20% (CASSANDRA-3442)
+   higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234)
  * update MurmurHash to version 3 (CASSANDRA-2975)
  * (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
  * (CLI) jline version is bumped to 1.0 to properly  support

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 41128b0..bf6c87f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -38,15 +38,23 @@ import org.apache.cassandra.service.StorageService;
  */
 public abstract class AbstractCompactionStrategy
 {
+    protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
+    protected static final String TOMBSTONE_THRESHOLD_KEY = "tombstone_threshold";
+
     protected final ColumnFamilyStore cfs;
     protected final Map<String, String> options;
 
+    protected float tombstoneThreshold;
+
     protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
         assert cfs != null;
         this.cfs = cfs;
         this.options = options;
 
+        String optionValue = options.get(TOMBSTONE_THRESHOLD_KEY);
+        tombstoneThreshold = (null != optionValue) ? Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD;
+
         // start compactions in five minutes (if no flushes have occurred by then to do so)
         Runnable runnable = new Runnable()
         {
@@ -146,4 +154,40 @@ public abstract class AbstractCompactionStrategy
     {
         return getScanners(toCompact, null);
     }
+
+    /**
+     * @param sstable SSTable to check
+     * @param gcBefore time to drop tombstones
+     * @return true if given sstable's tombstones are expected to be removed
+     */
+    protected boolean worthDroppingTombstones(SSTableReader sstable, int gcBefore)
+    {
+        double droppableRatio = sstable.getEstimatedDroppableTombstoneRatio(gcBefore);
+        if (droppableRatio <= tombstoneThreshold)
+            return false;
+
+        Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable));
+        if (overlaps.isEmpty())
+        {
+            // there is no overlap, tombstones are safely droppable
+            return true;
+        }
+        else
+        {
+            // what percentage of columns do we expect to compact outside of overlap?
+            // first, calculate estimated keys that do not overlap
+            long keys = sstable.estimatedKeys();
+            Set<Range<Token>> ranges = new HashSet<Range<Token>>();
+            for (SSTableReader overlap : overlaps)
+                ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner));
+            long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges);
+            // next, calculate what percentage of columns we have within those keys
+            double remainingKeysRatio = ((double) remainingKeys) / keys;
+            long columns = sstable.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys;
+            double remainingColumnsRatio = ((double) columns) / (sstable.getEstimatedColumnCount().count() * sstable.getEstimatedColumnCount().mean());
+
+            // worth compacting only if the droppable ratio, scaled by the estimated share of columns outside the overlap, still exceeds the threshold
+            return remainingColumnsRatio * droppableRatio > tombstoneThreshold;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3e124de..9ac4fce 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -23,11 +23,11 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
+import com.google.common.primitives.Doubles;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -110,13 +110,22 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         }
 
         Collection<SSTableReader> sstables = manifest.getCompactionCandidates();
+        OperationType op = OperationType.COMPACTION;
         if (sstables.isEmpty())
         {
-            logger.debug("No compaction necessary for {}", this);
-            return null;
+            // if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio
+            SSTableReader sstable = findDroppableSSTable(gcBefore);
+            if (sstable == null)
+            {
+                logger.debug("No compaction necessary for {}", this);
+                return null;
+            }
+            sstables = Collections.singleton(sstable);
+            op = OperationType.TOMBSTONE_COMPACTION;
         }
 
         LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSizeInMB);
+        newTask.setCompactionType(op);
         return task.compareAndSet(currentTask, newTask)
                ? newTask
                : null;
@@ -148,6 +157,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
                 case CLEANUP:
                 case SCRUB:
                 case UPGRADE_SSTABLES:
+                case TOMBSTONE_COMPACTION: // Also when performing tombstone removal.
                     manifest.replace(listChangedNotification.removed, listChangedNotification.added);
                     break;
                 default:
@@ -280,4 +290,33 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
     {
         return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily);
     }
+
+    private SSTableReader findDroppableSSTable(final int gcBefore)
+    {
+        level:
+        for (int i = manifest.getLevelCount(); i >= 0; i--)
+        {
+            // sort sstables by droppable ratio in descending order
+            SortedSet<SSTableReader> sstables = manifest.getLevelSorted(i, new Comparator<SSTableReader>()
+            {
+                public int compare(SSTableReader o1, SSTableReader o2)
+                {
+                    double r1 = o1.getEstimatedDroppableTombstoneRatio(gcBefore);
+                    double r2 = o2.getEstimatedDroppableTombstoneRatio(gcBefore);
+                    return -1 * Doubles.compare(r1, r2);
+                }
+            });
+            if (sstables.isEmpty())
+                continue;
+
+            for (SSTableReader sstable : sstables)
+            {
+                if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold)
+                    continue level;
+                else if (!sstable.isMarkedSuspect() && worthDroppingTombstones(sstable, gcBefore))
+                    return sstable;
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 567c919..53efb80 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -22,6 +22,7 @@ import java.io.IOError;
 import java.io.IOException;
 import java.util.*;
 
+import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
@@ -568,6 +569,11 @@ public class LeveledManifest
         return 0;
     }
 
+    public synchronized SortedSet<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator)
+    {
+        return ImmutableSortedSet.copyOf(comparator, generations[level]);
+    }
+
     public List<SSTableReader> getLevel(int i)
     {
         return generations[i];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 79f6c5e..e46e92d 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -27,6 +27,8 @@ public enum OperationType
     SCRUB("Scrub"),
     UPGRADE_SSTABLES("Upgrade sstables"),
     INDEX_BUILD("Secondary index build"),
+    /** Compaction for tombstone removal */
+    TOMBSTONE_COMPACTION("Tombstone Compaction"),
     UNKNOWN("Unkown compaction type");
 
     private final String type;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 67d2e77..d1ac516 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -24,8 +24,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.Pair;
@@ -34,12 +32,9 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
 {
     private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);
     protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
-    protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
     protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
-    protected static final String TOMBSTONE_THRESHOLD_KEY = "tombstone_threshold";
     protected long minSSTableSize;
     protected volatile int estimatedRemainingTasks;
-    protected float tombstoneThreshold;
 
     public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
@@ -49,8 +44,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
         cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold());
         cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold());
-        optionValue = options.get(TOMBSTONE_THRESHOLD_KEY);
-        tombstoneThreshold = (null != optionValue) ? Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD;
     }
 
     public AbstractCompactionTask getNextBackgroundTask(final int gcBefore)
@@ -90,34 +83,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
             {
                 for (SSTableReader table : bucket)
                 {
-                    double droppableRatio = table.getEstimatedDroppableTombstoneRatio(gcBefore);
-                    if (droppableRatio <= tombstoneThreshold)
-                        continue;
-
-                    Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(table));
-                    if (overlaps.isEmpty())
-                    {
-                        // there is no overlap, tombstones are safely droppable
+                    if (worthDroppingTombstones(table, gcBefore))
                         prunedBuckets.add(Collections.singletonList(table));
-                    }
-                    else
-                    {
-                        // what percentage of columns do we expect to compact outside of overlap?
-                        // first, calculate estimated keys that do not overlap
-                        long keys = table.estimatedKeys();
-                        Set<Range<Token>> ranges = new HashSet<Range<Token>>();
-                        for (SSTableReader overlap : overlaps)
-                            ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner));
-                        long remainingKeys = keys - table.estimatedKeysForRanges(ranges);
-                        // next, calculate what percentage of columns we have within those keys
-                        double remainingKeysRatio = ((double) remainingKeys) / keys;
-                        long columns = table.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys;
-                        double remainingColumnsRatio = ((double) columns) / (table.getEstimatedColumnCount().count() * table.getEstimatedColumnCount().mean());
-
-                        // if we still expect to have droppable tombstones in rest of columns, then try compacting it
-                        if (remainingColumnsRatio * droppableRatio > tombstoneThreshold)
-                            prunedBuckets.add(Collections.singletonList(table));
-                    }
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index b1ae773..2057822 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -107,7 +107,7 @@ public class CompactionsTest extends SchemaLoader
      * Test to see if sstable has enough expired columns, it is compacted itself.
      */
     @Test
-    public void testSingleSSTableCompaction() throws Exception
+    public void testSingleSSTableCompactionWithSizeTieredCompaction() throws Exception
     {
         Table table = Table.open(TABLE1);
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
@@ -154,6 +154,57 @@ public class CompactionsTest extends SchemaLoader
     }
 
     @Test
+    public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
+    {
+        Table table = Table.open(TABLE1);
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        store.clearUnsafe();
+        store.metadata.gcGraceSeconds(1);
+        store.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) store.getCompactionStrategy();
+
+        // disable compaction while flushing
+        store.disableAutoCompaction();
+
+        long timestamp = System.currentTimeMillis();
+        for (int i = 0; i < 10; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i));
+            RowMutation rm = new RowMutation(TABLE1, key.key);
+            for (int j = 0; j < 10; j++)
+                rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(Integer.toString(j))),
+                              ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                              timestamp,
+                              j > 0 ? 3 : 0); // let first column never expire, since deleting all columns does not produce sstable
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        assertEquals(1, store.getSSTables().size());
+        long originalSize = store.getSSTables().iterator().next().uncompressedLength();
+
+        // wait enough to force single compaction
+        TimeUnit.SECONDS.sleep(5);
+
+        store.setMinimumCompactionThreshold(2);
+        store.setMaximumCompactionThreshold(4);
+        FBUtilities.waitOnFuture(CompactionManager.instance.submitBackground(store));
+        while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+            TimeUnit.SECONDS.sleep(1);
+
+        // and sstable with ttl should be compacted
+        assertEquals(1, store.getSSTables().size());
+        long size = store.getSSTables().iterator().next().uncompressedLength();
+        assertTrue("should be less than " + originalSize + ", but was " + size, size < originalSize);
+
+        // make sure max timestamp of compacted sstables is recorded properly after compaction.
+        assertMaxTimestamp(store, timestamp);
+
+        // tombstone removal compaction should not promote level
+        assert strategy.getLevelSize(0) == 1;
+    }
+
+    @Test
     public void testSuperColumnCompactions() throws IOException, ExecutionException, InterruptedException
     {
         Table table = Table.open(TABLE1);