You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/03/08 09:48:09 UTC
[2/4] git commit: Don't promote sstables for cleanup,
scrub and upgradesstables
Don't promote sstables for cleanup, scrub and upgradesstables
patch by slebresne; reviewed by jbellis for CASSANDRA-3989
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/65059cf4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/65059cf4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/65059cf4
Branch: refs/heads/trunk
Commit: 65059cf48e0794e0459b1882961616f55382c756
Parents: 09f091a
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Mar 7 18:35:15 2012 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Thu Mar 8 09:46:09 2012 +0100
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 11 +++--
src/java/org/apache/cassandra/db/DataTracker.java | 15 +++---
.../db/compaction/AbstractCompactionTask.java | 8 +++
.../cassandra/db/compaction/CompactionManager.java | 8 ++-
.../cassandra/db/compaction/CompactionTask.java | 8 ++--
.../db/compaction/LeveledCompactionStrategy.java | 13 +++++-
.../cassandra/db/compaction/LeveledManifest.java | 35 +++++++++++---
.../cassandra/db/compaction/OperationType.java | 1 +
.../SSTableListChangedNotification.java | 7 +++-
.../cassandra/db/compaction/CompactionsTest.java | 3 +-
11 files changed, 81 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 16f8f0c..bf8d9fc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,8 @@ Merged from 1.0:
* Pig: Composite column support (CASSANDRA-384)
* Avoid NPE during repair when a keyspace has no CFs (CASSANDRA-3988)
* Fix division-by-zero error on get_slice (CASSANDRA-4000)
+ * don't change manifest level for cleanup, scrub, and upgradesstables
+ operations under LeveledCompactionStrategy (CASSANDRA-3989)
1.1-beta1
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 2872d4f..48cdb8c 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
@@ -937,15 +938,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this);
}
- public void markCompacted(Collection<SSTableReader> sstables)
+ public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType)
{
assert !sstables.isEmpty();
- data.markCompacted(sstables);
+ data.markCompacted(sstables, compactionType);
}
- public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements)
+ public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
{
- data.replaceCompactedSSTables(sstables, replacements);
+ data.replaceCompactedSSTables(sstables, replacements, compactionType);
}
void replaceFlushed(Memtable memtable, SSTableReader sstable)
@@ -1966,6 +1967,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
}
if (!truncatedSSTables.isEmpty())
- markCompacted(truncatedSSTables);
+ markCompacted(truncatedSSTables, OperationType.UNKNOWN);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index d346eab..13c3369 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
@@ -237,16 +238,16 @@ public class DataTracker
while (!view.compareAndSet(currentView, newView));
}
- public void markCompacted(Collection<SSTableReader> sstables)
+ public void markCompacted(Collection<SSTableReader> sstables, OperationType compactionType)
{
replace(sstables, Collections.<SSTableReader>emptyList());
- notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList());
+ notifySSTablesChanged(sstables, Collections.<SSTableReader>emptyList(), compactionType);
}
- public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements)
+ public void replaceCompactedSSTables(Collection<SSTableReader> sstables, Iterable<SSTableReader> replacements, OperationType compactionType)
{
replace(sstables, replacements);
- notifySSTablesChanged(sstables, replacements);
+ notifySSTablesChanged(sstables, replacements, compactionType);
}
public void addInitialSSTables(Collection<SSTableReader> sstables)
@@ -286,7 +287,7 @@ public class DataTracker
// notifySSTablesChanged -> LeveledManifest.promote doesn't like a no-op "promotion"
return;
}
- notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet());
+ notifySSTablesChanged(notCompacting, Collections.<SSTableReader>emptySet(), OperationType.UNKNOWN);
postReplace(notCompacting, Collections.<SSTableReader>emptySet());
}
@@ -518,11 +519,11 @@ public class DataTracker
return (double) falseCount / (trueCount + falseCount);
}
- public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
+ public void notifySSTablesChanged(Iterable<SSTableReader> removed, Iterable<SSTableReader> added, OperationType compactionType)
{
for (INotificationConsumer subscriber : subscribers)
{
- INotification notification = new SSTableListChangedNotification(added, removed);
+ INotification notification = new SSTableListChangedNotification(added, removed, compactionType);
subscriber.handleNotification(notification, this);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
index e031e07..9826941 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java
@@ -30,12 +30,14 @@ public abstract class AbstractCompactionTask
protected final ColumnFamilyStore cfs;
protected Collection<SSTableReader> sstables;
protected boolean isUserDefined;
+ protected OperationType compactionType;
public AbstractCompactionTask(ColumnFamilyStore cfs, Collection<SSTableReader> sstables)
{
this.cfs = cfs;
this.sstables = sstables;
this.isUserDefined = false;
+ this.compactionType = OperationType.COMPACTION;
}
public abstract int execute(CompactionExecutorStatsCollector collector) throws IOException;
@@ -93,4 +95,10 @@ public abstract class AbstractCompactionTask
this.isUserDefined = isUserDefined;
return this;
}
+
+ public AbstractCompactionTask setCompactionType(OperationType compactionType)
+ {
+ this.compactionType = compactionType;
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 7a36625..590c3d7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -211,8 +211,10 @@ public class CompactionManager implements CompactionManagerMBean
for (final SSTableReader sstable : sstables)
{
// SSTables are marked by the caller
+ // NOTE: it is important that the task create one and only one sstable, even for Leveled compaction (see LeveledManifest.replace())
CompactionTask task = new CompactionTask(cfs, Collections.singletonList(sstable), Integer.MAX_VALUE);
task.isUserDefined(true);
+ task.setCompactionType(OperationType.UPGRADE_SSTABLES);
task.execute(executor);
}
}
@@ -624,7 +626,7 @@ public class CompactionManager implements CompactionManagerMBean
if (newSstable == null)
{
- cfs.markCompacted(Arrays.asList(sstable));
+ cfs.markCompacted(Arrays.asList(sstable), OperationType.SCRUB);
if (badRows > 0)
logger.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
else
@@ -632,7 +634,7 @@ public class CompactionManager implements CompactionManagerMBean
}
else
{
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable), OperationType.SCRUB);
logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
if (badRows > 0)
logger.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
@@ -777,7 +779,7 @@ public class CompactionManager implements CompactionManagerMBean
// flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
cfs.indexManager.flushIndexesBlocking();
- cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
+ cfs.replaceCompactedSSTables(Arrays.asList(sstable), results, OperationType.CLEANUP);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4d77af0..6961490 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -121,8 +121,8 @@ public class CompactionTask extends AbstractCompactionTask
logger.debug("Expected bloom filter size : " + keysPerSSTable);
AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
- ? new ParallelCompactionIterable(OperationType.COMPACTION, toCompact, controller)
- : new CompactionIterable(OperationType.COMPACTION, toCompact, controller);
+ ? new ParallelCompactionIterable(compactionType, toCompact, controller)
+ : new CompactionIterable(compactionType, toCompact, controller);
CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
Iterator<AbstractCompactedRow> nni = Iterators.filter(iter, Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -143,7 +143,7 @@ public class CompactionTask extends AbstractCompactionTask
// don't mark compacted in the finally block, since if there _is_ nondeleted data,
// we need to sync it (via closeAndOpen) first, so there is no period during which
// a crash could cause data loss.
- cfs.markCompacted(toCompact);
+ cfs.markCompacted(toCompact, compactionType);
return 0;
}
@@ -199,7 +199,7 @@ public class CompactionTask extends AbstractCompactionTask
collector.finishCompaction(ci);
}
- cfs.replaceCompactedSSTables(toCompact, sstables);
+ cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
// TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
for (Entry<SSTableReader, Map<DecoratedKey, Long>> ssTableReaderMapEntry : cachedKeyMap.entrySet())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3bbbeb6..0d5bdc7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -132,7 +132,18 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
else if (notification instanceof SSTableListChangedNotification)
{
SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification;
- manifest.promote(listChangedNotification.removed, listChangedNotification.added);
+ switch (listChangedNotification.compactionType)
+ {
+ // Cleanup, scrub and upgradesstables shouldn't promote (see CASSANDRA-3989)
+ case CLEANUP:
+ case SCRUB:
+ case UPGRADE_SSTABLES:
+ manifest.replace(listChangedNotification.removed, listChangedNotification.added);
+ break;
+ default:
+ manifest.promote(listChangedNotification.removed, listChangedNotification.added);
+ break;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 3c02cda..bbb41a5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -60,7 +60,6 @@ public class LeveledManifest
private final Map<SSTableReader, Integer> sstableGenerations;
private final RowPosition[] lastCompactedKeys;
private final int maxSSTableSizeInMB;
- private int levelCount;
private LeveledManifest(ColumnFamilyStore cfs, int maxSSTableSizeInMB)
{
@@ -165,11 +164,10 @@ public class LeveledManifest
int maximumLevel = 0;
for (SSTableReader sstable : removed)
{
- int thisLevel = levelOf(sstable);
+ int thisLevel = remove(sstable);
assert thisLevel >= 0;
maximumLevel = Math.max(maximumLevel, thisLevel);
minimumLevel = Math.min(minimumLevel, thisLevel);
- remove(sstable);
}
// it's valid to do a remove w/o an add (e.g. on truncate)
@@ -189,6 +187,22 @@ public class LeveledManifest
serialize();
}
+ public synchronized void replace(Iterable<SSTableReader> removed, Iterable<SSTableReader> added)
+ {
+ // replace is for compaction operations that don't really change the
+ // content of an sstable (cleanup, scrub, upgradesstables) and must replace exactly one sstable by another
+ assert Iterables.size(removed) == 1;
+ assert Iterables.size(added) == 1;
+ SSTableReader toRemove = removed.iterator().next();
+ SSTableReader toAdd = added.iterator().next();
+ logDistribution();
+ if (logger.isDebugEnabled())
+ logger.debug("Replacing " + removed + " by " + toAdd);
+
+ add(toAdd, remove(toRemove));
+ serialize();
+ }
+
private String toString(Iterable<SSTableReader> sstables)
{
StringBuilder builder = new StringBuilder();
@@ -291,12 +305,15 @@ public class LeveledManifest
private void logDistribution()
{
- for (int i = 0; i < generations.length; i++)
+ if (logger.isDebugEnabled())
{
- if (!generations[i].isEmpty())
+ for (int i = 0; i < generations.length; i++)
{
- logger.debug("L{} contains {} SSTables ({} bytes) in {}",
- new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this});
+ if (!generations[i].isEmpty())
+ {
+ logger.debug("L{} contains {} SSTables ({} bytes) in {}",
+ new Object[] {i, generations[i].size(), SSTableReader.getTotalBytes(generations[i]), this});
+ }
}
}
}
@@ -310,16 +327,18 @@ public class LeveledManifest
return level.intValue();
}
- private void remove(SSTableReader reader)
+ private int remove(SSTableReader reader)
{
int level = levelOf(reader);
assert level >= 0 : reader + " not present in manifest";
generations[level].remove(reader);
sstableGenerations.remove(reader);
+ return level;
}
private void add(SSTableReader sstable, int level)
{
+ assert level < generations.length : "Invalid level " + level + " out of " + (generations.length - 1);
generations[level].add(sstable);
sstableGenerations.put(sstable, Integer.valueOf(level));
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index 75e3d06..79f6c5e 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -25,6 +25,7 @@ public enum OperationType
ROW_CACHE_SAVE("Row cache save"),
CLEANUP("Cleanup"),
SCRUB("Scrub"),
+ UPGRADE_SSTABLES("Upgrade sstables"),
INDEX_BUILD("Secondary index build"),
UNKNOWN("Unkown compaction type");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
index 770fd48..ca7ead9 100644
--- a/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
+++ b/src/java/org/apache/cassandra/notifications/SSTableListChangedNotification.java
@@ -19,13 +19,18 @@ package org.apache.cassandra.notifications;
import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.db.compaction.OperationType;
+
public class SSTableListChangedNotification implements INotification
{
public final Iterable<SSTableReader> removed;
public final Iterable<SSTableReader> added;
- public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed)
+ public final OperationType compactionType;
+
+ public SSTableListChangedNotification(Iterable<SSTableReader> added, Iterable<SSTableReader> removed, OperationType compactionType)
{
this.removed = removed;
this.added = added;
+ this.compactionType = compactionType;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/65059cf4/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a9c66bf..bd49c23 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.sstable.*;
@@ -273,7 +274,7 @@ public class CompactionsTest extends CleanupHelper
assertEquals(2, store.getSSTables().size());
// Now, we remove the sstable that was just created to force the use of EchoedRow (so that it doesn't hide the problem)
- store.markCompacted(Collections.singleton(tmpSSTable));
+ store.markCompacted(Collections.singleton(tmpSSTable), OperationType.UNKNOWN);
assertEquals(1, store.getSSTables().size());
// Now assert we do have the 4 keys