You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/10/30 17:51:08 UTC

git commit: Fix potential infinite loop in tombstone compaction; patch by yukim reviewed by Sylvain Lebresne for CASSANDRA-4781

Updated Branches:
  refs/heads/trunk 02b59eb1f -> b74a00b28


Fix potential infinite loop in tombstone compaction; patch by yukim reviewed by Sylvain Lebresne for CASSANDRA-4781


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b74a00b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b74a00b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b74a00b2

Branch: refs/heads/trunk
Commit: b74a00b289f7c6c7287b68db870409bc598ed287
Parents: 02b59eb
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 30 11:51:00 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Oct 30 11:51:00 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../db/compaction/AbstractCompactionStrategy.java  |   35 +++++++-
 .../apache/cassandra/io/sstable/SSTableReader.java |    9 ++
 test/unit/org/apache/cassandra/SchemaLoader.java   |    6 +-
 .../cassandra/db/compaction/CompactionsTest.java   |   66 ++++-----------
 5 files changed, 62 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69b4daa..9c09742 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,7 @@
  * Add booleans as literals in CQL3 (CASSANDRA-4776)
  * Allow renaming PK columns in CQL3 (CASSANDRA-4822)
  * Fix binary protocol NEW_NODE event (CASSANDRA-4679)
+ * Fix potential infinite loop in tombstone compaction (CASSANDRA-4781)
 Merged from 1.1:
  * add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
  * fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index ec328e5..94743f9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,9 +19,13 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTableReader;
 
 /**
@@ -34,13 +38,19 @@ import org.apache.cassandra.io.sstable.SSTableReader;
  */
 public abstract class AbstractCompactionStrategy
 {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractCompactionStrategy.class);
+
     protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
+    // minimum interval needed to perform tombstone removal compaction in seconds, default 86400 or 1 day.
+    protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400;
     protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
+    protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval";
 
     public final Map<String, String> options;
 
     protected final ColumnFamilyStore cfs;
     protected final float tombstoneThreshold;
+    protected long tombstoneCompactionInterval;
 
     protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
     {
@@ -50,6 +60,14 @@ public abstract class AbstractCompactionStrategy
 
         String optionValue = options.get(TOMBSTONE_THRESHOLD_OPTION);
         tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue);
+        optionValue = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
+        tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
+        if (tombstoneCompactionInterval < 0)
+        {
+            logger.warn("tombstone_compaction_interval should not be negative({}). Using default value of {}.",
+                        tombstoneCompactionInterval, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL);
+            tombstoneCompactionInterval = DEFAULT_TOMBSTONE_COMPACTION_INTERVAL;
+        }
     }
 
     /**
@@ -129,12 +147,21 @@ public abstract class AbstractCompactionStrategy
     }
 
     /**
+     * Check if given sstable is worth dropping tombstones at gcBefore.
+     * The check is skipped, and false is returned, if tombstone_compaction_interval has not yet elapsed since sstable creation.
+     *
      * @param sstable SSTable to check
      * @param gcBefore time to drop tombstones
      * @return true if given sstable's tombstones are expected to be removed
      */
     protected boolean worthDroppingTombstones(SSTableReader sstable, int gcBefore)
     {
+        // since we use estimations to calculate, there is a chance that compaction will not actually drop tombstones.
+        // if that happens we will end up in an infinite compaction loop, so first we check whether enough time has
+        // elapsed since the SSTable was created.
+        if (System.currentTimeMillis() < sstable.getCreationTimeFor(Component.DATA) + tombstoneCompactionInterval * 1000)
+           return false;
+
         double droppableRatio = sstable.getEstimatedDroppableTombstoneRatio(gcBefore);
         if (droppableRatio <= tombstoneThreshold)
             return false;
@@ -148,6 +175,11 @@ public abstract class AbstractCompactionStrategy
         else
         {
             // what percentage of columns do we expect to compact outside of overlap?
+            if (sstable.getKeySamples().size() < 2)
+            {
+                // we have too few samples to estimate correct percentage
+                return false;
+            }
             // first, calculate estimated keys that do not overlap
             long keys = sstable.estimatedKeys();
             Set<Range<Token>> ranges = new HashSet<Range<Token>>();
@@ -155,8 +187,7 @@ public abstract class AbstractCompactionStrategy
                 ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner));
             long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges);
             // next, calculate what percentage of columns we have within those keys
