Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/12 01:52:17 UTC

svn commit: r773729 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java MemtableManager.java MinorCompactionManager.java

Author: jbellis
Date: Mon May 11 23:52:17 2009
New Revision: 773729

URL: http://svn.apache.org/viewvc?rev=773729&view=rev
Log:
more cleanup, mostly by reducing nesting level where possible.  patch by jbellis

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java

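The cleanup here is mostly the guard-clause pattern: instead of wrapping a long method body in a check like if (pq.size() > 0) { ... }, the rewritten code tests for the empty case first and returns immediately, so everything that follows loses one level of indentation. A minimal before/after sketch of the idea (the drain/process names are illustrative, not from the patch):

    import java.util.Queue;

    class GuardClauseSketch
    {
        // before: the useful work nests inside the emptiness check
        int drainNested(Queue<String> pq)
        {
            int processed = 0;
            if (!pq.isEmpty())
            {
                while (!pq.isEmpty())
                {
                    process(pq.poll());
                    processed++;
                }
            }
            return processed;
        }

        // after: bail out early, then write the body at the top level
        int drainFlat(Queue<String> pq)
        {
            if (pq.isEmpty())
            {
                return 0;
            }
            int processed = 0;
            while (!pq.isEmpty())
            {
                process(pq.poll());
                processed++;
            }
            return processed;
        }

        void process(String item)
        {
            // stand-in for real per-item work
        }
    }

The same transformation appears below in doFileAntiCompaction and doFileCompaction, where the early return also lets the merge setup move out of the conditional.
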
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773729&r1=773728&r2=773729&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 23:52:17 2009
@@ -756,7 +756,7 @@
         }
     }
 
-    PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
+    private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
     {
         PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
         if (files.size() > 1 || (ranges != null && files.size() > 0))
@@ -787,7 +787,7 @@
                     }
                     catch (Exception e)
                     {
-                        logger_.warn("Unable to close file :" + file);
+                        logger_.error("Unable to close file :" + file);
                     }
                 }
             }
@@ -964,7 +964,7 @@
         return maxFile;
     }
 
-    boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
+    boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
@@ -992,7 +992,7 @@
      *
      * @throws IOException
      */
-    void doCleanupCompaction()
+    void doCleanupCompaction() throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
@@ -1016,7 +1016,7 @@
      * @throws IOException
      */
     /* TODO: Take care of the comments later. */
-    void doCleanup(String file)
+    void doCleanup(String file) throws IOException
     {
         if (file == null)
         {
@@ -1065,7 +1065,7 @@
      * @return
      * @throws IOException
      */
-    boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters)
+    boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters) throws IOException
     {
         boolean result = false;
         long startTime = System.currentTimeMillis();
@@ -1076,188 +1076,175 @@
         String rangeFileLocation;
         String mergedFileName;
         IPartitioner p = StorageService.getPartitioner();
-        try
+        // Calculate the expected compacted filesize
+        long expectedRangeFileSize = getExpectedCompactedFileSize(files);
+        /* in the worst case a node will be giving out half of its data so we take a chance */
+        expectedRangeFileSize = expectedRangeFileSize / 2;
+        rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
+        // If the compaction file path is null that means we have no space left for this compaction.
+        if (rangeFileLocation == null)
         {
-            // Calculate the expected compacted filesize
-            long expectedRangeFileSize = getExpectedCompactedFileSize(files);
-            /* in the worst case a node will be giving out half of its data so we take a chance */
-            expectedRangeFileSize = expectedRangeFileSize / 2;
-            rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
-            // If the compaction file path is null that means we have no space left for this compaction.
-            if (rangeFileLocation == null)
-            {
-                logger_.warn("Total bytes to be written for range compaction  ..."
-                             + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
-                return result;
-            }
-            PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
+            logger_.warn("Total bytes to be written for range compaction  ..."
+                         + expectedRangeFileSize + "   is greater than the safe limit of the disk space available.");
+            return result;
+        }
+        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
+        if (pq.isEmpty())
+        {
+            return result;
+        }
+
+        mergedFileName = getTempFileName();
+        SSTable ssTableRange = null;
+        String lastkey = null;
+        List<FileStruct> lfs = new ArrayList<FileStruct>();
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+        expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+        logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+        /* Create the bloom filter for the compacted file. */
+        BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+        List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+        while (pq.size() > 0 || lfs.size() > 0)
+        {
+            FileStruct fs = null;
             if (pq.size() > 0)
             {
-                mergedFileName = getTempFileName();
-                SSTable ssTableRange = null;
-                String lastkey = null;
-                List<FileStruct> lfs = new ArrayList<FileStruct>();
-                DataOutputBuffer bufOut = new DataOutputBuffer();
-                int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
-                expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
-                logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-                /* Create the bloom filter for the compacted file. */
-                BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
-                List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-
-                while (pq.size() > 0 || lfs.size() > 0)
-                {
-                    FileStruct fs = null;
-                    if (pq.size() > 0)
-                    {
-                        fs = pq.poll();
-                    }
-                    if (fs != null
-                        && (lastkey == null || lastkey.equals(fs.getKey())))
-                    {
-                        // The keys are the same so we need to add this to the
-                        // lfs list
-                        lastkey = fs.getKey();
-                        lfs.add(fs);
-                    }
-                    else
-                    {
-                        Collections.sort(lfs, new FileStructComparator());
-                        ColumnFamily columnFamily;
-                        bufOut.reset();
-                        if (lfs.size() > 1)
+                fs = pq.poll();
+            }
+            if (fs != null
+                && (lastkey == null || lastkey.equals(fs.getKey())))
+            {
+                // The keys are the same so we need to add this to the
+                // lfs list
+                lastkey = fs.getKey();
+                lfs.add(fs);
+            }
+            else
+            {
+                Collections.sort(lfs, new FileStructComparator());
+                ColumnFamily columnFamily;
+                bufOut.reset();
+                if (lfs.size() > 1)
+                {
+                    for (FileStruct filestruct : lfs)
+                    {
+                        try
                         {
-                            for (FileStruct filestruct : lfs)
-                            {
-                                try
-                                {
-                                    /* read the length although we don't need it */
-                                    filestruct.getBufIn().readInt();
-                                    // Skip the Index
-                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
-                                    // We want to add only 2 and resolve them right there in order to save on memory footprint
-                                    if (columnFamilies.size() > 1)
-                                    {
-                                        // Now merge the 2 column families
-                                        merge(columnFamilies);
-                                    }
-                                    // deserialize into column families
-                                    columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
-                                }
-                                catch (Exception ex)
-                                {
-                                    logger_.warn(LogUtil.throwableToString(ex));
-                                }
-                            }
-                            // Now after merging all crap append to the sstable
-                            columnFamily = resolveAndRemoveDeleted(columnFamilies);
-                            columnFamilies.clear();
-                            if (columnFamily != null)
+                            /* read the length although we don't need it */
+                            filestruct.getBufIn().readInt();
+                            // Skip the Index
+                            IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+                            // We want to add only 2 and resolve them right there in order to save on memory footprint
+                            if (columnFamilies.size() > 1)
                             {
-                                /* serialize the cf with column indexes */
-                                ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+                                // Now merge the 2 column families
+                                merge(columnFamilies);
                             }
+                            // deserialize into column families
+                            columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
                         }
-                        else
+                        catch (Exception ex)
                         {
-                            FileStruct filestruct = lfs.get(0);
-                            try
-                            {
-                                /* read the length although we don't need it */
-                                int size = filestruct.getBufIn().readInt();
-                                bufOut.write(filestruct.getBufIn(), size);
-                            }
-                            catch (Exception ex)
-                            {
-                                logger_.warn(LogUtil.throwableToString(ex));
-                                filestruct.close();
-                                continue;
-                            }
+                            logger_.warn(LogUtil.throwableToString(ex));
                         }
-                        if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
+                    }
+                    // Now after merging all crap append to the sstable
+                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
+                    columnFamilies.clear();
+                    if (columnFamily != null)
+                    {
+                        /* serialize the cf with column indexes */
+                        ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+                    }
+                }
+                else
+                {
+                    FileStruct filestruct = lfs.get(0);
+                    /* read the length although we don't need it */
+                    int size = filestruct.getBufIn().readInt();
+                    bufOut.write(filestruct.getBufIn(), size);
+                }
+                if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
+                {
+                    if (ssTableRange == null)
+                    {
+                        if (target != null)
                         {
-                            if (ssTableRange == null)
-                            {
-                                if (target != null)
-                                {
-                                    rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
-                                }
-                                FileUtils.createDirectory(rangeFileLocation);
-                                ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
-                            }
-                            try
-                            {
-                                ssTableRange.append(lastkey, bufOut);
-                                compactedRangeBloomFilter.add(lastkey);
-                            }
-                            catch (Exception ex)
-                            {
-                                logger_.warn(LogUtil.throwableToString(ex));
-                            }
+                            rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
+                        }
+                        FileUtils.createDirectory(rangeFileLocation);
+                        ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
+                    }
+                    try
+                    {
+                        ssTableRange.append(lastkey, bufOut);
+                        compactedRangeBloomFilter.add(lastkey);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger_.warn(LogUtil.throwableToString(ex));
+                    }
+                }
+                totalkeysWritten++;
+                for (FileStruct filestruct : lfs)
+                {
+                    try
+                    {
+                        filestruct.advance();
+                        if (filestruct.isExhausted())
+                        {
+                            continue;
                         }
-                        totalkeysWritten++;
-                        for (FileStruct filestruct : lfs)
+                        /* keep on looping until we find a key in the range */
+                        while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
                         {
-                            try
-                            {
-                                filestruct.advance();
-                                if (filestruct.isExhausted())
-                                {
-                                    continue;
-                                }
-                                /* keep on looping until we find a key in the range */
-                                while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
-                                {
-                                    filestruct.advance();
-                                    if (filestruct.isExhausted())
-                                    {
-                                        break;
-                                    }
-                                }
-                                if (!filestruct.isExhausted())
-                                {
-                                    pq.add(filestruct);
-                                }
-                                totalkeysRead++;
-                            }
-                            catch (Exception ex)
+                            filestruct.advance();
+                            if (filestruct.isExhausted())
                             {
-                                // Ignore the exception as it might be a corrupted file
-                                // in any case we have read as far as possible from it
-                                // and it will be deleted after compaction.
-                                logger_.warn(LogUtil.throwableToString(ex));
-                                filestruct.close();
+                                break;
                             }
                         }
-                        lfs.clear();
-                        lastkey = null;
-                        if (fs != null)
+                        if (!filestruct.isExhausted())
                         {
-                            // Add back the fs since we processed the rest of
-                            // filestructs
-                            pq.add(fs);
+                            pq.add(filestruct);
                         }
+                        totalkeysRead++;
                     }
-                }
-
-                if (ssTableRange != null)
-                {
-                    ssTableRange.closeRename(compactedRangeBloomFilter);
-                    if (fileList != null)
+                    catch (Exception ex)
                     {
-                        fileList.add(ssTableRange.getDataFileLocation());
-                    }
-                    if (compactedBloomFilters != null)
-                    {
-                        compactedBloomFilters.add(compactedRangeBloomFilter);
+                        // Ignore the exception as it might be a corrupted file
+                        // in any case we have read as far as possible from it
+                        // and it will be deleted after compaction.
+                        logger_.warn("corrupt sstable?", ex);
+                        filestruct.close();
                     }
                 }
+                lfs.clear();
+                lastkey = null;
+                if (fs != null)
+                {
+                    // Add back the fs since we processed the rest of
+                    // filestructs
+                    pq.add(fs);
+                }
             }
         }
-        catch (Exception ex)
+
+        if (ssTableRange != null)
         {
-            logger_.error(LogUtil.throwableToString(ex));
+            ssTableRange.closeRename(compactedRangeBloomFilter);
+            if (fileList != null)
+            {
+                fileList.add(ssTableRange.getDataFileLocation());
+            }
+            if (compactedBloomFilters != null)
+            {
+                compactedBloomFilters.add(compactedRangeBloomFilter);
+            }
         }
+
         logger_.debug("Total time taken for range split   ..."
                       + (System.currentTimeMillis() - startTime));
         logger_.debug("Total bytes Read for range split  ..." + totalBytesRead);
@@ -1302,157 +1289,154 @@
         long totalkeysWritten = 0;
         PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
 
-        if (pq.size() > 0)
+        if (pq.isEmpty())
         {
-            String mergedFileName = getTempFileName(files);
-            SSTable ssTable = null;
-            String lastkey = null;
-            List<FileStruct> lfs = new ArrayList<FileStruct>();
-            DataOutputBuffer bufOut = new DataOutputBuffer();
-            int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
-            expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
-            logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-            /* Create the bloom filter for the compacted file. */
-            BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
-            List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-
-            while (pq.size() > 0 || lfs.size() > 0)
-            {
-                FileStruct fs = null;
-                if (pq.size() > 0)
-                {
-                    fs = pq.poll();
-                }
-                if (fs != null
-                    && (lastkey == null || lastkey.equals(fs.getKey())))
-                {
-                    // The keys are the same so we need to add this to the
-                    // lfs list
-                    lastkey = fs.getKey();
-                    lfs.add(fs);
-                }
-                else
+            // TODO clean out bad files, if any
+            return 0;
+        }
+
+        String mergedFileName = getTempFileName(files);
+        SSTable ssTable = null;
+        String lastkey = null;
+        List<FileStruct> lfs = new ArrayList<FileStruct>();
+        DataOutputBuffer bufOut = new DataOutputBuffer();
+        int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+        expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+        logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+        /* Create the bloom filter for the compacted file. */
+        BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+        List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+        while (pq.size() > 0 || lfs.size() > 0)
+        {
+            FileStruct fs = null;
+            if (pq.size() > 0)
+            {
+                fs = pq.poll();
+            }
+            if (fs != null
+                && (lastkey == null || lastkey.equals(fs.getKey())))
+            {
+                // The keys are the same so we need to add this to the
+                // lfs list
+                lastkey = fs.getKey();
+                lfs.add(fs);
+            }
+            else
+            {
+                Collections.sort(lfs, new FileStructComparator());
+                ColumnFamily columnFamily;
+                bufOut.reset();
+                if (lfs.size() > 1)
                 {
-                    Collections.sort(lfs, new FileStructComparator());
-                    ColumnFamily columnFamily;
-                    bufOut.reset();
-                    if (lfs.size() > 1)
-                    {
-                        for (FileStruct filestruct : lfs)
-                        {
-                            try
-                            {
-                                /* read the length although we don't need it */
-                                filestruct.getBufIn().readInt();
-                                // Skip the Index
-                                IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
-                                // We want to add only 2 and resolve them right there in order to save on memory footprint
-                                if (columnFamilies.size() > 1)
-                                {
-                                    merge(columnFamilies);
-                                }
-                                // deserialize into column families
-                                columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
-                            }
-                            catch (Exception ex)
-                            {
-                                logger_.warn("error in filecompaction", ex);
-                            }
-                        }
-                        // Now after merging all crap append to the sstable
-                        columnFamily = resolveAndRemoveDeleted(columnFamilies);
-                        columnFamilies.clear();
-                        if (columnFamily != null)
-                        {
-                            /* serialize the cf with column indexes */
-                            ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
-                        }
-                    }
-                    else
+                    for (FileStruct filestruct : lfs)
                     {
-                        FileStruct filestruct = lfs.get(0);
                         try
                         {
                             /* read the length although we don't need it */
-                            int size = filestruct.getBufIn().readInt();
-                            bufOut.write(filestruct.getBufIn(), size);
+                            filestruct.getBufIn().readInt();
+                            // Skip the Index
+                            IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+                            // We want to add only 2 and resolve them right there in order to save on memory footprint
+                            if (columnFamilies.size() > 1)
+                            {
+                                merge(columnFamilies);
+                            }
+                            // deserialize into column families
+                            columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
                         }
                         catch (Exception ex)
                         {
-                            logger_.error("empty sstable file " + filestruct.getFileName(), ex);
-                            filestruct.close();
-                            continue;
+                            logger_.warn("error in filecompaction", ex);
                         }
                     }
-
-                    if (ssTable == null)
+                    // Now after merging all crap append to the sstable
+                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
+                    columnFamilies.clear();
+                    if (columnFamily != null)
                     {
-                        ssTable = new SSTable(compactionFileLocation, mergedFileName, StorageService.getPartitioner());
+                        /* serialize the cf with column indexes */
+                        ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
                     }
-                    ssTable.append(lastkey, bufOut);
+                }
+                else
+                {
+                    FileStruct filestruct = lfs.get(0);
+                    /* read the length although we don't need it */
+                    int size = filestruct.getBufIn().readInt();
+                    bufOut.write(filestruct.getBufIn(), size);
+                }
 
-                    /* Fill the bloom filter with the key */
-                    doFill(compactedBloomFilter, lastkey);
-                    totalkeysWritten++;
-                    for (FileStruct filestruct : lfs)
+                if (ssTable == null)
+                {
+                    ssTable = new SSTable(compactionFileLocation, mergedFileName, StorageService.getPartitioner());
+                }
+                ssTable.append(lastkey, bufOut);
+
+                /* Fill the bloom filter with the key */
+                doFill(compactedBloomFilter, lastkey);
+                totalkeysWritten++;
+                for (FileStruct filestruct : lfs)
+                {
+                    try
                     {
-                        try
+                        filestruct.advance();
+                        if (filestruct.isExhausted())
                         {
-                            filestruct.advance();
-                            if (filestruct.isExhausted())
-                            {
-                                continue;
-                            }
-                            pq.add(filestruct);
-                            totalkeysRead++;
-                        }
-                        catch (Throwable ex)
-                        {
-                            // Ignore the exception as it might be a corrupted file
-                            // in any case we have read as far as possible from it
-                            // and it will be deleted after compaction.
-                            filestruct.close();
+                            continue;
                         }
+                        pq.add(filestruct);
+                        totalkeysRead++;
                     }
-                    lfs.clear();
-                    lastkey = null;
-                    if (fs != null)
+                    catch (Throwable ex)
                     {
-                        /* Add back the fs since we processed the rest of filestructs */
-                        pq.add(fs);
+                        // Ignore the exception as it might be a corrupted file
+                        // in any case we have read as far as possible from it
+                        // and it will be deleted after compaction.
+                        logger_.warn("corrupt sstable?", ex);
+                        filestruct.close();
                     }
                 }
-            }
-            if (ssTable != null)
-            {
-                ssTable.closeRename(compactedBloomFilter);
-                newfile = ssTable.getDataFileLocation();
-            }
-            lock_.writeLock().lock();
-            try
-            {
-                for (String file : files)
+                lfs.clear();
+                lastkey = null;
+                if (fs != null)
                 {
-                    ssTables_.remove(file);
-                    SSTable.removeAssociatedBloomFilter(file);
-                }
-                if (newfile != null)
-                {
-                    logger_.debug("Inserting bloom filter for file " + newfile);
-                    SSTable.storeBloomFilter(newfile, compactedBloomFilter);
-                    ssTables_.add(newfile);
-                    totalBytesWritten += (new File(newfile)).length();
+                    /* Add back the fs since we processed the rest of filestructs */
+                    pq.add(fs);
                 }
             }
-            finally
+        }
+        if (ssTable != null)
+        {
+            // TODO if all the keys were the same nothing will be done here
+            ssTable.closeRename(compactedBloomFilter);
+            newfile = ssTable.getDataFileLocation();
+        }
+        lock_.writeLock().lock();
+        try
+        {
+            for (String file : files)
             {
-                lock_.writeLock().unlock();
+                ssTables_.remove(file);
+                SSTable.removeAssociatedBloomFilter(file);
             }
-            for (String file : files)
+            if (newfile != null)
             {
-                SSTable.delete(file);
+                logger_.debug("Inserting bloom filter for file " + newfile);
+                SSTable.storeBloomFilter(newfile, compactedBloomFilter);
+                ssTables_.add(newfile);
+                totalBytesWritten += (new File(newfile)).length();
             }
         }
+        finally
+        {
+            lock_.writeLock().unlock();
+        }
+        for (String file : files)
+        {
+            SSTable.delete(file);
+        }
+
         String format = "Compacted [%s] to %s.  %d/%d bytes for %d/%d keys read/written.  Time: %dms.";
         long dTime = System.currentTimeMillis() - startTime;
         logger_.info(String.format(format, StringUtils.join(files, ", "), newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));

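Both doFileAntiCompaction and doFileCompaction above are built around the same k-way merge: poll the source positioned at the smallest key off a PriorityQueue, collect every source sitting at that key (the lfs list), merge and write a single output row, then advance the consumed sources and re-add the ones that are not exhausted. A self-contained sketch of that queue discipline over plain sorted string iterators, with an illustrative Source wrapper standing in for FileStruct:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    class KWayMergeSketch
    {
        static class Source implements Comparable<Source>
        {
            private final Iterator<String> iter;
            private String current;

            Source(Iterator<String> iter)
            {
                this.iter = iter;
                advance();
            }

            void advance()
            {
                current = iter.hasNext() ? iter.next() : null;
            }

            boolean isExhausted()
            {
                return current == null;
            }

            public int compareTo(Source other)
            {
                return current.compareTo(other.current);
            }
        }

        static List<String> merge(List<Iterator<String>> inputs)
        {
            PriorityQueue<Source> pq = new PriorityQueue<Source>();
            for (Iterator<String> it : inputs)
            {
                Source s = new Source(it);
                if (!s.isExhausted())
                {
                    pq.add(s); // only live sources go on the queue
                }
            }
            List<String> merged = new ArrayList<String>();
            String lastkey = null;
            while (!pq.isEmpty())
            {
                Source s = pq.poll();
                if (!s.current.equals(lastkey))
                {
                    merged.add(s.current); // sources at the same key collapse into one row
                    lastkey = s.current;
                }
                s.advance();
                if (!s.isExhausted())
                {
                    pq.add(s); // analogous to pq.add(filestruct) in the loops above
                }
            }
            return merged;
        }
    }

The real methods do more per key -- they sort the gathered sources with FileStructComparator, deserialize and merge the column families, and append the resolved row to an SSTable -- but the queue handling is the same.
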
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java?rev=773729&r1=773728&r2=773729&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java Mon May 11 23:52:17 2009
@@ -124,31 +124,23 @@
     */
     void getColumnFamily(String key, String cfName, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
     {
-    	rwLock_.readLock().lock();
-    	try
-    	{
-	        /* Get all memtables associated with this column family */
-	        List<Memtable> memtables = history_.get(cfName);
-	        if ( memtables != null )
-	        {
-		        Collections.sort(memtables);
-	        	int size = memtables.size();
-	            for ( int i = size - 1; i >= 0; --i  )
-	            {
-	                ColumnFamily columnFamily = memtables.get(i).getLocalCopy(key, cf, filter);
-	                if ( columnFamily != null )
-	                {
-	                    columnFamilies.add(columnFamily);
-	                    if( filter.isDone())
-	                    	break;
-	                }
-	            }
-	        }        
-    	}
-    	finally
-    	{
-        	rwLock_.readLock().unlock();
-    	}
+        List<Memtable> memtables = getUnflushedMemtables(cfName);
+        if ( memtables == null )
+        {
+            return;
+        }
+        Collections.sort(memtables);
+        int size = memtables.size();
+        for ( int i = size - 1; i >= 0; --i  )
+        {
+            ColumnFamily columnFamily = memtables.get(i).getLocalCopy(key, cf, filter);
+            if ( columnFamily != null )
+            {
+                columnFamilies.add(columnFamily);
+                if( filter.isDone())
+                    break;
+            }
+        }
     }
 
     public List<Memtable> getUnflushedMemtables(String cfName)
@@ -161,7 +153,7 @@
             {
                 return new ArrayList<Memtable>(memtables);
             }
-            return Arrays.asList(new Memtable[0]);
+            return Arrays.asList();
         }
         finally
         {

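The MemtableManager change works because getUnflushedMemtables already takes the read lock and hands back a defensive copy, so getColumnFamily can sort and walk that snapshot with no lock held. A minimal sketch of the copy-under-lock idiom, with illustrative field names:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class SnapshotSketch
    {
        private final Map<String, List<String>> history = new HashMap<String, List<String>>();
        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

        List<String> snapshot(String key)
        {
            rwLock.readLock().lock();
            try
            {
                List<String> entries = history.get(key);
                if (entries == null)
                {
                    return Collections.emptyList(); // shared immutable empty list
                }
                return new ArrayList<String>(entries); // copy taken while the lock is held
            }
            finally
            {
                rwLock.readLock().unlock();
            }
        }
    }

Callers may freely sort or iterate the returned list, since it is their private copy; that is what makes the Collections.sort in the refactored getColumnFamily safe. The zero-argument Arrays.asList() in the patch likewise produces an empty list.
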
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=773729&r1=773728&r2=773729&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Mon May 11 23:52:17 2009
@@ -112,7 +112,14 @@
         {
         	boolean result;
             logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+            try
+            {
+                result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
             logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
             return result;
         }
@@ -149,7 +156,14 @@
         public void run()
         {
             logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            columnFamilyStore_.doCleanupCompaction();
+            try
+            {
+                columnFamilyStore_.doCleanupCompaction();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
             logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
         }
     }
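
The MinorCompactionManager changes follow from doAntiCompaction and doCleanupCompaction now declaring throws IOException: a task's run() method cannot let a checked exception escape, so the IOException is wrapped in an unchecked RuntimeException at the call site. A minimal sketch of the pattern, with doCompaction standing in for the real ColumnFamilyStore calls:

    import java.io.IOException;

    class CompactionTaskSketch implements Runnable
    {
        public void run()
        {
            try
            {
                doCompaction();
            }
            catch (IOException e)
            {
                // run() may not throw checked exceptions; rethrowing unchecked
                // surfaces the failure through Future.get() or the thread's
                // uncaught-exception handler instead of dropping it.
                throw new RuntimeException(e);
            }
        }

        private void doCompaction() throws IOException
        {
            // stand-in for columnFamilyStore_.doCleanupCompaction()
        }
    }

Where the task is submitted as a Callable instead, call() could declare the checked exception directly; wrapping keeps the task signatures unchanged either way.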