You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this rendering.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/10/30 17:51:08 UTC
git commit: Fix potential infinite loop in tombstone compaction;
patch by yukim reviewed by Sylvain Lebresne for CASSANDRA-4781
Updated Branches:
refs/heads/trunk 02b59eb1f -> b74a00b28
Fix potential infinite loop in tombstone compaction; patch by yukim reviewed by Sylvain Lebresne for CASSANDRA-4781
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b74a00b2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b74a00b2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b74a00b2
Branch: refs/heads/trunk
Commit: b74a00b289f7c6c7287b68db870409bc598ed287
Parents: 02b59eb
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Oct 30 11:51:00 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Oct 30 11:51:00 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../db/compaction/AbstractCompactionStrategy.java | 35 +++++++-
.../apache/cassandra/io/sstable/SSTableReader.java | 9 ++
test/unit/org/apache/cassandra/SchemaLoader.java | 6 +-
.../cassandra/db/compaction/CompactionsTest.java | 66 ++++-----------
5 files changed, 62 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 69b4daa..9c09742 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -41,6 +41,7 @@
* Add booleans as literals in CQL3 (CASSANDRA-4776)
* Allow renaming PK columns in CQL3 (CASSANDRA-4822)
* Fix binary protocol NEW_NODE event (CASSANDRA-4679)
+ * Fix potential infinite loop in tombstone compaction (CASSANDRA-4781)
Merged from 1.1:
* add get[Row|Key]CacheEntries to CacheServiceMBean (CASSANDRA-4859)
* fix get_paged_slice to wrap to next row correctly (CASSANDRA-4816)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index ec328e5..94743f9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -19,9 +19,13 @@ package org.apache.cassandra.db.compaction;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.SSTableReader;
/**
@@ -34,13 +38,19 @@ import org.apache.cassandra.io.sstable.SSTableReader;
*/
public abstract class AbstractCompactionStrategy
{
+ private static final Logger logger = LoggerFactory.getLogger(AbstractCompactionStrategy.class);
+
protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
+ // minimum time, in seconds, that must elapse after sstable creation before a tombstone removal compaction may run; default 86400 (1 day).
+ protected static final long DEFAULT_TOMBSTONE_COMPACTION_INTERVAL = 86400;
protected static final String TOMBSTONE_THRESHOLD_OPTION = "tombstone_threshold";
+ protected static final String TOMBSTONE_COMPACTION_INTERVAL_OPTION = "tombstone_compaction_interval";
public final Map<String, String> options;
protected final ColumnFamilyStore cfs;
protected final float tombstoneThreshold;
+ protected long tombstoneCompactionInterval;
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
@@ -50,6 +60,14 @@ public abstract class AbstractCompactionStrategy
String optionValue = options.get(TOMBSTONE_THRESHOLD_OPTION);
tombstoneThreshold = optionValue == null ? DEFAULT_TOMBSTONE_THRESHOLD : Float.parseFloat(optionValue);
+ optionValue = options.get(TOMBSTONE_COMPACTION_INTERVAL_OPTION);
+ tombstoneCompactionInterval = optionValue == null ? DEFAULT_TOMBSTONE_COMPACTION_INTERVAL : Long.parseLong(optionValue);
+ if (tombstoneCompactionInterval < 0)
+ {
+ logger.warn("tombstone_compaction_interval should not be negative({}). Using default value of {}.",
+ tombstoneCompactionInterval, DEFAULT_TOMBSTONE_COMPACTION_INTERVAL);
+ tombstoneCompactionInterval = DEFAULT_TOMBSTONE_COMPACTION_INTERVAL;
+ }
}
/**
@@ -129,12 +147,21 @@ public abstract class AbstractCompactionStrategy
}
/**
+ * Check whether the given sstable is worth compacting to drop tombstones that are droppable at gcBefore.
+ * Returns false without further checks if tombstone_compaction_interval seconds have not yet elapsed since the sstable was created.
+ *
* @param sstable SSTable to check
* @param gcBefore time to drop tombstones
* @return true if given sstable's tombstones are expected to be removed
*/
protected boolean worthDroppingTombstones(SSTableReader sstable, int gcBefore)
{
+ // Since the droppable-tombstone ratio is only an estimate, compaction may not actually drop any tombstones.
+ // If that happens we would end up in an infinite compaction loop, so first check whether enough time has
+ // elapsed since the SSTable was created.
+ if (System.currentTimeMillis() < sstable.getCreationTimeFor(Component.DATA) + tombstoneCompactionInterval * 1000)
+ return false;
+
double droppableRatio = sstable.getEstimatedDroppableTombstoneRatio(gcBefore);
if (droppableRatio <= tombstoneThreshold)
return false;
@@ -148,6 +175,11 @@ public abstract class AbstractCompactionStrategy
else
{
// what percentage of columns do we expect to compact outside of overlap?
+ if (sstable.getKeySamples().size() < 2)
+ {
+ // we have too few samples to estimate correct percentage
+ return false;
+ }
// first, calculate estimated keys that do not overlap
long keys = sstable.estimatedKeys();
Set<Range<Token>> ranges = new HashSet<Range<Token>>();
@@ -155,8 +187,7 @@ public abstract class AbstractCompactionStrategy
ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner));
long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges);
// next, calculate what percentage of columns we have within those keys
- double remainingKeysRatio = ((double) remainingKeys) / keys;
- long columns = sstable.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys;
+ long columns = sstable.getEstimatedColumnCount().mean() * remainingKeys;
double remainingColumnsRatio = ((double) columns) / (sstable.getEstimatedColumnCount().count() * sstable.getEstimatedColumnCount().mean());
// return if we still expect to have droppable tombstones in rest of columns
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index b9b93a8..7957134 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1116,6 +1116,15 @@ public class SSTableReader extends SSTable
}
/**
+ * @param component the component whose file timestamp is returned.
+ * @return the last-modified time of the component's file, in milliseconds since the epoch; 0 if the file does not exist or an I/O error occurs.
+ */
+ public long getCreationTimeFor(Component component)
+ {
+ return new File(descriptor.filenameFor(component)).lastModified();
+ }
+
+ /**
* @param sstables
* @return true if all desired references were acquired. Otherwise, it will unreference any partial acquisition, and return false.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 2197b2b..48fbc04 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -149,7 +149,9 @@ public class SchemaLoader
null,
null));
- // Make it easy to test leveled compaction
+ // Make it easy to test compaction
+ Map<String, String> compactionOptions = new HashMap<String, String>();
+ compactionOptions.put("tombstone_compaction_interval", "1");
Map<String, String> leveledOptions = new HashMap<String, String>();
leveledOptions.put("sstable_size_in_mb", "1");
@@ -159,7 +161,7 @@ public class SchemaLoader
opts_rf1,
// Column Families
- standardCFMD(ks1, "Standard1", withOldCfIds),
+ standardCFMD(ks1, "Standard1", withOldCfIds).compactionStrategyOptions(compactionOptions),
standardCFMD(ks1, "Standard2", withOldCfIds),
standardCFMD(ks1, "Standard3", withOldCfIds),
standardCFMD(ks1, "Standard4", withOldCfIds),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b74a00b2/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 05abc49..c92c7bb 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -61,17 +61,13 @@ public class CompactionsTest extends SchemaLoader
testBlacklisting(LeveledCompactionStrategy.class.getCanonicalName());
}
- /**
- * Test to see if sstable has enough expired columns, it is compacted itself.
- */
- @Test
- public void testSingleSSTableCompactionWithSizeTieredCompaction() throws Exception
+ public ColumnFamilyStore testSingleSSTableCompaction(String strategyClassName) throws Exception
{
Table table = Table.open(TABLE1);
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
store.clearUnsafe();
store.metadata.gcGraceSeconds(1);
- store.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+ store.setCompactionStrategyClass(strategyClassName);
// disable compaction while flushing
store.disableAutoCompaction();
@@ -96,8 +92,7 @@ public class CompactionsTest extends SchemaLoader
TimeUnit.SECONDS.sleep(5);
// enable compaction, submit background and wait for it to complete
- store.setMinimumCompactionThreshold(2);
- store.setMaximumCompactionThreshold(4);
+ store.enableAutoCompaction();
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
TimeUnit.SECONDS.sleep(1);
@@ -109,55 +104,24 @@ public class CompactionsTest extends SchemaLoader
// make sure max timestamp of compacted sstables is recorded properly after compaction.
assertMaxTimestamp(store, timestamp);
+
+ return store;
}
+ /**
+ * Test that an sstable containing enough expired columns is compacted by itself.
+ */
@Test
- public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
+ public void testSingleSSTableCompactionWithSizeTieredCompaction() throws Exception
{
- Table table = Table.open(TABLE1);
- ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
- store.clearUnsafe();
- store.metadata.gcGraceSeconds(1);
- store.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+ testSingleSSTableCompaction(SizeTieredCompactionStrategy.class.getCanonicalName());
+ }
+ @Test
+ public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
+ {
+ ColumnFamilyStore store = testSingleSSTableCompaction(LeveledCompactionStrategy.class.getCanonicalName());
LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) store.getCompactionStrategy();
-
- // disable compaction while flushing
- store.disableAutoCompaction();
-
- long timestamp = System.currentTimeMillis();
- for (int i = 0; i < 10; i++)
- {
- DecoratedKey key = Util.dk(Integer.toString(i));
- RowMutation rm = new RowMutation(TABLE1, key.key);
- for (int j = 0; j < 10; j++)
- rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(Integer.toString(j))),
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- timestamp,
- j > 0 ? 3 : 0); // let first column never expire, since deleting all columns does not produce sstable
- rm.apply();
- }
- store.forceBlockingFlush();
- assertEquals(1, store.getSSTables().size());
- long originalSize = store.getSSTables().iterator().next().uncompressedLength();
-
- // wait enough to force single compaction
- TimeUnit.SECONDS.sleep(5);
-
- store.setMinimumCompactionThreshold(2);
- store.setMaximumCompactionThreshold(4);
- FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(store));
- while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
- TimeUnit.SECONDS.sleep(1);
-
- // and sstable with ttl should be compacted
- assertEquals(1, store.getSSTables().size());
- long size = store.getSSTables().iterator().next().uncompressedLength();
- assertTrue("should be less than " + originalSize + ", but was " + size, size < originalSize);
-
- // make sure max timestamp of compacted sstables is recorded properly after compaction.
- assertMaxTimestamp(store, timestamp);
-
// tombstone removal compaction should not promote level
assert strategy.getLevelSize(0) == 1;
}