You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2012/07/13 20:18:00 UTC
git commit: Track tombstone for LCS;
patch by yukim reviewed by jbellis for CASSANDRA-4234
Updated Branches:
refs/heads/trunk 13f8eee99 -> 0091af932
Track tombstone for LCS; patch by yukim reviewed by jbellis for CASSANDRA-4234
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0091af93
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0091af93
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0091af93
Branch: refs/heads/trunk
Commit: 0091af932c6ef65a0a5917f123fe24398b79c079
Parents: 13f8eee
Author: Yuki Morishita <yu...@apache.org>
Authored: Fri Jul 13 13:17:51 2012 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Fri Jul 13 13:17:51 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../db/compaction/AbstractCompactionStrategy.java | 44 ++++++++++++
.../db/compaction/LeveledCompactionStrategy.java | 45 ++++++++++++-
.../cassandra/db/compaction/LeveledManifest.java | 6 ++
.../cassandra/db/compaction/OperationType.java | 2 +
.../compaction/SizeTieredCompactionStrategy.java | 35 +---------
.../cassandra/db/compaction/CompactionsTest.java | 53 ++++++++++++++-
7 files changed, 148 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 03e3fba..fb66fd6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,7 +12,7 @@
* add inter-node message compression (CASSANDRA-3127)
* remove COPP (CASSANDRA-2479)
* Track tombstone expiration and compact when tombstone content is
- higher than a configurable threshold, default 20% (CASSANDRA-3442)
+ higher than a configurable threshold, default 20% (CASSANDRA-3442, 4234)
* update MurmurHash to version 3 (CASSANDRA-2975)
* (CLI) track elapsed time for `delete' operation (CASSANDRA-4060)
* (CLI) jline version is bumped to 1.0 to properly support
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 41128b0..bf6c87f 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -38,15 +38,23 @@ import org.apache.cassandra.service.StorageService;
*/
public abstract class AbstractCompactionStrategy
{
+ protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
+ protected static final String TOMBSTONE_THRESHOLD_KEY = "tombstone_threshold";
+
protected final ColumnFamilyStore cfs;
protected final Map<String, String> options;
+ protected float tombstoneThreshold;
+
protected AbstractCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
assert cfs != null;
this.cfs = cfs;
this.options = options;
+ String optionValue = options.get(TOMBSTONE_THRESHOLD_KEY);
+ tombstoneThreshold = (null != optionValue) ? Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD;
+
// start compactions in five minutes (if no flushes have occurred by then to do so)
Runnable runnable = new Runnable()
{
@@ -146,4 +154,40 @@ public abstract class AbstractCompactionStrategy
{
return getScanners(toCompact, null);
}
+
+ /**
+ * @param sstable SSTable to check
+ * @param gcBefore time to drop tombstones
+ * @return true if given sstable's tombstones are expected to be removed
+ */
+ protected boolean worthDroppingTombstones(SSTableReader sstable, int gcBefore)
+ {
+ double droppableRatio = sstable.getEstimatedDroppableTombstoneRatio(gcBefore);
+ if (droppableRatio <= tombstoneThreshold)
+ return false;
+
+ Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(sstable));
+ if (overlaps.isEmpty())
+ {
+ // there is no overlap, tombstones are safely droppable
+ return true;
+ }
+ else
+ {
+ // what percentage of columns do we expect to compact outside of overlap?
+ // first, calculate estimated keys that do not overlap
+ long keys = sstable.estimatedKeys();
+ Set<Range<Token>> ranges = new HashSet<Range<Token>>();
+ for (SSTableReader overlap : overlaps)
+ ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner));
+ long remainingKeys = keys - sstable.estimatedKeysForRanges(ranges);
+ // next, calculate what percentage of columns we have within those keys
+ double remainingKeysRatio = ((double) remainingKeys) / keys;
+ long columns = sstable.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys;
+ double remainingColumnsRatio = ((double) columns) / (sstable.getEstimatedColumnCount().count() * sstable.getEstimatedColumnCount().mean());
+
+ // return if we still expect to have droppable tombstones in rest of columns
+ return remainingColumnsRatio * droppableRatio > tombstoneThreshold;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3e124de..9ac4fce 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -23,11 +23,11 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Joiner;
import com.google.common.collect.*;
+import com.google.common.primitives.Doubles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -110,13 +110,22 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
}
Collection<SSTableReader> sstables = manifest.getCompactionCandidates();
+ OperationType op = OperationType.COMPACTION;
if (sstables.isEmpty())
{
- logger.debug("No compaction necessary for {}", this);
- return null;
+ // if there is no sstable to compact in standard way, try compacting based on droppable tombstone ratio
+ SSTableReader sstable = findDroppableSSTable(gcBefore);
+ if (sstable == null)
+ {
+ logger.debug("No compaction necessary for {}", this);
+ return null;
+ }
+ sstables = Collections.singleton(sstable);
+ op = OperationType.TOMBSTONE_COMPACTION;
}
LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore, this.maxSSTableSizeInMB);
+ newTask.setCompactionType(op);
return task.compareAndSet(currentTask, newTask)
? newTask
: null;
@@ -148,6 +157,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
case CLEANUP:
case SCRUB:
case UPGRADE_SSTABLES:
+ case TOMBSTONE_COMPACTION: // Also when performing tombstone removal.
manifest.replace(listChangedNotification.removed, listChangedNotification.added);
break;
default:
@@ -280,4 +290,33 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
{
return String.format("LCS@%d(%s)", hashCode(), cfs.columnFamily);
}
+
+ private SSTableReader findDroppableSSTable(final int gcBefore)
+ {
+ level:
+ for (int i = manifest.getLevelCount(); i >= 0; i--)
+ {
+ // sort sstables by droppable ratio in descending order
+ SortedSet<SSTableReader> sstables = manifest.getLevelSorted(i, new Comparator<SSTableReader>()
+ {
+ public int compare(SSTableReader o1, SSTableReader o2)
+ {
+ double r1 = o1.getEstimatedDroppableTombstoneRatio(gcBefore);
+ double r2 = o2.getEstimatedDroppableTombstoneRatio(gcBefore);
+ return -1 * Doubles.compare(r1, r2);
+ }
+ });
+ if (sstables.isEmpty())
+ continue;
+
+ for (SSTableReader sstable : sstables)
+ {
+ if (sstable.getEstimatedDroppableTombstoneRatio(gcBefore) <= tombstoneThreshold)
+ continue level;
+ else if (!sstable.isMarkedSuspect() && worthDroppingTombstones(sstable, gcBefore))
+ return sstable;
+ }
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 567c919..53efb80 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -22,6 +22,7 @@ import java.io.IOError;
import java.io.IOException;
import java.util.*;
+import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
@@ -568,6 +569,11 @@ public class LeveledManifest
return 0;
}
+ public synchronized SortedSet<SSTableReader> getLevelSorted(int level, Comparator<SSTableReader> comparator)
+ {
+ return ImmutableSortedSet.copyOf(comparator, generations[level]);
+ }
+
public List<SSTableReader> getLevel(int i)
{
return generations[i];
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 79f6c5e..e46e92d 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -27,6 +27,8 @@ public enum OperationType
SCRUB("Scrub"),
UPGRADE_SSTABLES("Upgrade sstables"),
INDEX_BUILD("Secondary index build"),
+ /** Compaction for tombstone removal */
+ TOMBSTONE_COMPACTION("Tombstone Compaction"),
UNKNOWN("Unkown compaction type");
private final String type;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 67d2e77..d1ac516 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -24,8 +24,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.Pair;
@@ -34,12 +32,9 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
{
private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);
protected static final long DEFAULT_MIN_SSTABLE_SIZE = 50L * 1024L * 1024L;
- protected static final float DEFAULT_TOMBSTONE_THRESHOLD = 0.2f;
protected static final String MIN_SSTABLE_SIZE_KEY = "min_sstable_size";
- protected static final String TOMBSTONE_THRESHOLD_KEY = "tombstone_threshold";
protected long minSSTableSize;
protected volatile int estimatedRemainingTasks;
- protected float tombstoneThreshold;
public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
@@ -49,8 +44,6 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
cfs.setMaximumCompactionThreshold(cfs.metadata.getMaxCompactionThreshold());
cfs.setMinimumCompactionThreshold(cfs.metadata.getMinCompactionThreshold());
- optionValue = options.get(TOMBSTONE_THRESHOLD_KEY);
- tombstoneThreshold = (null != optionValue) ? Float.parseFloat(optionValue) : DEFAULT_TOMBSTONE_THRESHOLD;
}
public AbstractCompactionTask getNextBackgroundTask(final int gcBefore)
@@ -90,34 +83,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
{
for (SSTableReader table : bucket)
{
- double droppableRatio = table.getEstimatedDroppableTombstoneRatio(gcBefore);
- if (droppableRatio <= tombstoneThreshold)
- continue;
-
- Set<SSTableReader> overlaps = cfs.getOverlappingSSTables(Collections.singleton(table));
- if (overlaps.isEmpty())
- {
- // there is no overlap, tombstones are safely droppable
+ if (worthDroppingTombstones(table, gcBefore))
prunedBuckets.add(Collections.singletonList(table));
- }
- else
- {
- // what percentage of columns do we expect to compact outside of overlap?
- // first, calculate estimated keys that do not overlap
- long keys = table.estimatedKeys();
- Set<Range<Token>> ranges = new HashSet<Range<Token>>();
- for (SSTableReader overlap : overlaps)
- ranges.add(new Range<Token>(overlap.first.token, overlap.last.token, overlap.partitioner));
- long remainingKeys = keys - table.estimatedKeysForRanges(ranges);
- // next, calculate what percentage of columns we have within those keys
- double remainingKeysRatio = ((double) remainingKeys) / keys;
- long columns = table.getEstimatedColumnCount().percentile(remainingKeysRatio) * remainingKeys;
- double remainingColumnsRatio = ((double) columns) / (table.getEstimatedColumnCount().count() * table.getEstimatedColumnCount().mean());
-
- // if we still expect to have droppable tombstones in rest of columns, then try compacting it
- if (remainingColumnsRatio * droppableRatio > tombstoneThreshold)
- prunedBuckets.add(Collections.singletonList(table));
- }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0091af93/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index b1ae773..2057822 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -107,7 +107,7 @@ public class CompactionsTest extends SchemaLoader
* Test to see if sstable has enough expired columns, it is compacted itself.
*/
@Test
- public void testSingleSSTableCompaction() throws Exception
+ public void testSingleSSTableCompactionWithSizeTieredCompaction() throws Exception
{
Table table = Table.open(TABLE1);
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
@@ -154,6 +154,57 @@ public class CompactionsTest extends SchemaLoader
}
@Test
+ public void testSingleSSTableCompactionWithLeveledCompaction() throws Exception
+ {
+ Table table = Table.open(TABLE1);
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+ store.clearUnsafe();
+ store.metadata.gcGraceSeconds(1);
+ store.setCompactionStrategyClass(LeveledCompactionStrategy.class.getCanonicalName());
+
+ LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) store.getCompactionStrategy();
+
+ // disable compaction while flushing
+ store.disableAutoCompaction();
+
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < 10; i++)
+ {
+ DecoratedKey key = Util.dk(Integer.toString(i));
+ RowMutation rm = new RowMutation(TABLE1, key.key);
+ for (int j = 0; j < 10; j++)
+ rm.add(new QueryPath("Standard1", null, ByteBufferUtil.bytes(Integer.toString(j))),
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ timestamp,
+ j > 0 ? 3 : 0); // let first column never expire, since deleting all columns does not produce sstable
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+ assertEquals(1, store.getSSTables().size());
+ long originalSize = store.getSSTables().iterator().next().uncompressedLength();
+
+ // wait enough to force single compaction
+ TimeUnit.SECONDS.sleep(5);
+
+ store.setMinimumCompactionThreshold(2);
+ store.setMaximumCompactionThreshold(4);
+ FBUtilities.waitOnFuture(CompactionManager.instance.submitBackground(store));
+ while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions() > 0)
+ TimeUnit.SECONDS.sleep(1);
+
+ // and sstable with ttl should be compacted
+ assertEquals(1, store.getSSTables().size());
+ long size = store.getSSTables().iterator().next().uncompressedLength();
+ assertTrue("should be less than " + originalSize + ", but was " + size, size < originalSize);
+
+ // make sure max timestamp of compacted sstables is recorded properly after compaction.
+ assertMaxTimestamp(store, timestamp);
+
+ // tombstone removal compaction should not promote level
+ assert strategy.getLevelSize(0) == 1;
+ }
+
+ @Test
public void testSuperColumnCompactions() throws IOException, ExecutionException, InterruptedException
{
Table table = Table.open(TABLE1);