You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/23 07:49:36 UTC

svn commit: r1138740 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/ test/unit/org/apache/cassandra/io/sstable/

Author: jbellis
Date: Thu Jun 23 05:49:35 2011
New Revision: 1138740

URL: http://svn.apache.org/viewvc?rev=1138740&view=rev
Log:
clean up tmpfiles after failed compaction
patch by Aaron Morton; reviewed by slebresne and Stu Hood for CASSANDRA-2468

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 23 05:49:35 2011
@@ -7,6 +7,7 @@
    (CASSANDRA-2062)
  * Fixed the ability to set compaction strategy in cli using create column family command (CASSANDRA-2778)
  * Add startup flag to renew counter node id (CASSANDRA-2788)
+ * clean up tmp files after failed compaction (CASSANDRA-2468)
 
 
 0.8.2

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jun 23 05:49:35 2011
@@ -454,7 +454,14 @@ public class ColumnFamilyStore implement
 
             if (components.contains(Component.COMPACTED_MARKER) || desc.temporary)
             {
-                SSTable.delete(desc, components);
+                try
+                {
+                    SSTable.delete(desc, components);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
                 continue;
             }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jun 23 05:49:35 2011
@@ -224,14 +224,22 @@ public class Memtable
                                       + keySize // keys in data file
                                       + currentThroughput.get()) // data
                                      * 1.2); // bloom filter and row index overhead
+        SSTableReader ssTable;
+        // errors when creating the writer that may leave empty temp files.
         SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context);
+        try
+        {
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending flush" category)
+            for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
+                writer.append(entry.getKey(), entry.getValue());
 
-        // (we can't clear out the map as-we-go to free up memory,
-        //  since the memtable is being used for queries in the "pending flush" category)
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : columnFamilies.entrySet())
-            writer.append(entry.getKey(), entry.getValue());
-
-        SSTableReader ssTable = writer.closeAndOpenReader();
+            ssTable = writer.closeAndOpenReader();
+        }
+        finally
+        {
+            writer.cleanupIfNecessary();
+        }
         logger.info(String.format("Completed flushing %s (%d bytes)",
                                   ssTable.getFilename(), new File(ssTable.getFilename()).length()));
         return ssTable;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jun 23 05:49:35 2011
@@ -469,131 +469,142 @@ public class CompactionManager implement
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
+            // errors when creating the writer may leave empty temp files.
             SSTableWriter writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, null, Collections.singletonList(sstable));
+            SSTableReader newSstable = null;
             executor.beginCompaction(new ScrubInfo(dataFile, sstable));
             int goodRows = 0, badRows = 0, emptyRows = 0;
 
-            while (!dataFile.isEOF())
+            try
             {
-                long rowStart = dataFile.getFilePointer();
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading row at " + rowStart);
-
-                DecoratedKey key = null;
-                long dataSize = -1;
-                try
+                while (!dataFile.isEOF())
                 {
-                    key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
-                    dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+                    long rowStart = dataFile.getFilePointer();
                     if (logger.isDebugEnabled())
-                        logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
-                }
-                catch (Throwable th)
-                {
-                    throwIfFatal(th);
-                    // check for null key below
-                }
-
-                ByteBuffer currentIndexKey = nextIndexKey;
-                long nextRowPositionFromIndex;
-                try
-                {
-                    nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-                    nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
-                }
-                catch (Throwable th)
-                {
-                    logger.warn("Error reading index file", th);
-                    nextIndexKey = null;
-                    nextRowPositionFromIndex = dataFile.length();
-                }
-
-                long dataStart = dataFile.getFilePointer();
-                long dataStartFromIndex = currentIndexKey == null
-                                        ? -1
-                                        : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
-                long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
-                assert currentIndexKey != null || indexFile.isEOF();
-                if (logger.isDebugEnabled() && currentIndexKey != null)
-                    logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
+                        logger.debug("Reading row at " + rowStart);
 
-                writer.mark();
-                try
-                {
-                    if (key == null)
-                        throw new IOError(new IOException("Unable to read row key from data file"));
-                    if (dataSize > dataFile.length())
-                        throw new IOError(new IOException("Impossible row size " + dataSize));
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (compactedRow.isEmpty())
+                    DecoratedKey key = null;
+                    long dataSize = -1;
+                    try
                     {
-                        emptyRows++;
+                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                        dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong();
+                        if (logger.isDebugEnabled())
+                            logger.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize));
                     }
-                    else
+                    catch (Throwable th)
                     {
-                        writer.append(compactedRow);
-                        goodRows++;
+                        throwIfFatal(th);
+                        // check for null key below
                     }
