You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/10/31 17:06:48 UTC

svn commit: r1195542 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/index/ src/java/org/apache/cassandra/db/index/keys/ src/java/org/apache/cassandra/db/migratio...

Author: jbellis
Date: Mon Oct 31 16:06:48 2011
New Revision: 1195542

URL: http://svn.apache.org/viewvc?rev=1195542&view=rev
Log:
replace compactionlock use in schema migration by checking CFS.isValid
patch by jbellis; reviewed by slebresne for CASSANDRA-3116

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
    cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
    cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Oct 31 16:06:48 2011
@@ -2,6 +2,7 @@
  * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
  * EACH_QUORUM is only supported for writes (CASSANDRA-3272)
  * cleanup usage of StorageService.setMode() (CASANDRA-3388)
+ * replace compactionlock use in schema migration by checking CFS.isValid (CASSANDRA-3116)
 
 
 1.0.1

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Oct 31 16:06:48 2011
@@ -25,8 +25,6 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -99,7 +97,7 @@ public class ColumnFamilyStore implement
     public final CFMetaData metadata;
     public final IPartitioner partitioner;
     private final String mbeanName;
-    private boolean invalid = false;
+    private volatile boolean valid = true;
 
     /* Memtables and SSTables on disk for this column family */
     private final DataTracker data;
@@ -129,9 +127,6 @@ public class ColumnFamilyStore implement
     private volatile DefaultInteger keyCacheSaveInSeconds;
     private volatile DefaultInteger rowCacheKeysToSave;
 
-    /** Lock to allow migrations to block all flushing, so we can be sure not to write orphaned data files */
-    public final Lock flushLock = new ReentrantLock();
-
     public static enum CacheType
     {
         KEY_CACHE_TYPE("KeyCache"),
@@ -192,6 +187,7 @@ public class ColumnFamilyStore implement
         if (metadata.compactionStrategyClass.equals(compactionStrategy.getClass()) && metadata.compactionStrategyOptions.equals(compactionStrategy.getOptions()))
             return;
 
+        // TODO is there a way to avoid locking here?
         CompactionManager.instance.getCompactionLock().lock();
         try
         {
@@ -258,12 +254,12 @@ public class ColumnFamilyStore implement
         }
     }
 
-    // called when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations.
-    public void unregisterMBean()
+    /** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
+    public void invalidate()
     {
         try
         {
-            invalid = true;   
+            valid = false;
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             ObjectName nameObj = new ObjectName(mbeanName);
             if (mbs.isRegistered(nameObj))
@@ -719,13 +715,6 @@ public class ColumnFamilyStore implement
         }
     }
 
-    public boolean isDropped()
-    {
-        return isIndex()
-               ? Schema.instance.getCFMetaData(table.name, getParentColumnfamily()) == null
-               : Schema.instance.getCFMetaData(metadata.cfId) == null;
-    }
-
     public Future<?> forceFlush()
     {
         // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
@@ -983,14 +972,14 @@ public class ColumnFamilyStore implement
         CompactionManager.instance.submitBackground(this);
     }
 
-    public boolean isInvalid()
+    public boolean isValid()
     {
-        return invalid;
+        return valid;
     }
 
-    public void removeAllSSTables() throws IOException
+    public void unreferenceSSTables() throws IOException
     {
-        data.removeAllSSTables();
+        data.unreferenceSSTables();
         indexManager.removeAllIndexes();
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Mon Oct 31 16:06:48 2011
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
@@ -130,6 +129,18 @@ public class DataTracker
 
     public void replaceFlushed(Memtable memtable, SSTableReader sstable)
     {
+        if (!cfstore.isValid())
+        {
+            View currentView, newView;
+            do
+            {
+                currentView = view.get();
+                newView = currentView.replaceFlushed(memtable, sstable).replace(Arrays.asList(sstable), Collections.<SSTableReader>emptyList());
+            }
+            while (!view.compareAndSet(currentView, newView));
+            return;
+        }
+
         View currentView, newView;
         do
         {
@@ -213,6 +224,16 @@ public class DataTracker
      */
     public void unmarkCompacting(Collection<SSTableReader> unmark)
     {
+        if (!cfstore.isValid())
+        {
+            // We don't know if the original compaction succeeded or failed, which makes it difficult to know
+            // if the sstable reference has already been released.
+            // A "good enough" approach is to mark the sstables involved compacted, which if compaction succeeded
+            // is harmlessly redundant, and if it failed ensures that at least the sstable will get deleted on restart.
+            for (SSTableReader sstable : unmark)
+                sstable.markCompacted();
+        }
+
         View currentView, newView;
         do
         {
@@ -244,9 +265,23 @@ public class DataTracker
         notifyAdded(sstable);
     }
 
-    public void removeAllSSTables()
+    /**
+     * removes all sstables that are not busy compacting.
+     */
+    public void unreferenceSSTables()
     {
-        replace(getSSTables(), Collections.<SSTableReader>emptyList());
+        Set<SSTableReader> notCompacting;
+
+        View currentView, newView;
+        do
+        {
+            currentView = view.get();
+            notCompacting = Sets.difference(ImmutableSet.copyOf(currentView.sstables), currentView.compacting);
+            newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet());
+        }
+        while (!view.compareAndSet(currentView, newView));
+
+        postReplace(notCompacting, Collections.<SSTableReader>emptySet());
     }
 
     /** (Re)initializes the tracker, purging all references. */