-            double remainingKeysRatio = ((double) remainingKeys) / keys;
-            long columns = sstable.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys;
+            long columns = sstable.getEstimatedColumnCount().mean() * remainingKeys;
             double remainingColumnsRatio = ((double) columns) / (sstable.getEstimatedColumnCount().count() * sstable.getEstimatedColumnCount().mean());
 
             // return if we still expect to have droppable tombstones in rest of columns

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index b9b93a8..7957134 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1116,6 +1116,15 @@ public class SSTableReader extends SSTable
     }
 
     /**
+     * @param component component to get timestamp.
+     * @return last modified time for given component. 0 if given component does not exist or IO error occurs.
+     */
+    public long getCreationTimeFor(Component component)
+    {
+        return new File(descriptor.filenameFor(component)).lastModified();
+    }
+
+    /**
      * @param sstables
      * @return true if all desired references were acquired.  Otherwise, it will unreference any partial acquisition, and return false.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 2197b2b..48fbc04 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -149,7 +149,9 @@ public class SchemaLoader
             null,
             null));
 
-        // Make it easy to test leveled compaction
+        // Make it easy to test compaction
+        Map<String, String> compactionOptions = new HashMap<String, String>();
+        compactionOptions.put("tombstone_compaction_interval", "1");
         Map<String, String> leveledOptions = new HashMap<String, String>();
         leveledOptions.put("sstable_size_in_mb", "1");
 
@@ -159,7 +161,7 @@ public class SchemaLoader
                                            opts_rf1,
 
                                            // Column Families
-                                           standardCFMD(ks1, "Standard1", withOldCfIds),
+                                           standardCFMD(ks1, "Standard1", withOldCfIds).compactionStrategyOptions(compactionOptions),
                                            standardCFMD(ks1, "Standard2", withOldCfIds),
                                            standardCFMD(ks1, "Standard3", withOldCfIds),
                                            standardCFMD(ks1, "Standard4", withOldCfIds),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 05abc49..c92c7bb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -61,17 +61,13 @@ public class CompactionsTest extends SchemaLoader
         testBlacklisting(LeveledCompactionStrategy.class.getCanonicalName());
     }
 
-    /**
-     * Test to see if sstable has enough expired columns, it is compacted itself.
-     */
-    @Test
-    public void testSingleSSTableCompactionWithSizeTieredCompaction() throws Exception
+    public ColumnFamilyStore testSingleSSTableCompaction(String strategyClassName) throws Exception
     {
         Table table = Table.open(TABLE1);
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
         store.clearUnsafe();
         store.metadata.gcGraceSeconds(1);
-        store.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+        store.setCompactionStrategyClass(strategyClassName);
 
         // disable compaction while flushing
         store.disableAutoCompaction();
@@ -96,8 +92,7 @@ public class CompactionsTest extends SchemaLoader
         TimeUnit.SECONDS.sleep(5);
 
         // enable compaction, submit background and wait for it to complete
-        store.setMinimumCompactionThreshold(2);
-        store.setMaximumCompactionThreshold(4);
+        store.enableAutoCompaction();
         FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
         while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
             TimeUnit.SECONDS.sleep(1);
@@ -109,55 +104,24 @@ public class CompactionsTest extends SchemaLoader
 
         // make sure max timestamp of compacted sstables is recorded properly after compaction.
         assertMaxTimestamp(store, timestamp);
+
+        return store;
     }
 
+    /**
+     * Test that an sstable with enough expired columns is compacted by itself.
+     */
     @Test
-    public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
+    public void testSingleSSTableCompactionWithSizeTieredCompaction() throws Exception
     {
-        Table table = Table.open(TABLE1);
-        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
-        store.clearUnsafe();
-        store.metadata.gcGraceSeconds(1);
-        store.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+        testSingleSSTableCompaction(SizeTieredCompactionStrategy.class.getCanonicalName());
+    }
 
+    @Test
+    public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
+    {
+        ColumnFamilyStore store = testSingleSSTableCompaction(LeveledCompactionStrategy.class.getCanonicalName());
         LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) store.getCompactionStrategy();
-
-        // disable compaction while flushing
-        store.disableAutoCompaction();
-
-        long timestamp = System.currentTimeMillis();
-        for (int i = 0; i < 10; i++)
-        {
-            DecoratedKey key = Util.dk(Integer.toString(i));
-            RowMutation rm = new RowMutation(TABLE1, key.key);
-            for (int j = 0; j < 10; j++)
-                rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(Integer.toString(j))),
-                              ByteBufferUtil.EMPTY_BYTE_BUFFER,
-                              timestamp,
-                              j > 0 ? 3 : 0); // let first column never expire, since deleting all columns does not produce sstable
-            rm.apply();
-        }
-        store.forceBlockingFlush();
-        assertEquals(1, store.getSSTables().size());
-        long originalSize = store.getSSTables().iterator().next().uncompressedLength();
-
-        // wait enough to force single compaction
-        TimeUnit.SECONDS.sleep(5);
-
-        store.setMinimumCompactionThreshold(2);
-        store.setMaximumCompactionThreshold(4);
-        FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
-        while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
-            TimeUnit.SECONDS.sleep(1);
-
-        // and sstable with ttl should be compacted
-        assertEquals(1, store.getSSTables().size());
-        long size = store.getSSTables().iterator().next().uncompressedLength();
-        assertTrue("should be less than " + originalSize + ", but was " + size, size < originalSize);
-
-        // make sure max timestamp of compacted sstables is recorded properly after compaction.
-        assertMaxTimestamp(store, timestamp);
-
         // tombstone removal compaction should not promote level
         assert strategy.getLevelSize(0) == 1;
     }