Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/10 20:48:35 UTC

svn commit: r813544 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/

Author: jbellis
Date: Thu Sep 10 18:48:35 2009
New Revision: 813544

URL: http://svn.apache.org/viewvc?rev=813544&view=rev
Log:
Replace sstableLock with SSTableTracker, which updates the sstable list atomically without
readers ever having to block.  (Readers always see either the old list or the new one.)
We avoid a race on deleting compacted SSTable files from disk by using a ReferenceQueue:
when the last reference to an SSTableReader is gone, its PhantomReference is enqueued and the
cleanup thread deletes the files.  To cover the case where Cassandra is killed between
compaction and that cleanup, an empty -Compacted marker file is written at compaction time;
any data files so tagged are removed on startup.

patch by jbellis; reviewed by Chris Goffinet for CASSANDRA-414
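
In outline, the tracker is a copy-on-write set: mutators synchronize with each other and publish
a fresh unmodifiable snapshot through a volatile field, so readers iterate whichever snapshot they
picked up and never block.  A minimal sketch of the pattern (simplified from the SSTableTracker
added below; the generic Item type is only for illustration):

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    // Simplified copy-on-write tracker; writers take the monitor, readers never do.
    class CopyOnWriteTracker<T>
    {
        // volatile publication point: readers see either the old snapshot or the new one
        private volatile Set<T> items = Collections.emptySet();

        public synchronized void add(T item)
        {
            Set<T> updated = new HashSet<T>(items);
            updated.add(item);
            items = Collections.unmodifiableSet(updated);  // publish the new snapshot
        }

        public synchronized void remove(T item)
        {
            Set<T> updated = new HashSet<T>(items);
            updated.remove(item);
            items = Collections.unmodifiableSet(updated);
        }

        public Set<T> snapshot()
        {
            return items;  // safe to iterate without locking; never mutated after publication
        }
    }

Because a published snapshot is never mutated, iteration needs no lock; the trade-off is an O(n)
copy on each add/remove, which is cheap relative to the number of sstables per column family.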

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Sep 10 18:48:35 2009
@@ -77,10 +77,7 @@
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
     /* SSTables on disk for this column family */
-    private Set<SSTableReader> ssTables_ = new HashSet<SSTableReader>();
-
-    /* Modification lock used for protecting reads from compactions. */
-    private ReentrantReadWriteLock sstableLock_ = new ReentrantReadWriteLock(true);
+    private SSTableTracker ssTables_ = new SSTableTracker();
 
     private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
     private TimedStatsDeque writeStats_ = new TimedStatsDeque(60000);
@@ -152,7 +149,7 @@
             for (File file : files)
             {
                 String filename = file.getName();
-                if (((file.length() == 0) || (filename.contains("-" + SSTable.TEMPFILE_MARKER))) && (filename.contains(columnFamily_)))
+                if (((file.length() == 0 && !filename.endsWith("-Compacted")) || (filename.contains("-" + SSTable.TEMPFILE_MARKER))) && (filename.contains(columnFamily_)))
                 {
                     file.delete();
                     continue;
@@ -169,9 +166,13 @@
         Collections.sort(sstableFiles, new FileUtils.FileComparator());
 
         /* Load the index files and the Bloom Filters associated with them. */
+        List<SSTableReader> sstables = new ArrayList<SSTableReader>();
         for (File file : sstableFiles)
         {
             String filename = file.getAbsolutePath();
+            if (SSTable.deleteIfCompacted(filename))
+                continue;
+
             SSTableReader sstable;
             try
             {
@@ -182,8 +183,9 @@
                 logger_.error("Corrupt file " + filename + "; skipped", ex);
                 continue;
             }
-            ssTables_.add(sstable);
+            sstables.add(sstable);
         }
+        ssTables_.onStart(sstables);
 
         // submit initial check-for-compaction request
         MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
@@ -569,30 +571,11 @@
     */
     void addSSTable(SSTableReader sstable)
     {
-        int ssTableCount;
-        sstableLock_.writeLock().lock();
-        try
-        {
-            ssTables_.add(sstable);
-            ssTableCount = ssTables_.size();
-        }
-        finally
-        {
-            sstableLock_.writeLock().unlock();
-        }
-
-        /* it's ok if compaction gets submitted multiple times while one is already in process.
-           worst that happens is, compactor will count the sstable files and decide there are
-           not enough to bother with. */
-        if (ssTableCount >= MinorCompactionManager.MINCOMPACTION_THRESHOLD)
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug("Added " + sstable.getFilename() + ".  Submitting " + columnFamily_ + " for compaction");
-            MinorCompactionManager.instance().submit(this);
-        }
+        ssTables_.add(sstable);
+        MinorCompactionManager.instance().submit(this);
     }
 
-    private PriorityQueue<FileStruct> initializePriorityQueue(List<SSTableReader> sstables, List<Range> ranges) throws IOException
+    private PriorityQueue<FileStruct> initializePriorityQueue(Collection<SSTableReader> sstables, List<Range> ranges) throws IOException
     {
         PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
         if (sstables.size() > 1 || (ranges != null && sstables.size() > 0))
@@ -688,7 +671,7 @@
      */
     void doMajorCompactionInternal(long skip) throws IOException
     {
-        List<SSTableReader> sstables;
+        Collection<SSTableReader> sstables;
         if (skip > 0L)
         {
             sstables = new ArrayList<SSTableReader>();
@@ -702,7 +685,7 @@
         }
         else
         {
-            sstables = new ArrayList<SSTableReader>(ssTables_);
+            sstables = ssTables_.getSSTables();
         }
         doFileCompaction(sstables);
     }
@@ -711,7 +694,7 @@
      * Add up all the files sizes this is the worst case file
      * size for compaction of all the list of files given.
      */
-    long getExpectedCompactedFileSize(List<SSTableReader> sstables)
+    long getExpectedCompactedFileSize(Iterable<SSTableReader> sstables)
     {
         long expectedFileSize = 0;
         for (SSTableReader sstable : sstables)
@@ -725,7 +708,7 @@
     /*
      *  Find the maximum size file in the list .
      */
-    SSTableReader getMaxSizeFile(List<SSTableReader> sstables)
+    SSTableReader getMaxSizeFile(Iterable<SSTableReader> sstables)
     {
         long maxSize = 0L;
         SSTableReader maxFile = null;
@@ -742,7 +725,7 @@
 
     boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
     {
-        return doFileAntiCompaction(new ArrayList<SSTableReader>(ssTables_), ranges, target, fileList);
+        return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target, fileList);
     }
 
     void forceCleanup()
@@ -758,8 +741,7 @@
      */
     void doCleanupCompaction() throws IOException
     {
-        List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_);
-        for (SSTableReader sstable : sstables)
+        for (SSTableReader sstable : ssTables_)
         {
             doCleanup(sstable);
         }
@@ -780,16 +762,7 @@
         doFileAntiCompaction(Arrays.asList(sstable), myRanges, null, newFiles);
         if (logger_.isDebugEnabled())
           logger_.debug("Original file : " + sstable + " of size " + sstable.length());
-        sstableLock_.writeLock().lock();
-        try
-        {
-            ssTables_.remove(sstable);
-            sstable.delete();
-        }
-        finally
-        {
-            sstableLock_.writeLock().unlock();
-        }
+        ssTables_.markCompacted(Arrays.asList(sstable));
     }
 
     /**
@@ -803,7 +776,7 @@
      * @return
      * @throws IOException
      */
-    boolean doFileAntiCompaction(List<SSTableReader> sstables, List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
+    boolean doFileAntiCompaction(Collection<SSTableReader> sstables, List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
     {
         boolean result = false;
         long startTime = System.currentTimeMillis();
@@ -973,7 +946,7 @@
     * to get the latest data.
     *
     */
-    private int doFileCompaction(List<SSTableReader> sstables) throws IOException
+    private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
     {
         logger_.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
         String compactionFileLocation = DatabaseDescriptor.getDataFileLocationForTable(table_, getExpectedCompactedFileSize(sstables));
@@ -986,29 +959,28 @@
             return doFileCompaction(sstables);
         }
 
-        String newfile = null;
         long startTime = System.currentTimeMillis();
         long totalBytesRead = 0;
-        long totalBytesWritten = 0;
         long totalkeysRead = 0;
         long totalkeysWritten = 0;
         PriorityQueue<FileStruct> pq = initializePriorityQueue(sstables, null);
 
         if (pq.isEmpty())
         {
-            logger_.warn("Nothing to compact (all files empty or corrupt)");
+            logger_.warn("Nothing to compact (all files empty or corrupt). This should not happen.");
             // TODO clean out bad files, if any
             return 0;
         }
 
-        String mergedFileName = getTempSSTableFileName();
-        SSTableWriter writer = null;
+        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(sstables);
+        if (expectedBloomFilterSize < 0)
+            expectedBloomFilterSize = SSTableReader.indexInterval();
+        String newFilename = new File(compactionFileLocation, getTempSSTableFileName()).getAbsolutePath();
+        SSTableWriter writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
         SSTableReader ssTable = null;
         String lastkey = null;
         List<FileStruct> lfs = new ArrayList<FileStruct>();
         DataOutputBuffer bufOut = new DataOutputBuffer();
-        int expectedBloomFilterSize = SSTableReader.getApproximateKeyCount(sstables);
-        expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTableReader.indexInterval();
         if (logger_.isDebugEnabled())
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
         List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
@@ -1060,11 +1032,6 @@
                     ColumnFamily.serializer().serializeWithIndexes(filestruct.getColumnFamily(), bufOut);
                 }
 
-                if (writer == null)
-                {
-                    String fname = new File(compactionFileLocation, mergedFileName).getAbsolutePath();
-                    writer = new SSTableWriter(fname, expectedBloomFilterSize, StorageService.getPartitioner());
-                }
                 writer.append(lastkey, bufOut);
                 totalkeysWritten++;
 
@@ -1087,34 +1054,14 @@
                 }
             }
         }
-        if (writer != null)
-        {
-            // TODO if all the keys were the same nothing will be done here
-            ssTable = writer.closeAndOpenReader();
-            newfile = writer.getFilename();
-        }
-        sstableLock_.writeLock().lock();
-        try
-        {
-            if (newfile != null)
-            {
-                ssTables_.add(ssTable);
-                totalBytesWritten += (new File(newfile)).length();
-            }
-            for (SSTableReader sstable : sstables)
-            {
-                ssTables_.remove(sstable);
-                sstable.delete();
-            }
-        }
-        finally
-        {
-            sstableLock_.writeLock().unlock();
-        }
+        ssTable = writer.closeAndOpenReader();
+        ssTables_.add(ssTable);
+        ssTables_.markCompacted(sstables);
+        MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
 
         String format = "Compacted to %s.  %d/%d bytes for %d/%d keys read/written.  Time: %dms.";
         long dTime = System.currentTimeMillis() - startTime;
-        logger_.info(String.format(format, newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
+        logger_.info(String.format(format, writer.getFilename(), totalBytesRead, ssTable.length(), totalkeysRead, totalkeysWritten, dTime));
         return sstables.size();
     }
 
@@ -1240,12 +1187,7 @@
     /** not threadsafe.  caller must have lock_ acquired. */
     public Collection<SSTableReader> getSSTables()
     {
-        return Collections.unmodifiableCollection(ssTables_);
-    }
-
-    public ReentrantReadWriteLock.ReadLock getReadLock()
-    {
-        return sstableLock_.readLock();
+        return ssTables_.getSSTables();
     }
 
     public int getReadCount()
@@ -1316,7 +1258,6 @@
         }
 
         // we are querying top-level columns, do a merging fetch with indexes.
-        sstableLock_.readLock().lock();
         List<ColumnIterator> iterators = new ArrayList<ColumnIterator>();
         try
         {
@@ -1346,8 +1287,7 @@
             }
 
             /* add the SSTables on disk */
-            List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_);
-            for (SSTableReader sstable : sstables)
+            for (SSTableReader sstable : ssTables_)
             {
                 iter = filter.getSSTableColumnIterator(sstable);
                 if (iter.hasNext()) // initializes iter.CF
@@ -1382,7 +1322,6 @@
             }
 
             readStats_.add(System.currentTimeMillis() - start);
-            sstableLock_.readLock().unlock();
         }
     }
 
@@ -1395,19 +1334,6 @@
     public RangeReply getKeyRange(final String startWith, final String stopAt, int maxResults)
     throws IOException, ExecutionException, InterruptedException
     {
-        getReadLock().lock();
-        try
-        {
-            return getKeyRangeUnsafe(startWith, stopAt, maxResults);
-        }
-        finally
-        {
-            getReadLock().unlock();
-        }
-    }
-
-    private RangeReply getKeyRangeUnsafe(final String startWith, final String stopAt, int maxResults) throws IOException, ExecutionException, InterruptedException
-    {
         // (OPP key decoration is a no-op so using the "decorated" comparator against raw keys is fine)
         final Comparator<String> comparator = StorageService.getPartitioner().getDecoratedKeyComparator();
 
@@ -1435,7 +1361,7 @@
         }
 
         // sstables
-        for (SSTableReader sstable : getSSTables())
+        for (SSTableReader sstable : ssTables_)
         {
             FileStruct fs = sstable.getFileStruct();
             fs.seekTo(startWith);
@@ -1509,37 +1435,29 @@
      */
     public void snapshot(String snapshotName) throws IOException
     {
-        sstableLock_.readLock().lock();
-        try
-        {
-            for (SSTableReader ssTable : new ArrayList<SSTableReader>(ssTables_))
-            {
-                // mkdir
-                File sourceFile = new File(ssTable.getFilename());
-                File dataDirectory = sourceFile.getParentFile().getParentFile();
-                String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table_, snapshotName);
-                FileUtils.createDirectory(snapshotDirectoryPath);
-
-                // hard links
-                File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
-                FileUtils.createHardLink(sourceFile, targetLink);
-
-                sourceFile = new File(ssTable.indexFilename());
-                targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
-                FileUtils.createHardLink(sourceFile, targetLink);
-
-                sourceFile = new File(ssTable.filterFilename());
-                targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
-                FileUtils.createHardLink(sourceFile, targetLink);
-
-                if (logger_.isDebugEnabled())
-                    logger_.debug("Snapshot for " + table_ + " table data file " + sourceFile.getAbsolutePath() +    
-                        " created as " + targetLink.getAbsolutePath());
-            }
-        }
-        finally
+        for (SSTableReader ssTable : ssTables_)
         {
-            sstableLock_.readLock().unlock();
+            // mkdir
+            File sourceFile = new File(ssTable.getFilename());
+            File dataDirectory = sourceFile.getParentFile().getParentFile();
+            String snapshotDirectoryPath = Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table_, snapshotName);
+            FileUtils.createDirectory(snapshotDirectoryPath);
+
+            // hard links
+            File targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+            FileUtils.createHardLink(sourceFile, targetLink);
+
+            sourceFile = new File(ssTable.indexFilename());
+            targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+            FileUtils.createHardLink(sourceFile, targetLink);
+
+            sourceFile = new File(ssTable.filterFilename());
+            targetLink = new File(snapshotDirectoryPath, sourceFile.getName());
+            FileUtils.createHardLink(sourceFile, targetLink);
+
+            if (logger_.isDebugEnabled())
+                logger_.debug("Snapshot for " + table_ + " table data file " + sourceFile.getAbsolutePath() +
+                    " created as " + targetLink.getAbsolutePath());
         }
     }
 
@@ -1548,14 +1466,6 @@
      */
     void clearUnsafe()
     {
-        sstableLock_.writeLock().lock();
-        try
-        {
-            memtable_.clearUnsafe();
-        }
-        finally
-        {
-            sstableLock_.writeLock().unlock();
-        }
+        memtable_.clearUnsafe();
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java Thu Sep 10 18:48:35 2009
@@ -509,8 +509,7 @@
                 oldCommitLogHeader.and(commitLogHeader);
                 if (oldCommitLogHeader.isSafeToDelete())
                 {
-                    if (logger_.isDebugEnabled())
-                      logger_.debug("Deleting commit log:" + oldFile);
+                    logger_.info("Deleting obsolete commit log:" + oldFile);
                     FileUtils.deleteAsync(oldFile);
                     listOfDeletedFiles.add(oldFile);
                 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Sep 10 18:48:35 2009
@@ -217,7 +217,7 @@
         cfStore.addSSTable(ssTable);
         buffer.close();
         isFlushed_ = true;
-        logger_.info("Completed flushing " + this);
+        logger_.info("Flushed " + ssTable.getFilename());
     }
 
     public String toString()

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Thu Sep 10 18:48:35 2009
@@ -151,6 +151,11 @@
     
     private ScheduledExecutorService compactor_ = new DebuggableScheduledThreadPoolExecutor(1, new ThreadFactoryImpl("MINOR-COMPACTION-POOL"));
 
+    /**
+     * Call this whenever a compaction might be needed on the given columnfamily.
+     * It's okay to over-call (within reason) since the compactions are single-threaded,
+     * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
+     */
     public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
     {
         return submit(columnFamilyStore, MINCOMPACTION_THRESHOLD, MAXCOMPACTION_THRESHOLD);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Thu Sep 10 18:48:35 2009
@@ -449,8 +449,7 @@
             {
                 if (logger_.isDebugEnabled())
                     logger_.debug("Removing snapshot directory " + snapshotPath);
-                if (!FileUtils.deleteDir(snapshotDir))
-                    throw new IOException("Could not clear snapshot directory " + snapshotPath);
+                FileUtils.deleteDir(snapshotDir);
             }
         }
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Thu Sep 10 18:48:35 2009
@@ -22,12 +22,15 @@
 
 
 import java.io.File;
+import java.io.IOException;
 import java.util.List;
 
+import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.config.DatabaseDescriptor;
 
@@ -45,6 +48,9 @@
  */
 public abstract class SSTable
 {
+    private static final Logger logger = Logger.getLogger(SSTable.class);
+
+
     protected String path;
     protected IPartitioner partitioner;
     protected BloomFilter bf;
@@ -75,6 +81,37 @@
         return indexFilename(path);
     }
 
+    protected static String compactedFilename(String dataFile)
+    {
+        String[] parts = dataFile.split("-");
+        parts[parts.length - 1] = "Compacted";
+        return StringUtils.join(parts, "-");
+    }
+
+    /**
+     * We use a ReferenceQueue to manage deleting files that have been compacted
+     * and for which no more SSTable references exist.  But this is not guaranteed
+     * to run for each such file because of the semantics of the JVM gc.  So,
+     * we write a marker to `compactedFilename` when a file is compacted;
+     * if such a marker exists on startup, the file should be removed.
+     *
+     * @return true if the file was deleted
+     */
+    public static boolean deleteIfCompacted(String dataFilename) throws IOException
+    {
+        if (new File(compactedFilename(dataFilename)).exists())
+        {
+            delete(dataFilename);
+            return true;
+        }
+        return false;
+    }
+
+    protected String compactedFilename()
+    {
+        return compactedFilename(path);
+    }
+
     protected static String filterFilename(String dataFile)
     {
         String[] parts = dataFile.split("-");
@@ -102,6 +139,15 @@
         return new File(filename).getParentFile().getName();        
     }
 
+    static void delete(String path) throws IOException
+    {
+        FileUtils.deleteWithConfirm(new File(path));
+        FileUtils.deleteWithConfirm(new File(SSTable.indexFilename(path)));
+        FileUtils.deleteWithConfirm(new File(SSTable.filterFilename(path)));
+        FileUtils.deleteWithConfirm(new File(SSTable.compactedFilename(path)));
+        logger.info("Deleted " + path);
+    }
+
     /**
      * This is a simple container for the index Key and its corresponding position
      * in the data file. Binary search is performed on a list of these objects

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableReader.java Thu Sep 10 18:48:35 2009
@@ -20,6 +20,9 @@
 
 import java.io.*;
 import java.util.*;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
 
 import org.apache.log4j.Logger;
 
@@ -27,7 +30,6 @@
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.BloomFilter;
-import org.apache.cassandra.utils.FileUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
@@ -45,12 +47,47 @@
 
     private static final FileSSTableMap openedFiles = new FileSSTableMap();
 
+    // `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself
+    // unreferenced.  otherwise they will never get enqueued.
+    private static final Set<Reference<SSTableReader>> finalizers = new HashSet<Reference<SSTableReader>>();
+    private static final ReferenceQueue<SSTableReader> finalizerQueue = new ReferenceQueue<SSTableReader>()
+    {{
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                while (true)
+                {
+                    FileDeletingReference r = null;
+                    try
+                    {
+                        r = (FileDeletingReference) finalizerQueue.remove();
+                        finalizers.remove(r);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    try
+                    {
+                        r.cleanup();
+                    }
+                    catch (IOException e)
+                    {
+                        logger.error("Error deleting " + r.path, e);
+                    }
+                }
+            }
+        };
+        new Thread(runnable, "SSTABLE-DELETER").start();
+    }};
+
     public static int indexInterval()
     {
         return INDEX_INTERVAL;
     }
 
-    public static int getApproximateKeyCount(List<SSTableReader> sstables)
+    public static int getApproximateKeyCount(Iterable<SSTableReader> sstables)
     {
         int count = 0;
 
@@ -89,7 +126,7 @@
         return open(dataFileName, StorageService.getPartitioner());
     }
 
-    public static synchronized SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
+    public static SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
     {
         assert partitioner != null;
         assert openedFiles.get(dataFileName) == null;
@@ -104,18 +141,21 @@
         return sstable;
     }
 
+    FileDeletingReference phantomReference;
+
     SSTableReader(String filename, IPartitioner partitioner, List<KeyPosition> indexPositions, BloomFilter bloomFilter)
     {
         super(filename, partitioner);
         this.indexPositions = indexPositions;
         this.bf = bloomFilter;
+        phantomReference = new FileDeletingReference(this, finalizerQueue);
+        finalizers.add(phantomReference);
         openedFiles.put(filename, this);
     }
 
     private SSTableReader(String filename, IPartitioner partitioner)
     {
-        super(filename, partitioner);
-        openedFiles.put(filename, this);
+        this(filename, partitioner, null, null);
     }
 
     public List<KeyPosition> getIndexPositions()
@@ -264,12 +304,13 @@
         return ColumnFamilyStore.getGenerationFromFileName(path) - ColumnFamilyStore.getGenerationFromFileName(o.path);
     }
 
-    public void delete() throws IOException
+    public void markCompacted() throws IOException
     {
-        FileUtils.deleteWithConfirm(new File(path));
-        FileUtils.deleteWithConfirm(new File(indexFilename(path)));
-        FileUtils.deleteWithConfirm(new File(filterFilename(path)));
+        if (logger.isDebugEnabled())
+            logger.debug("Marking " + path + " compacted");
         openedFiles.remove(path);
+        new File(compactedFilename()).createNewFile();
+        phantomReference.deleteOnCleanup();
     }
 
     /** obviously only for testing */
@@ -363,3 +404,28 @@
         return "FileSSTableMap {" + StringUtils.join(map.keySet(), ", ") + "}";
     }
 }
+
+class FileDeletingReference extends PhantomReference<SSTableReader>
+{
+    public final String path;
+    private boolean deleteOnCleanup;
+
+    FileDeletingReference(SSTableReader referent, ReferenceQueue<? super SSTableReader> q)
+    {
+        super(referent, q);
+        this.path = referent.path;
+    }
+
+    public void deleteOnCleanup()
+    {
+        deleteOnCleanup = true;
+    }
+
+    public void cleanup() throws IOException
+    {
+        if (deleteOnCleanup)
+        {
+            SSTable.delete(path);
+        }
+    }
+}
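
Taken together, SSTable.deleteIfCompacted, FileDeletingReference, and the SSTABLE-DELETER thread
implement a standard phantom-reference cleanup idiom.  Condensed, with illustrative names rather
than the patch's own classes, the shape of it is:

    import java.io.File;
    import java.lang.ref.PhantomReference;
    import java.lang.ref.ReferenceQueue;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    // Illustrative phantom-reference cleanup; not the patch's exact code.
    class TrackedFile
    {
        final String path;
        TrackedFile(String path) { this.path = path; }
    }

    class PathReference extends PhantomReference<TrackedFile>
    {
        final String path;        // copied out: the referent is unreachable by cleanup time
        boolean deleteOnCleanup;  // set when the file is marked compacted

        PathReference(TrackedFile referent, ReferenceQueue<TrackedFile> queue)
        {
            super(referent, queue);
            this.path = referent.path;
        }
    }

    class FileCleaner
    {
        // references must stay strongly reachable until enqueued, or the GC never delivers them
        private final Set<PathReference> pending =
            Collections.synchronizedSet(new HashSet<PathReference>());
        private final ReferenceQueue<TrackedFile> queue = new ReferenceQueue<TrackedFile>();

        PathReference track(TrackedFile file)
        {
            PathReference ref = new PathReference(file, queue);
            pending.add(ref);
            return ref;
        }

        void start()
        {
            new Thread(new Runnable()
            {
                public void run()
                {
                    while (true)
                    {
                        try
                        {
                            // blocks until the collector enqueues a reference to an unreachable file
                            PathReference ref = (PathReference) queue.remove();
                            pending.remove(ref);
                            if (ref.deleteOnCleanup)
                                new File(ref.path).delete();  // no reader can still be using it
                        }
                        catch (InterruptedException e)
                        {
                            return;
                        }
                    }
                }
            }, "FILE-CLEANER").start();
        }
    }

The -Compacted marker file covers the window where the process dies before this thread runs:
markCompacted writes the marker immediately, and deleteIfCompacted removes the data files on the
next startup.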

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java?rev=813544&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTableTracker.java Thu Sep 10 18:48:35 2009
@@ -0,0 +1,53 @@
+package org.apache.cassandra.io;
+
+import java.util.*;
+import java.io.IOException;
+
+
+public class SSTableTracker implements Iterable<SSTableReader>
+{
+    private volatile Set<SSTableReader> sstables = Collections.emptySet();
+
+    // TODO get rid of onstart crap.  this should really be part of the constructor,
+    // but CFS isn't designed to set this up in the constructor, yet.
+    public synchronized void onStart(Collection<SSTableReader> sstables)
+    {
+        this.sstables = Collections.unmodifiableSet(new HashSet<SSTableReader>(sstables));
+    }
+
+    public synchronized void add(SSTableReader sstable)
+    {
+        Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);
+        sstablesNew.add(sstable);
+        sstables = Collections.unmodifiableSet(sstablesNew);
+    }
+
+    // todo replace w/ compactionfinished for CASSANDRA-431
+    public synchronized void markCompacted(Iterable<SSTableReader> compacted) throws IOException
+    {
+        Set<SSTableReader> sstablesNew = new HashSet<SSTableReader>(sstables);
+        for (SSTableReader sstable : compacted)
+        {
+            sstablesNew.remove(sstable);
+            sstable.markCompacted();
+        }
+        sstables = Collections.unmodifiableSet(sstablesNew);
+    }
+
+    // the modifiers create new, unmodifiable objects each time; the volatile fences the assignment
+    // so we don't need any further synchronization for the common case here
+    public Set<SSTableReader> getSSTables()
+    {
+        return sstables;
+    }
+
+    public int size()
+    {
+        return sstables.size();
+    }
+
+    public Iterator<SSTableReader> iterator()
+    {
+        return sstables.iterator();
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/utils/FileUtils.java Thu Sep 10 18:48:35 2009
@@ -52,7 +52,7 @@
         assert file.exists() : "attempted to delete non-existing file " + file.getName();
         if (!file.delete())
         {
-            throw new IOException("Failed to delete " + file.getName());
+            throw new IOException("Failed to delete " + file.getAbsolutePath());
         }
     }
 
@@ -69,10 +69,10 @@
         {
         	if(file_ == null)
         		return;
-        	logger_.info("*** Deleting " + file_.getName() + " ***");
-        	if(!file_.delete())
+        	logger_.debug("Deleting " + file_.getName());
+        	if (!file_.delete())
         	{
-            	logger_.warn("Warning : Unable to delete file " + file_.getAbsolutePath());
+            	logger_.error("Unable to delete file " + file_.getAbsolutePath());
         	}
         }
     }
@@ -248,24 +248,21 @@
     /**
      * Deletes all files and subdirectories under "dir".
      * @param dir Directory to be deleted
-     * @return boolean Returns "true" if all deletions were successful.
-     *                 If a deletion fails, the method stops attempting to
-     *                 delete and returns "false".
+     * @throws IOException if any part of the tree cannot be deleted
      */
-    public static boolean deleteDir(File dir) {
-
-        if (dir.isDirectory()) {
+    public static void deleteDir(File dir) throws IOException
+    {
+        if (dir.isDirectory())
+        {
             String[] children = dir.list();
-            for (int i=0; i<children.length; i++) {
-                boolean success = deleteDir(new File(dir, children[i]));
-                if (!success) {
-                    return false;
-                }
+            for (int i = 0; i < children.length; i++)
+            {
+                deleteDir(new File(dir, children[i]));
             }
         }
 
         // The directory is now empty so now it can be smoked
-        return dir.delete();
+        deleteWithConfirm(dir);
     }
 
     /**

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java?rev=813544&r1=813543&r2=813544&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/CleanupHelper.java Thu Sep 10 18:48:35 2009
@@ -48,8 +48,6 @@
             }
             for (File f : dir.listFiles())
             {
-                if (logger.isDebugEnabled())
-                logger.debug("deleting " + f);
                 if (!f.delete()) {
                     logger.error("could not delete " + f);
             }
@@ -69,8 +67,6 @@
                 // table directory
                 if (tableFile.isDirectory()) {
                     for (File dataFile : tableFile.listFiles()) {
-                        if (logger.isDebugEnabled())
-                            logger.debug("deleting " + dataFile);
                         if (!dataFile.delete()) {
                             logger.error("could not delete " + dataFile);
                         }