@@ -261,6 +296,17 @@ public class DataTracker
 
     private void replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
     {
+        // removing sstables that are not marked compacting is a bug, since that means we could
+        // race with a compaction check
+        for (SSTableReader sstable : oldSSTables)
+            assert view.get().compacting.contains(sstable);
+
+        if (!cfstore.isValid())
+        {
+            removeOldSSTablesSize(replacements);
+            replacements = Collections.emptyList();
+        }
+
         View currentView, newView;
         do
         {
@@ -269,6 +315,11 @@ public class DataTracker
         }
         while (!view.compareAndSet(currentView, newView));
 
+        postReplace(oldSSTables, replacements);
+    }
+
+    private void postReplace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+    {
         addNewSSTablesSize(replacements);
         removeOldSSTablesSize(oldSSTables);
 
@@ -298,7 +349,9 @@ public class DataTracker
             if (logger.isDebugEnabled())
                 logger.debug(String.format("removing %s from list of files tracked for %s.%s",
                             sstable.descriptor, cfstore.table.name, cfstore.getColumnFamilyName()));
-            sstable.markCompacted();
+            boolean firstToCompact = sstable.markCompacted();
+            assert firstToCompact : sstable + " was already marked compacted";
+
             sstable.releaseReference();
             liveSize.addAndGet(-sstable.bytesOnDisk());
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Oct 31 16:06:48 2011
@@ -281,19 +281,8 @@ public class Memtable
         {
             public void runMayThrow() throws IOException
             {
-                cfs.flushLock.lock();
-                try
-                {
-                    if (!cfs.isDropped())
-                    {
-                        SSTableReader sstable = writeSortedContents(context);
-                        cfs.replaceFlushed(Memtable.this, sstable);
-                    }
-                }
-                finally
-                {
-                    cfs.flushLock.unlock();
-                }
+                SSTableReader sstable = writeSortedContents(context);
+                cfs.replaceFlushed(Memtable.this, sstable);
                 latch.countDown();
             }
         });

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Oct 31 16:06:48 2011
@@ -29,7 +29,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,7 +40,6 @@ import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
@@ -343,7 +341,7 @@ public class Table
             return;
         
         unloadCf(cfs);
-        cfs.removeAllSSTables();
+        cfs.unreferenceSSTables();
     }
     
     // disassociate a cfs from this table instance.
@@ -361,7 +359,7 @@ public class Table
         {
             throw new IOException(e);
         }
-        cfs.unregisterMBean();
+        cfs.invalidate();
     }
     
     /** adds a cf to internal structures, ends up creating disk files). */

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Mon Oct 31 16:06:48 2011
@@ -68,13 +68,12 @@ public class CompactionManager implement
 
     /**
      * compactionLock has two purposes:
-     * - Compaction acquires its readLock so that multiple compactions can happen simultaneously,
-     *   but the KS/CF migtations acquire its writeLock, so they can be sure no new SSTables will
-     *   be created for a dropped CF posthumously.  (Thus, compaction checks CFS.isValid while the
-     *   lock is acquired.)
      * - "Special" compactions will acquire writelock instead of readlock to make sure that all
-     *   other compaction activity is quiesced and they can grab ALL the sstables to do something.
-     *   TODO this is too big a hammer -- we should only care about quiescing all for the given CFS.
+     * other compaction activity is quiesced and they can grab ALL the sstables to do something.
+     * - Some schema migrations cannot run concurrently with compaction.  (Currently, this is
+     *   only when changing compaction strategy -- see CFS.maybeReloadCompactionStrategy.)
+     *
+     * TODO this is too big a hammer -- we should only care about quiescing all for the given CFS.
      */
     private final ReentrantReadWriteLock compactionLock = new ReentrantReadWriteLock();
 
