You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2011/06/15 19:55:31 UTC
svn commit: r1136135 - in /cassandra/branches/cassandra-0.8: CHANGES.txt
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Author: slebresne
Date: Wed Jun 15 17:55:31 2011
New Revision: 1136135
URL: http://svn.apache.org/viewvc?rev=1136135&view=rev
Log:
Avoids unmarking compacting sstable prematurely during cleanup
patch by slebresne; reviewed by jbellis for CASSANDRA-2769
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1136135&r1=1136134&r2=1136135&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Jun 15 17:55:31 2011
@@ -57,6 +57,7 @@
(CASSANDRA-2767)
* use threadsafe collections for StreamInSession (CASSANDRA-2766)
* avoid infinite loop when creating merkle tree (CASSANDRA-2758)
+ * avoids unmarking compacting sstable prematurely in cleanup (CASSANDRA-2769)
0.8.0-final
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136135&r1=1136134&r2=1136135&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Wed Jun 15 17:55:31 2011
@@ -845,56 +845,50 @@ public class CompactionManager implement
logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer = null;
+
+ logger.info("Cleaning up " + sstable);
+ // Calculate the expected compacted filesize
+ long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
+ String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
+ if (compactionFileLocation == null)
+ throw new IOException("disk full");
+
+ SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+ SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
+ CleanupInfo ci = new CleanupInfo(sstable, scanner);
+ executor.beginCompaction(ci);
try
{
- logger.info("Cleaning up " + sstable);
- // Calculate the expected compacted filesize
- long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable)) / 2;
- String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
- if (compactionFileLocation == null)
- throw new IOException("disk full");
-
- SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
- SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
- CleanupInfo ci = new CleanupInfo(sstable, scanner);
- executor.beginCompaction(ci);
- try
+ while (scanner.hasNext())
{
- while (scanner.hasNext())
+ SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+ if (Range.isTokenInRanges(row.getKey().token, ranges))
{
- SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
- if (Range.isTokenInRanges(row.getKey().token, ranges))
- {
- writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
- writer.append(controller.getCompactedRow(row));
- totalkeysWritten++;
- }
- else
+ writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize, writer, Collections.singletonList(sstable));
+ writer.append(controller.getCompactedRow(row));
+ totalkeysWritten++;
+ }
+ else
+ {
+ cfs.invalidateCachedRow(row.getKey());
+ if (!indexedColumns.isEmpty() || isCommutative)
{
- cfs.invalidateCachedRow(row.getKey());
- if (!indexedColumns.isEmpty() || isCommutative)
+ while (row.hasNext())
{
- while (row.hasNext())
- {
- IColumn column = row.next();
- if (column instanceof CounterColumn)
- renewer.maybeRenew((CounterColumn) column);
- if (indexedColumns.contains(column.name()))
- Table.cleanupIndexEntry(cfs, row.getKey().key, column);
- }
+ IColumn column = row.next();
+ if (column instanceof CounterColumn)
+ renewer.maybeRenew((CounterColumn) column);
+ if (indexedColumns.contains(column.name()))
+ Table.cleanupIndexEntry(cfs, row.getKey().key, column);
}
}
}
}
- finally
- {
- scanner.close();
- executor.finishCompaction(ci);
- }
}
finally
{
- cfs.getDataTracker().unmarkCompacting(Arrays.asList(sstable));
+ scanner.close();
+ executor.finishCompaction(ci);
}
List<SSTableReader> results = new ArrayList<SSTableReader>();