-                    if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
-                        logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
-                }
-                catch (Throwable th)
-                {
-                    throwIfFatal(th);
-                    logger.warn("Non-fatal error reading row (stacktrace follows)", th);
-                    writer.reset();
 
-                    if (currentIndexKey != null
-                        && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                    ByteBuffer currentIndexKey = nextIndexKey;
+                    long nextRowPositionFromIndex;
+                    try
+                    {
+                        nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+                        nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong();
+                    }
+                    catch (Throwable th)
                     {
-                        logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
-                                                  dataSizeFromIndex, dataStartFromIndex));
-                        key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
-                        try
+                        logger.warn("Error reading index file", th);
+                        nextIndexKey = null;
+                        nextRowPositionFromIndex = dataFile.length();
+                    }
+
+                    long dataStart = dataFile.getFilePointer();
+                    long dataStartFromIndex = currentIndexKey == null
+                                            ? -1
+                                            : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+                    long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
+                    assert currentIndexKey != null || indexFile.isEOF();
+                    if (logger.isDebugEnabled() && currentIndexKey != null)
+                        logger.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
+
+                    writer.mark();
+                    try
+                    {
+                        if (key == null)
+                            throw new IOError(new IOException("Unable to read row key from data file"));
+                        if (dataSize > dataFile.length())
+                            throw new IOError(new IOException("Impossible row size " + dataSize));
+                        SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                        AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                        if (compactedRow.isEmpty())
+                        {
+                            emptyRows++;
+                        }
+                        else
+                        {
+                            writer.append(compactedRow);
+                            goodRows++;
+                        }
+                        if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
+                            logger.warn("Row scrubbed successfully but index file contains a different key or row size; consider rebuilding the index as described in http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
+                    }
+                    catch (Throwable th)
+                    {
+                        throwIfFatal(th);
+                        logger.warn("Non-fatal error reading row (stacktrace follows)", th);
+                        writer.reset();
+
+                        if (currentIndexKey != null
+                            && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (compactedRow.isEmpty())
+                            logger.info(String.format("Retrying from row index; data is %s bytes starting at %s",
+                                                      dataSizeFromIndex, dataStartFromIndex));
+                            key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey);
+                            try
                             {
-                                emptyRows++;
+                                SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true);
+                                AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
+                                if (compactedRow.isEmpty())
+                                {
+                                    emptyRows++;
+                                }
+                                else
+                                {
+                                    writer.append(compactedRow);
+                                    goodRows++;
+                                }
                             }
-                            else
+                            catch (Throwable th2)
                             {
-                                writer.append(compactedRow);
-                                goodRows++;
+                                throwIfFatal(th2);
+                                // Skipping rows is dangerous for counters (see CASSANDRA-2759)
+                                if (isCommutative)
+                                    throw new IOError(th2);
+
+                                logger.warn("Retry failed too.  Skipping to next row (retry's stacktrace follows)", th2);
+                                writer.reset();
+                                dataFile.seek(nextRowPositionFromIndex);
+                                badRows++;
                             }
                         }
-                        catch (Throwable th2)
+                        else
                         {
-                            throwIfFatal(th2);
                             // Skipping rows is dangerous for counters (see CASSANDRA-2759)
                             if (isCommutative)
-                                throw new IOError(th2);
+                                throw new IOError(th);
 
-                            logger.warn("Retry failed too.  Skipping to next row (retry's stacktrace follows)", th2);
-                            writer.reset();
-                            dataFile.seek(nextRowPositionFromIndex);
+                            logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
+                            if (currentIndexKey != null)
+                                dataFile.seek(nextRowPositionFromIndex);
                             badRows++;
                         }
                     }
-                    else
-                    {
-                        // Skipping rows is dangerous for counters (see CASSANDRA-2759)
-                        if (isCommutative)
-                            throw new IOError(th);
-
-                        logger.warn("Row at " + dataStart + " is unreadable; skipping to next");
-                        if (currentIndexKey != null)
-                            dataFile.seek(nextRowPositionFromIndex);
-                        badRows++;
-                    }
                 }
+
+                if (writer.getFilePointer() > 0)
+                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+            }
+            finally
+            {
+                writer.cleanupIfNecessary();
             }
 
-            if (writer.getFilePointer() > 0)
+            if (newSstable != null)
             {
-                SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
                 cfs.replaceCompactedSSTables(Arrays.asList(sstable), Arrays.asList(newSstable));
                 logger.info("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
                 if (badRows > 0)
@@ -652,6 +663,7 @@ public class CompactionManager implement
               logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
             SSTableWriter writer = null;
+            SSTableReader newSstable = null;
 
             logger.info("Cleaning up " + sstable);
             // Calculate the expected compacted filesize
@@ -691,17 +703,21 @@ public class CompactionManager implement
                         }
                     }
                 }
+                if (writer != null)
+                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
             }
             finally
             {
                 scanner.close();
                 executor.finishCompaction(ci);
+                if (writer != null)
+                    writer.cleanupIfNecessary();
+                executor.finishCompaction(ci);
             }
 
             List<SSTableReader> results = new ArrayList<SSTableReader>();
