You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/06/15 19:55:31 UTC

svn commit: r1136135 - in /cassandra/branches/cassandra-0.8: CHANGES.txt src/java/org/apache/cassandra/db/compaction/CompactionManager.java

Author: slebresne
Date: Wed Jun 15 17:55:31 2011
New Revision: 1136135

URL: http://svn.apache.org/viewvc?rev=1136135&view=rev
Log:
Avoids unmarking compacting sstable prematurely during cleanup
patch by slebresne; reviewed by jbellis for CASSANDRA-2769

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1136135&r1=1136134&r2=1136135&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jun 15 17:55:31 2011
@@ -57,6 +57,7 @@
    (CASSANDRA-2767)
  * use threadsafe collections for StreamInSession (CASSANDRA-2766)
  * avoid infinite loop when creating merkle tree (CASSANDRA-2758)
+ * avoids unmarking compacting sstable prematurely in cleanup (CASSANDRA-2769)
 
 
 0.8.0-final

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136135&r1=1136134&r2=1136135&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Jun 15 17:55:31 2011
@@ -845,56 +845,50 @@ public class CompactionManager implement
               logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
             SSTableWriter writer = null;
+
+            logger.info("Cleaning up " + sstable);
+            // Calculate the expected compacted filesize
+            long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
+            String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
+            if (compactionFileLocation == null)
+                throw new IOException("disk full");
+
+            SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+            SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
+            CleanupInfo ci = new CleanupInfo(sstable, scanner);
+            executor.beginCompaction(ci);
             try
             {
-                logger.info("Cleaning up " + sstable);
-                // Calculate the expected compacted filesize
-                long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
-                String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
-                if (compactionFileLocation == null)
-                    throw new IOException("disk full");
-
-                SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
-                SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
-                CleanupInfo ci = new CleanupInfo(sstable, scanner);
-                executor.beginCompaction(ci);
-                try
+                while (scanner.hasNext())
                 {
-                    while (scanner.hasNext())
+                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                    if (Range.isTokenInRanges(row.getKey().token, ranges))
                     {
-                        SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-                        if (Range.isTokenInRanges(row.getKey().token, ranges))
-                        {
-                            writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
-                            writer.append(controller.getCompactedRow(row));
-                            totalkeysWritten++;
-                        }
-                        else
+                        writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
+                        writer.append(controller.getCompactedRow(row));
+                        totalkeysWritten++;
+                    }
+                    else
+                    {
+                        cfs.invalidateCachedRow(row.getKey());
+                        if (!indexedColumns.isEmpty() || isCommutative)
                         {
-                            cfs.invalidateCachedRow(row.getKey());
-                            if (!indexedColumns.isEmpty() || isCommutative)
+                            while (row.hasNext())
                             {
-                                while (row.hasNext())
-                                {
-                                    IColumn column = row.next();
-                                    if (column instanceof CounterColumn)
-                                        renewer.maybeRenew((CounterColumn) column);
-                                    if (indexedColumns.contains(column.name()))
-                                        Table.cleanupIndexEntry(cfs, row.getKey().key, column);
-                                }
+                                IColumn column = row.next();
+                                if (column instanceof CounterColumn)
+                                    renewer.maybeRenew((CounterColumn) column);
+                                if (indexedColumns.contains(column.name()))
+                                    Table.cleanupIndexEntry(cfs, row.getKey().key, column);
                             }
                         }
                     }
                 }
-                finally
-                {
-                    scanner.close();
-                    executor.finishCompaction(ci);
-                }
             }
             finally
             {
-                cfs.getDataTracker().unmarkCompacting(Arrays.asList(sstable));
+                scanner.close();
+                executor.finishCompaction(ci);
             }
 
             List<SSTableReader> results = new ArrayList<SSTableReader>();