@@ -117,9 +116,6 @@ public class CompactionManager implement
                 compactionLock.readLock().lock();
                 try
                 {
-                    if (cfs.isInvalid())
-                        return 0;
-
                     AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
                     List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
                     for (AbstractCompactionTask task : tasks)
@@ -160,8 +156,6 @@ public class CompactionManager implement
                 compactionLock.writeLock().lock();
                 try 
                 {
-                    if (cfStore.isInvalid())
-                        return this;
                     Collection<SSTableReader> tocleanup = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
                     if (tocleanup == null || tocleanup.isEmpty())
                         return this;
@@ -206,9 +200,6 @@ public class CompactionManager implement
                 compactionLock.writeLock().lock();
                 try
                 {
-                    if (cfStore.isInvalid())
-                        return this;
-
                     Collection<SSTableReader> toscrub = cfStore.getDataTracker().markCompacting(cfStore.getSSTables(), 1, Integer.MAX_VALUE);
                     if (toscrub == null || toscrub.isEmpty())
                         return this;
@@ -258,8 +249,6 @@ public class CompactionManager implement
                 compactionLock.writeLock().lock();
                 try
                 {
-                    if (cfStore.isInvalid())
-                        return this;
                     AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
                     for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
                     {
@@ -339,9 +328,6 @@ public class CompactionManager implement
                 compactionLock.readLock().lock();
                 try
                 {
-                    if (cfs.isInvalid())
-                        return this;
-
                     // look up the sstables now that we're on the compaction executor, so we don't try to re-compact
                     // something that was already being compacted earlier.
                     Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
@@ -433,8 +419,7 @@ public class CompactionManager implement
                 compactionLock.readLock().lock();
                 try
                 {
-                    if (!cfStore.isInvalid())
-                        doValidationCompaction(cfStore, validator);
+                    doValidationCompaction(cfStore, validator);
                     return this;
                 }
                 finally
@@ -801,6 +786,14 @@ public class CompactionManager implement
      */
     private void doValidationCompaction(ColumnFamilyStore cfs, AntiEntropyService.Validator validator) throws IOException
     {
+        // this isn't meant to be race-proof, because it's not -- it won't cause bugs for a CFS to be dropped
+        // mid-validation, or to attempt to validate a dropped CFS.  this is just a best effort to avoid useless work,
+        // particularly in the scenario where a validation is submitted before the drop, and there are compactions
+        // started prior to the drop keeping some sstables alive.  Since validationCompaction can run
+        // concurrently with other compactions, it would otherwise go ahead and scan those again.
+        if (!cfs.isValid())
+            return;
+
         // flush first so everyone is validating data that is as similar as possible
         try
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndex.java Mon Oct 31 16:06:48 2011
@@ -134,7 +134,7 @@ public abstract class SecondaryIndex
     /**
      * Unregisters this index's mbean if one exists
      */
-    public abstract void unregisterMbean();
+    public abstract void invalidate();
     
     
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java Mon Oct 31 16:06:48 2011
@@ -244,7 +244,7 @@ public class SecondaryIndexManager
     public void unregisterMBeans()
     {
         for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
-            entry.getValue().unregisterMbean();
+            entry.getValue().invalidate();
     }
     
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java Mon Oct 31 16:06:48 2011
@@ -45,7 +45,7 @@ import org.slf4j.LoggerFactory;
 public class KeysIndex extends PerColumnSecondaryIndex
 {
     private static final Logger logger = LoggerFactory.getLogger(KeysIndex.class);
-    private ColumnFamilyStore indexCfs;
+    private ColumnFamilyStore indexedCfs;
 
     public KeysIndex() 
     {
@@ -57,7 +57,7 @@ public class KeysIndex extends PerColumn
 
         ColumnDefinition columnDef = columnDefs.iterator().next();
         CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef,indexComparator());
-        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
+        indexedCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.table,
                                                                indexedCfMetadata.cfName,
                                                                new LocalPartitioner(columnDef.getValidator()),
                                                                indexedCfMetadata);
@@ -78,9 +78,9 @@ public class KeysIndex extends PerColumn
             return;
         
         int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
-        ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+        ColumnFamily cfi = ColumnFamily.create(indexedCfs.metadata);
         cfi.addTombstone(rowKey, localDeletionTime, column.timestamp());
-        indexCfs.apply(valueKey, cfi);
+        indexedCfs.apply(valueKey, cfi);
         if (logger.isDebugEnabled())
             logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
     }
@@ -88,7 +88,7 @@ public class KeysIndex extends PerColumn
     @Override
     public void insertColumn(DecoratedKey<?> valueKey, ByteBuffer rowKey, IColumn column)
     {
-        ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata);
+        ColumnFamily cfi = ColumnFamily.create(indexedCfs.metadata);
         if (column instanceof ExpiringColumn)
         {
             ExpiringColumn ec = (ExpiringColumn)column;
@@ -101,7 +101,7 @@ public class KeysIndex extends PerColumn
         if (logger.isDebugEnabled())
             logger.debug("applying index row {}:{}", valueKey, cfi);
         
-        indexCfs.apply(valueKey, cfi);
+        indexedCfs.apply(valueKey, cfi);
     }
     
     @Override
@@ -113,8 +113,8 @@ public class KeysIndex extends PerColumn
     @Override
     public void removeIndex(ByteBuffer columnName) throws IOException
     {        
-        indexCfs.removeAllSSTables();
-        indexCfs.unregisterMBean();
+        indexedCfs.unreferenceSSTables();
+        indexedCfs.invalidate();
     }
 
     @Override
@@ -122,7 +122,7 @@ public class KeysIndex extends PerColumn
     {       
         try
         {
-            indexCfs.forceBlockingFlush();
+            indexedCfs.forceBlockingFlush();
         } 
         catch (ExecutionException e)
         {
@@ -135,15 +135,15 @@ public class KeysIndex extends PerColumn
     }
 
     @Override
-    public void unregisterMbean()
+    public void invalidate()
     {
-        indexCfs.unregisterMBean();
+        indexedCfs.invalidate();
     }
 
     @Override
     public ColumnFamilyStore getUnderlyingCfs()
     {
-       return indexCfs;
+       return indexedCfs;
     }
 
     @Override
@@ -155,13 +155,13 @@ public class KeysIndex extends PerColumn
     @Override
     public String getIndexName()
     {
-        return indexCfs.columnFamily;
+        return indexedCfs.columnFamily;
     }
 
     @Override
     public void renameIndex(String newCfName) throws IOException
     {
-        indexCfs.renameSSTables(indexCfs.columnFamily.replace(baseCfs.columnFamily, newCfName));
+        indexedCfs.renameSSTables(indexedCfs.columnFamily.replace(baseCfs.columnFamily, newCfName));
     }
 
     @Override

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropColumnFamily.java Mon Oct 31 16:06:48 2011
@@ -79,18 +79,7 @@ public class DropColumnFamily extends Mi
         if (!StorageService.instance.isClientMode())
         {
             cfs.snapshot(Table.getTimestampedSnapshotName(cfs.columnFamily));
-
-            CompactionManager.instance.getCompactionLock().lock();
-            cfs.flushLock.lock();
-            try
-            {
-                Table.open(ksm.name, schema).dropCf(cfm.cfId);
-            }
-            finally
-            {
-                cfs.flushLock.unlock();
-                CompactionManager.instance.getCompactionLock().unlock();
-            }
+            Table.open(ksm.name, schema).dropCf(cfm.cfId);
         }
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/DropKeyspace.java Mon Oct 31 16:06:48 2011
@@ -48,41 +48,25 @@ public class DropKeyspace extends Migrat
     public void applyModels() throws IOException
     {
         String snapshotName = Table.getTimestampedSnapshotName(name);
-        CompactionManager.instance.getCompactionLock().lock();
-        try
-        {
-            KSMetaData ksm = schema.getTableDefinition(name);
+        KSMetaData ksm = schema.getTableDefinition(name);
 
-            // remove all cfs from the table instance.
-            for (CFMetaData cfm : ksm.cfMetaData().values())
+        // remove all cfs from the table instance.
+        for (CFMetaData cfm : ksm.cfMetaData().values())
+        {
+            ColumnFamilyStore cfs = Table.open(ksm.name, schema).getColumnFamilyStore(cfm.cfName);
+            schema.purge(cfm);
+            if (!StorageService.instance.isClientMode())
             {
-                ColumnFamilyStore cfs = Table.open(ksm.name, schema).getColumnFamilyStore(cfm.cfName);
-                schema.purge(cfm);
-                if (!StorageService.instance.isClientMode())
-                {
-                    cfs.snapshot(snapshotName);
-                    cfs.flushLock.lock();
-                    try
-                    {
-                        Table.open(ksm.name, schema).dropCf(cfm.cfId);
-                    }
-                    finally
-                    {
-                        cfs.flushLock.unlock();
-                    }
-                }
+                cfs.snapshot(snapshotName);
+                Table.open(ksm.name, schema).dropCf(cfm.cfId);
             }
-                            
-            // remove the table from the static instances.
-            Table table = Table.clear(ksm.name, schema);
-            assert table != null;
-            // reset defs.
-            schema.clearTableDefinition(ksm, newVersion);
-        }
-        finally
-        {
-            CompactionManager.instance.getCompactionLock().unlock();
         }
+
+        // remove the table from the static instances.
+        Table table = Table.clear(ksm.name, schema);
+        assert table != null;
+        // reset defs.
+        schema.clearTableDefinition(ksm, newVersion);
     }
     
     public void subdeflate(org.apache.cassandra.db.migration.avro.Migration mi)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Mon Oct 31 16:06:48 2011
@@ -719,23 +719,28 @@ public class SSTableReader extends SSTab
      * Mark the sstable as compacted.
      * When calling this function, the caller must ensure that the SSTableReader is not referenced anywhere
      * except for threads holding a reference.
+     *
+     * @return true if this is the first time the file was marked compacted.  With rare exceptions
+     * (see DataTracker.unmarkCompacting) calling this multiple times would be buggy.
      */
-    public void markCompacted()
+    public boolean markCompacted()
     {
         if (logger.isDebugEnabled())
             logger.debug("Marking " + getFilename() + " compacted");
+
+        if (isCompacted.getAndSet(true))
+            return false;
+
         try
         {
             if (!new File(descriptor.filenameFor(Component.COMPACTED_MARKER)).createNewFile())
-                throw new IOException("Unable to create compaction marker");
+                throw new IOException("Compaction marker already exists");
         }
         catch (IOException e)
         {
             throw new IOError(e);
         }
-
-        boolean alreadyCompacted = isCompacted.getAndSet(true);
-        assert !alreadyCompacted : this + " was already marked compacted";
+        return true;
     }
 
     /**

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/KeyCacheTest.java Mon Oct 31 16:06:48 2011
@@ -91,7 +91,7 @@ public class KeyCacheTest extends Cleanu
         assert store.getKeyCacheSize() == 0;
 
         // load the cache from disk
-        store.unregisterMBean(); // unregistering old MBean to test how key cache will be loaded
+        store.invalidate(); // unregistering old MBean to test how key cache will be loaded
         ColumnFamilyStore newStore = ColumnFamilyStore.createColumnFamilyStore(Table.open(TABLE1), COLUMN_FAMILY3);
         assert newStore.getKeyCacheSize() == 100;
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Mon Oct 31 16:06:48 2011
@@ -39,7 +39,6 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 public class CompactionsTest extends CleanupHelper
 {
@@ -231,7 +230,7 @@ public class CompactionsTest extends Cle
         ColumnFamilyStore store = table.getColumnFamilyStore(cfname);
 
         // disable compaction while flushing
-        store.removeAllSSTables();
+        store.unreferenceSSTables();
         store.disableAutoCompaction();
 
         // Add test row

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java?rev=1195542&r1=1195541&r2=1195542&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java Mon Oct 31 16:06:48 2011
@@ -28,7 +28,6 @@ import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.context.CounterContext;
@@ -76,7 +75,7 @@ public class StreamingTransferTest exten
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         // We acquire a reference now, because removeAllSSTables will mark the sstable compacted, and we have work to do with it
         sstable.acquireReference();
-        cfs.removeAllSSTables();
+        cfs.unreferenceSSTables();
 
         // transfer the first and last key
         int[] offs = new int[]{1, 3};