-            if (writer != null)
+            if (newSstable != null)
             {
-                SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
                 results.add(newSstable);
 
                 String format = "Cleaned up to %s.  %,d to %,d (~%d%% of original) bytes for %,d keys.  Time: %,dms.";

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java Thu Jun 23 05:49:35 2011
@@ -127,7 +127,8 @@ public class CompactionTask extends Abst
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
-        SSTableWriter writer;
+        SSTableWriter writer = null;
+        final SSTableReader ssTable;
         CompactionIterator ci = new CompactionIterator(type, toCompact, controller); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -164,15 +165,17 @@ public class CompactionTask extends Abst
                     }
                 }
             }
+            ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
         }
         finally
         {
             ci.close();
             if (collector != null)
                 collector.finishCompaction(ci);
+            if (writer != null)
+                writer.cleanupIfNecessary();
         }
 
-        SSTableReader ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
         cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
         for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty if preheat is off
             ssTable.cacheKey(entry.getKey(), entry.getValue());

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java Thu Jun 23 05:49:35 2011
@@ -55,6 +55,21 @@ public class Descriptor
     public final boolean isLatestVersion;
     public final boolean usesOldBloomFilter;
 
+    public enum TempState
+    {
+        LIVE,
+        TEMP,
+        ANY;
+
+        boolean isMatch(Descriptor descriptor)
+        {
+            assert descriptor != null;
+            if (TempState.ANY == this)
+                return true;
+            return (TempState.TEMP == this) ? descriptor.temporary : !descriptor.temporary;
+        }
+    }
+
     /**
      * A descriptor that assumes CURRENT_VERSION.
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Thu Jun 23 05:49:35 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.io.IOError;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -137,26 +136,20 @@ public abstract class SSTable
      *
      * @return true if the file was deleted
      */
-    public static boolean delete(Descriptor desc, Set<Component> components)
+    public static boolean delete(Descriptor desc, Set<Component> components) throws IOException
     {
-        try
-        {
-            // remove the DATA component first if it exists
-            if (components.contains(Component.DATA))
-                FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
-            for (Component component : components)
-            {
-                if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER))
-                    continue;
-                FileUtils.deleteWithConfirm(desc.filenameFor(component));
-            }
-            // remove the COMPACTED_MARKER component last if it exists
-            FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
-        }
-        catch (IOException e)
+        // remove the DATA component first if it exists
+        if (components.contains(Component.DATA))
+            FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
+        for (Component component : components)
         {
-            throw new IOError(e);
+            if (component.equals(Component.DATA) || component.equals(Component.COMPACTED_MARKER))
+                continue;
+            FileUtils.deleteWithConfirm(desc.filenameFor(component));
         }
+        // remove the COMPACTED_MARKER component last if it exists
+        FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
+
         logger.debug("Deleted {}", desc);
         return true;
     }
@@ -196,7 +189,7 @@ public abstract class SSTable
     /**
      * Discovers existing components for the descriptor. Slow: only intended for use outside the critical path.
      */
-    static Set<Component> componentsFor(final Descriptor desc, final boolean liveOnly)
+    static Set<Component> componentsFor(final Descriptor desc, final Descriptor.TempState matchState)
     {
         final Set<Component> components = new HashSet<Component>();
         desc.directory.list(new FilenameFilter()
@@ -204,7 +197,7 @@ public abstract class SSTable
             public boolean accept(File dir, String name)
             {
                 Pair<Descriptor,Component> component = tryComponentFromFilename(dir, name);
-                if (component != null && component.left.equals(desc) && (!liveOnly || !component.left.temporary))
+                if (component != null && component.left.equals(desc) && (matchState.isMatch(component.left)))
                     components.add(component.right);
                 return false;
             }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java Thu Jun 23 05:49:35 2011
@@ -20,6 +20,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
+import java.io.IOError;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
@@ -94,7 +95,15 @@ public class SSTableDeletingReference ex
                 }
             }
             // let the remainder be cleaned up by delete
