You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/10/31 17:06:48 UTC
svn commit: r1195542 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/db/index/
src/java/org/apache/cassandra/db/index/keys/
src/java/org/apache/cassandra/db/migratio...
Author: jbellis
Date: Mon Oct 31 16:06:48 2011
New Revision: 1195542
URL: http://svn.apache.org/viewvc?rev=1195542&view=rev
Log:
replace compactionlock use in schema migration by checking CFS.isInvalidD
patch by jbellis; reviewed by slebresne for CASSANDRA-3116
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Oct 31 16:06:48 2011
@@ -2,6 +2,7 @@
* off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
* EACH_QUORUM is only supported for writes (CASSANDRA-3272)
* cleanup usage of StorageService.setMode() (CASANDRA-3388)
+ * replace compactionlock use in schema migration by checking CFS.isInvalidD
1.0.1
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Oct 31 16:06:48 2011
@@ -25,8 +25,6 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -99,7 +97,7 @@ public class ColumnFamilyStore implement
public final CFMetaData metadata;
public final IPartitioner partitioner;
private final String mbeanName;
- private boolean invalid = false;
+ private volatile boolean valid = true;
/* Memtables and SSTables on disk for this column family */
private final DataTracker data;
@@ -129,9 +127,6 @@ public class ColumnFamilyStore implement
private volatile DefaultInteger keyCacheSaveInSeconds;
private volatile DefaultInteger rowCacheKeysToSave;
- /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
- public final Lock flushLock = new ReentrantLock();
-
public static enum CacheType
{
KEY_CACHE_TYPE("KeyCache"),
@@ -192,6 +187,7 @@ public class ColumnFamilyStore implement
if (metadata.compactionStrategyClass.equals(compactionStrategy.getClass()) && metadata.compactionStrategyOptions.equals(compactionStrategy.getOptions()))
return;
+ // TODO is there a way to avoid locking here?
CompactionManager.instance.getCompactionLock().lock();
try
{
@@ -258,12 +254,12 @@ public class ColumnFamilyStore implement
}
}
- // called when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations.
- public void unregisterMBean()
+ /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
+ public void invalidate()
{
try
{
- invalid = true;
+ valid = false;
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName nameObj = new ObjectName(mbeanName);
if (mbs.isRegistered(nameObj))
@@ -719,13 +715,6 @@ public class ColumnFamilyStore implement
}
}
- public boolean isDropped()
- {
- return isIndex()
- ? Schema.instance.getCFMetaData(table.name, getParentColumnfamily()) == null
- : Schema.instance.getCFMetaData(metadata.cfId) == null;
- }
-
public Future<?> forceFlush()
{
// during index build, 2ary index memtables can be dirty even if parent is not. if so,
@@ -983,14 +972,14 @@ public class ColumnFamilyStore implement
CompactionManager.instance.submitBackground(this);
}
- public boolean isInvalid()
+ public boolean isValid()
{
- return invalid;
+ return valid;
}
- public void removeAllSSTables() throws IOException
+ public void unreferenceSSTables() throws IOException
{
- data.removeAllSSTables();
+ data.unreferenceSSTables();
indexManager.removeAllIndexes();
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Mon Oct 31 16:06:48 2011
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.notifications.INotification;
import org.apache.cassandra.notifications.INotificationConsumer;
@@ -130,6 +129,18 @@ public class DataTracker
public void replaceFlushed(Memtable memtable, SSTableReader sstable)
{
+ if (!cfstore.isValid())
+ {
+ View currentView, newView;
+ do
+ {
+ currentView = view.get();
+ newView = currentView.replaceFlushed(memtable, sstable).replace(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
+ }
+ while (!view.compareAndSet(currentView, newView));
+ return;
+ }
+
View currentView, newView;
do
{
@@ -213,6 +224,16 @@ public class DataTracker
*/
public void unmarkCompacting(Collection<SSTableReader> unmark)
{
+ if (!cfstore.isValid())
+ {
+ // We don't know if the original compaction suceeded or failed, which makes it difficult to know
+ // if the sstable reference has already been released.
+ // A "good enough" approach is to mark the sstables involved compacted, which if compaction succeeded
+ // is harmlessly redundant, and if it failed ensures that at least the sstable will get deleted on restart.
+ for (SSTableReader sstable : unmark)
+ sstable.markCompacted();
+ }
+
View currentView, newView;
do
{
@@ -244,9 +265,23 @@ public class DataTracker
notifyAdded(sstable);
}
- public void removeAllSSTables()
+ /**
+ * removes all sstables that are not busy compacting.
+ */
+ public void unreferenceSSTables()
{
- replace(getSSTables(), Collections.<SSTableReader>emptyList());
+ Set<SSTableReader> notCompacting;
+
+ View currentView, newView;
+ do
+ {
+ currentView = view.get();
+ notCompacting = Sets.difference(ImmutableSet.copyOf(currentView.sstables), currentView.compacting);
+ newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet());
+ }
+ while (!view.compareAndSet(currentView, newView));
+
+ postReplace(notCompacting, Collections.<SSTableReader>emptySet());
}
/** (Re)initializes the tracker, purging all references. */
@@ -261,6 +296,17 @@ public class DataTracker
private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
{
+ // removing sstables that are not marked compacting is a bug, since that means we could
+ // race with a compaction check
+ for (SSTableReader sstable : oldSSTables)
+ assert view.get().compacting.contains(sstable);
+
+ if (!cfstore.isValid())
+ {
+ removeOldSSTablesSize(replacements);
+ replacements = Collections.emptyList();
+ }
+
View currentView, newView;
do
{
@@ -269,6 +315,11 @@ public class DataTracker
}
while (!view.compareAndSet(currentView, newView));
+ postReplace(oldSSTables, replacements);
+ }
+
+ private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+ {
addNewSSTablesSize(replacements);
removeOldSSTablesSize(oldSSTables);
@@ -298,7 +349,9 @@ public class DataTracker
if (logger.isDebugEnabled())
logger.debug(String.format("removing %s from list of files tracked for %s.%s",
sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
- sstable.markCompacted();
+ boolean firstToCompact = sstable.markCompacted();
+ assert firstToCompact : sstable + " was already marked compacted";
+
sstable.releaseReference();
liveSize.addAndGet(-sstable.bytesOnDisk());
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Oct 31 16:06:48 2011
@@ -281,19 +281,8 @@ public class Memtable
{
public void runMayThrow() throws IOException
{
- cfs.flushLock.lock();
- try
- {
- if (!cfs.isDropped())
- {
- SSTableReader sstable = writeSortedContents(context);
- cfs.replaceFlushed(Memtable.this, sstable);
- }
- }
- finally
- {
- cfs.flushLock.unlock();
- }
+ SSTableReader sstable = writeSortedContents(context);
+ cfs.replaceFlushed(Memtable.this, sstable);
latch.countDown();
}
});
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Oct 31 16:06:48 2011
@@ -29,7 +29,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,7 +40,6 @@ import org.apache.cassandra.config.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.io.sstable.SSTableDeletingTask;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.FileUtils;
@@ -343,7 +341,7 @@ public class Table
return;
unloadCf(cfs);
- cfs.removeAllSSTables();
+ cfs.unreferenceSSTables();
}
// disassociate a cfs from this table instance.
@@ -361,7 +359,7 @@ public class Table
{
throw new IOException(e);
}
- cfs.unregisterMBean();
+ cfs.invalidate();
}
/** adds a cf to internal structures, ends up creating disk files). */
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Mon Oct 31 16:06:48 2011
@@ -68,13 +68,12 @@ public class CompactionManager implement
/**
* compactionLock has two purposes:
- * - Compaction acquires its readLock so that multiple compactions can happen simultaneously,
- * but the KS/CF migtations acquire its writeLock, so they can be sure no new SSTables will
- * be created for a dropped CF posthumously. (Thus, compaction checks CFS.isValid while the
- * lock is acquired.)
* - "Special" compactions will acquire writelock instead of readlock to make sure that all
- * other compaction activity is quiesced and they can grab ALL the sstables to do something.
- * TODO this is too big a hammer -- we should only care about quiescing all for the given CFS.
+ * other compaction activity is quiesced and they can grab ALL the sstables to do something.
+ * - Some schema migrations cannot run concurrently with compaction. (Currently, this is
+ * only when changing compaction strategy -- see CFS.maybeReloadCompactionStrategy.)
+ *
+ * TODO this is too big a hammer -- we should only care about quiescing all for the given CFS.
*/
private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock();
@@ -117,9 +116,6 @@ public class CompactionManager implement
compactionLock.readLock().lock();
try
{
- if (cfs.isInvalid())
- return 0;
-
AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
for (AbstractCompactionTask task : tasks)
@@ -160,8 +156,6 @@ public class CompactionManager implement
compactionLock.writeLock().lock();
try
{
- if (cfStore.isInvalid())
- return this;
Collection<SSTableReader> tocleanup = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
if (tocleanup == null || tocleanup.isEmpty())
return this;
@@ -206,9 +200,6 @@ public class CompactionManager implement
compactionLock.writeLock().lock();
try
{
- if (cfStore.isInvalid())
- return this;
-
Collection<SSTableReader> toscrub = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
if (toscrub == null || toscrub.isEmpty())
return this;
@@ -258,8 +249,6 @@ public class CompactionManager implement
compactionLock.writeLock().lock();
try
{
- if (cfStore.isInvalid())
- return this;
AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
{
@@ -339,9 +328,6 @@ public class CompactionManager implement
compactionLock.readLock().lock();
try
{
- if (cfs.isInvalid())
- return this;
-
// look up the sstables now that we're on the compaction executor, so we don't try to re-compact
// something that was already being compacted earlier.
Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
@@ -433,8 +419,7 @@ public class CompactionManager implement
compactionLock.readLock().lock();
try
{
- if (!cfStore.isInvalid())
- doValidationCompaction(cfStore, validator);
+ doValidationCompaction(cfStore, validator);
return this;
}
finally
@@ -801,6 +786,14 @@ public class CompactionManager implement
*/
private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException
{
+ // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
+ // mid-validation, or to attempt to validate a droped CFS. this is just a best effort to avoid useless work,
+ // particularly in the scenario where a validation is submitted before the drop, and there are compactions
+ // started prior to the drop keeping some sstables alive. Since validationCompaction can run
+ // concurrently with other compactions, it would otherwise go ahead and scan those again.
+ if (!cfs.isValid())
+ return;
+
// flush first so everyone is validating data that is as similar as possible
try
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java Mon Oct 31 16:06:48 2011
@@ -134,7 +134,7 @@ public abstract class SecondaryIndex
/**
* Unregisters this index's mbean if one exists
*/
- public abstract void unregisterMbean();
+ public abstract void invalidate();
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java Mon Oct 31 16:06:48 2011
@@ -244,7 +244,7 @@ public class SecondaryIndexManager
public void unregisterMBeans()
{
for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
- entry.getValue().unregisterMbean();
+ entry.getValue().invalidate();
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java Mon Oct 31 16:06:48 2011
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
public class KeysIndex extends PerColumnSecondaryIndex
{
private static final Logger logger = LoggerFactory.getLogger(KeysIndex.class);
- private ColumnFamilyStore indexCfs;
+ private ColumnFamilyStore indexedCfs;
public KeysIndex()
{
@@ -57,7 +57,7 @@ public class KeysIndex extends PerColumn
ColumnDefinition columnDef = columnDefs.iterator().next();
CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef,indexComparator());
- indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
+ indexedCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
indexedCfMetadata.cfName,
new LocalPartitioner(columnDef.getValidator()),
indexedCfMetadata);
@@ -78,9 +78,9 @@ public class KeysIndex extends PerColumn
return;
int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
- ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+ ColumnFamily cfi = ColumnFamily.create(indexedCfs.metadata);
cfi.addTombstone(rowKey, localDeletionTime, column.timestamp());
- indexCfs.apply(valueKey, cfi);
+ indexedCfs.apply(valueKey, cfi);
if (logger.isDebugEnabled())
logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
}
@@ -88,7 +88,7 @@ public class KeysIndex extends PerColumn
@Override
public void insertColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn column)
{
- ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+ ColumnFamily cfi = ColumnFamily.create(indexedCfs.metadata);
if (column instanceof ExpiringColumn)
{
ExpiringColumn ec = (ExpiringColumn)column;
@@ -101,7 +101,7 @@ public class KeysIndex extends PerColumn
if (logger.isDebugEnabled())
logger.debug("applying index row {}:{}", valueKey, cfi);
- indexCfs.apply(valueKey, cfi);
+ indexedCfs.apply(valueKey, cfi);
}
@Override
@@ -113,8 +113,8 @@ public class KeysIndex extends PerColumn
@Override
public void removeIndex(ByteBuffer columnName) throws IOException
{
- indexCfs.removeAllSSTables();
- indexCfs.unregisterMBean();
+ indexedCfs.unreferenceSSTables();
+ indexedCfs.invalidate();
}
@Override
@@ -122,7 +122,7 @@ public class KeysIndex extends PerColumn
{
try
{
- indexCfs.forceBlockingFlush();
+ indexedCfs.forceBlockingFlush();
}
catch (ExecutionException e)
{
@@ -135,15 +135,15 @@ public class KeysIndex extends PerColumn
}
@Override
- public void unregisterMbean()
+ public void invalidate()
{
- indexCfs.unregisterMBean();
+ indexedCfs.invalidate();
}
@Override
public ColumnFamilyStore getUnderlyingCfs()
{
- return indexCfs;
+ return indexedCfs;
}
@Override
@@ -155,13 +155,13 @@ public class KeysIndex extends PerColumn
@Override
public String getIndexName()
{
- return indexCfs.columnFamily;
+ return indexedCfs.columnFamily;
}
@Override
public void renameIndex(String newCfName) throws IOException
{
- indexCfs.renameSSTables(indexCfs.columnFamily.replace(baseCfs.columnFamily, newCfName));
+ indexedCfs.renameSSTables(indexedCfs.columnFamily.replace(baseCfs.columnFamily, newCfName));
}
@Override
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Mon Oct 31 16:06:48 2011
@@ -79,18 +79,7 @@ public class DropColumnFamily extends Mi
if (!StorageService.instance.isClientMode())
{
cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily));
-
- CompactionManager.instance.getCompactionLock().lock();
- cfs.flushLock.lock();
- try
- {
- Table.open(ksm.name, schema).dropCf(cfm.cfId);
- }
- finally
- {
- cfs.flushLock.unlock();
- CompactionManager.instance.getCompactionLock().unlock();
- }
+ Table.open(ksm.name, schema).dropCf(cfm.cfId);
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Mon Oct 31 16:06:48 2011
@@ -48,41 +48,25 @@ public class DropKeyspace extends Migrat
public void applyModels() throws IOException
{
String snapshotName = Table.getTimestampedSnapshotName(name);
- CompactionManager.instance.getCompactionLock().lock();
- try
- {
- KSMetaData ksm = schema.getTableDefinition(name);
+ KSMetaData ksm = schema.getTableDefinition(name);
- // remove all cfs from the table instance.
- for (CFMetaData cfm : ksm.cfMetaData().values())
+ // remove all cfs from the table instance.
+ for (CFMetaData cfm : ksm.cfMetaData().values())
+ {
+ ColumnFamilyStore cfs = Table.open(ksm.name, schema).getColumnFamilyStore(cfm.cfName);
+ schema.purge(cfm);
+ if (!StorageService.instance.isClientMode())
{
- ColumnFamilyStore cfs = Table.open(ksm.name, schema).getColumnFamilyStore(cfm.cfName);
- schema.purge(cfm);
- if (!StorageService.instance.isClientMode())
- {
- cfs.snapshot(snapshotName);
- cfs.flushLock.lock();
- try
- {
- Table.open(ksm.name, schema).dropCf(cfm.cfId);
- }
- finally
- {
- cfs.flushLock.unlock();
- }
- }
+ cfs.snapshot(snapshotName);
+ Table.open(ksm.name, schema).dropCf(cfm.cfId);
}
-
- // remove the table from the static instances.
- Table table = Table.clear(ksm.name, schema);
- assert table != null;
- // reset defs.
- schema.clearTableDefinition(ksm, newVersion);
- }
- finally
- {
- CompactionManager.instance.getCompactionLock().unlock();
}
+
+ // remove the table from the static instances.
+ Table table = Table.clear(ksm.name, schema);
+ assert table != null;
+ // reset defs.
+ schema.clearTableDefinition(ksm, newVersion);
}
public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Mon Oct 31 16:06:48 2011
@@ -719,23 +719,28 @@ public class SSTableReader extends SSTab
* Mark the sstable as compacted.
* When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
* except for threads holding a reference.
+ *
+ * @return true if the this is the first time the file was marked compacted. With rare exceptions
+ * (see DataTracker.unmarkCompacted) calling this multiple times would be buggy.
*/
- public void markCompacted()
+ public boolean markCompacted()
{
if (logger.isDebugEnabled())
logger.debug("Marking " + getFilename() + " compacted");
+
+ if (isCompacted.getAndSet(true))
+ return false;
+
try
{
if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile())
- throw new IOException("Unable to create compaction marker");
+ throw new IOException("Compaction marker already exists");
}
catch (IOException e)
{
throw new IOError(e);
}
-
- boolean alreadyCompacted = isCompacted.getAndSet(true);
- assert !alreadyCompacted : this + " was already marked compacted";
+ return true;
}
/**
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Mon Oct 31 16:06:48 2011
@@ -91,7 +91,7 @@ public class KeyCacheTest extends Cleanu
assert store.getKeyCacheSize() == 0;
// load the cache from disk
- store.unregisterMBean(); // unregistering old MBean to test how key cache will be loaded
+ store.invalidate(); // unregistering old MBean to test how key cache will be loaded
ColumnFamilyStore newStore = ColumnFamilyStore.createColumnFamilyStore(Table.open(TABLE1), COLUMN_FAMILY3);
assert newStore.getKeyCacheSize() == 100;
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Mon Oct 31 16:06:48 2011
@@ -39,7 +39,6 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
public class CompactionsTest extends CleanupHelper
{
@@ -231,7 +230,7 @@ public class CompactionsTest extends Cle
ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
// disable compaction while flushing
- store.removeAllSSTables();
+ store.unreferenceSSTables();
store.disableAutoCompaction();
// Add test row
Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Mon Oct 31 16:06:48 2011
@@ -28,7 +28,6 @@ import java.net.InetAddress;
import java.util.*;
import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.Util;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.context.CounterContext;
@@ -76,7 +75,7 @@ public class StreamingTransferTest exten
SSTableReader sstable = cfs.getSSTables().iterator().next();
// We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it
sstable.acquireReference();
- cfs.removeAllSSTables();
+ cfs.unreferenceSSTables();
// transfer the first and last key
int[] offs = new int[]{1, 3};