-            SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
+            try
+            {
+                SSTable.delete(desc, Sets.difference(components, Collections.singleton(Component.DATA)));
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+
             tracker.spaceReclaimed(size);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Jun 23 05:49:35 2011
@@ -138,7 +138,7 @@ public class SSTableReader extends SSTab
 
     public static SSTableReader open(Descriptor desc) throws IOException
     {
-        Set<Component> components = componentsFor(desc, false);
+        Set<Component> components = componentsFor(desc, Descriptor.TempState.ANY);
         return open(desc, components, DatabaseDescriptor.getCFMetaData(desc.ksname, desc.cfname), StorageService.getPartitioner());
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Jun 23 05:49:35 2011
@@ -168,6 +168,26 @@ public class SSTableWriter extends SSTab
         afterAppend(decoratedKey, currentPosition);
     }
 
+    /**
+     * Attempt to close the index writer and data file before deleting all temp components for the sstable
+     */
+    public void cleanupIfNecessary()
+    {
+        FileUtils.closeQuietly(iwriter);
+        FileUtils.closeQuietly(dataFile);
+
+        try
+        {
+            Set<Component> components = SSTable.componentsFor(descriptor, Descriptor.TempState.TEMP);
+            if (!components.isEmpty())
+                SSTable.delete(descriptor, components);
+        }
+        catch (Exception e)
+        {
+            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
+        }
+    }
+
     public SSTableReader closeAndOpenReader() throws IOException
     {
         return closeAndOpenReader(System.currentTimeMillis());
@@ -300,26 +320,53 @@ public class SSTableWriter extends SSTab
 
         public SSTableReader build() throws IOException
         {
-            if (cfs.isInvalid())
-                return null;
-            maybeOpenIndexer();
+            try
+            {
+                if (cfs.isInvalid())
+                    return null;
+                maybeOpenIndexer();
+
+                File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+                File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+                assert !ifile.exists();
+                assert !ffile.exists();
 
-            File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
-            File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
-            assert !ifile.exists();
-            assert !ffile.exists();
+                long estimatedRows = indexer.prepareIndexing();
 
-            long estimatedRows = indexer.prepareIndexing();
+                // build the index and filter
+                long rows = indexer.index();
+
+                logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
+                return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, Descriptor.TempState.ANY)));
+            }
+            finally
+            {
+                cleanupIfNecessary();
+            }
+        }
 
-            // build the index and filter
-            long rows = indexer.index();
+        /**
+        * Attempt to close the index writer before deleting all temp components for the sstable
+        */
+        public void cleanupIfNecessary()
+        {
+            FileUtils.closeQuietly(indexer);
 
-            logger.debug("estimated row count was {} of real count", ((double)estimatedRows) / rows);
-            return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, false)));
+            try
+            {
+                Set<Component> components = SSTable.componentsFor(desc, Descriptor.TempState.TEMP);
+                if (!components.isEmpty())
+                    SSTable.delete(desc, components);
+            }
+            catch (Exception e)
+            {
+                logger.error(String.format("Failed deleting temp components for %s", desc), e);
+            }
         }
+
     }
 
-    static class RowIndexer
+    static class RowIndexer implements Closeable
     {
         protected final Descriptor desc;
         public final BufferedRandomAccessFile dfile;
@@ -376,7 +423,7 @@ public class SSTableWriter extends SSTab
             }
         }
 
-        void close() throws IOException
+        public void close() throws IOException
         {
             dfile.close();
             iwriter.close();
@@ -465,6 +512,11 @@ public class SSTableWriter extends SSTab
             writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
             return rows;
         }
+
+        public String toString()
+        {
+            return "RowIndexer(" + desc + ")";
+        }
     }
 
     /*
@@ -533,7 +585,7 @@ public class SSTableWriter extends SSTab
         }
 
         @Override
-        void close() throws IOException
+        public void close() throws IOException
         {
             super.close();
             writerDfile.close();
@@ -543,7 +595,7 @@ public class SSTableWriter extends SSTab
     /**
      * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
      */
-    static class IndexWriter
+    static class IndexWriter implements Closeable
     {
         private final BufferedRandomAccessFile indexFile;
         public final Descriptor desc;
@@ -610,5 +662,10 @@ public class SSTableWriter extends SSTab
             // we assume that if that worked then we won't be trying to reset.
             indexFile.reset(mark);
         }
+
+        public String toString()
+        {
+            return "IndexWriter(" + desc + ")";
+        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java Thu Jun 23 05:49:35 2011
@@ -92,7 +92,7 @@ public class FileUtils
         }
         catch (Exception e)
         {
-            logger_.warn("Failed closing stream", e);
+            logger_.warn("Failed closing " + c, e);
         }
     }
 

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java Thu Jun 23 05:49:35 2011
@@ -75,9 +75,9 @@ public class SSTableTest extends Cleanup
         ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
         verifyMany(ssTable, map);
 
-        Set<Component> live = SSTable.componentsFor(ssTable.descriptor, true);
+        Set<Component> live = SSTable.componentsFor(ssTable.descriptor, Descriptor.TempState.LIVE);
         assert !live.isEmpty() : "SSTable has live components";
-        Set<Component> all = SSTable.componentsFor(ssTable.descriptor, false);
+        Set<Component> all = SSTable.componentsFor(ssTable.descriptor, Descriptor.TempState.ANY);
         assert live.equals(all) : "live components same as all components";
         all.removeAll(live);
         assert all.isEmpty() : "SSTable has no temp components";