You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/12 01:52:17 UTC
svn commit: r773729 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db:
ColumnFamilyStore.java MemtableManager.java MinorCompactionManager.java
Author: jbellis
Date: Mon May 11 23:52:17 2009
New Revision: 773729
URL: http://svn.apache.org/viewvc?rev=773729&view=rev
Log:
more cleanup, mostly by reducing nesting level where possible. patch by jbellis
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773729&r1=773728&r2=773729&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 23:52:17 2009
@@ -756,7 +756,7 @@
}
}
- PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
+ private PriorityQueue<FileStruct> initializePriorityQueue(List<String> files, List<Range> ranges, int minBufferSize)
{
PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
if (files.size() > 1 || (ranges != null && files.size() > 0))
@@ -787,7 +787,7 @@
}
catch (Exception e)
{
- logger_.warn("Unable to close file :" + file);
+ logger_.error("Unable to close file :" + file);
}
}
}
@@ -964,7 +964,7 @@
return maxFile;
}
- boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList)
+ boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
@@ -992,7 +992,7 @@
*
* @throws IOException
*/
- void doCleanupCompaction()
+ void doCleanupCompaction() throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
@@ -1016,7 +1016,7 @@
* @throws IOException
*/
/* TODO: Take care of the comments later. */
- void doCleanup(String file)
+ void doCleanup(String file) throws IOException
{
if (file == null)
{
@@ -1065,7 +1065,7 @@
* @return
* @throws IOException
*/
- boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters)
+ boolean doFileAntiCompaction(List<String> files, List<Range> ranges, EndPoint target, List<String> fileList, List<BloomFilter> compactedBloomFilters) throws IOException
{
boolean result = false;
long startTime = System.currentTimeMillis();
@@ -1076,188 +1076,175 @@
String rangeFileLocation;
String mergedFileName;
IPartitioner p = StorageService.getPartitioner();
- try
+ // Calculate the expected compacted filesize
+ long expectedRangeFileSize = getExpectedCompactedFileSize(files);
+ /* in the worst case a node will be giving out alf of its data so we take a chance */
+ expectedRangeFileSize = expectedRangeFileSize / 2;
+ rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
+ // If the compaction file path is null that means we have no space left for this compaction.
+ if (rangeFileLocation == null)
{
- // Calculate the expected compacted filesize
- long expectedRangeFileSize = getExpectedCompactedFileSize(files);
- /* in the worst case a node will be giving out alf of its data so we take a chance */
- expectedRangeFileSize = expectedRangeFileSize / 2;
- rangeFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedRangeFileSize);
- // If the compaction file path is null that means we have no space left for this compaction.
- if (rangeFileLocation == null)
- {
- logger_.warn("Total bytes to be written for range compaction ..."
- + expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
- return result;
- }
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
+ logger_.warn("Total bytes to be written for range compaction ..."
+ + expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
+ return result;
+ }
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
+ if (pq.isEmpty())
+ {
+ return result;
+ }
+
+ mergedFileName = getTempFileName();
+ SSTable ssTableRange = null;
+ String lastkey = null;
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ /* Create the bloom filter for the compacted file. */
+ BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+ while (pq.size() > 0 || lfs.size() > 0)
+ {
+ FileStruct fs = null;
if (pq.size() > 0)
{
- mergedFileName = getTempFileName();
- SSTable ssTableRange = null;
- String lastkey = null;
- List<FileStruct> lfs = new ArrayList<FileStruct>();
- DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
- expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
- logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
- /* Create the bloom filter for the compacted file. */
- BloomFilter compactedRangeBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
- List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-
- while (pq.size() > 0 || lfs.size() > 0)
- {
- FileStruct fs = null;
- if (pq.size() > 0)
- {
- fs = pq.poll();
- }
- if (fs != null
- && (lastkey == null || lastkey.equals(fs.getKey())))
- {
- // The keys are the same so we need to add this to the
- // ldfs list
- lastkey = fs.getKey();
- lfs.add(fs);
- }
- else
- {
- Collections.sort(lfs, new FileStructComparator());
- ColumnFamily columnFamily;
- bufOut.reset();
- if (lfs.size() > 1)
+ fs = pq.poll();
+ }
+ if (fs != null
+ && (lastkey == null || lastkey.equals(fs.getKey())))
+ {
+ // The keys are the same so we need to add this to the
+ // ldfs list
+ lastkey = fs.getKey();
+ lfs.add(fs);
+ }
+ else
+ {
+ Collections.sort(lfs, new FileStructComparator());
+ ColumnFamily columnFamily;
+ bufOut.reset();
+ if (lfs.size() > 1)
+ {
+ for (FileStruct filestruct : lfs)
+ {
+ try
{
- for (FileStruct filestruct : lfs)
- {
- try
- {
- /* read the length although we don't need it */
- filestruct.getBufIn().readInt();
- // Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
- // We want to add only 2 and resolve them right there in order to save on memory footprint
- if (columnFamilies.size() > 1)
- {
- // Now merge the 2 column families
- merge(columnFamilies);
- }
- // deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
- }
- catch (Exception ex)
- {
- logger_.warn(LogUtil.throwableToString(ex));
- }
- }
- // Now after merging all crap append to the sstable
- columnFamily = resolveAndRemoveDeleted(columnFamilies);
- columnFamilies.clear();
- if (columnFamily != null)
+ /* read the length although we don't need it */
+ filestruct.getBufIn().readInt();
+ // Skip the Index
+ IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if (columnFamilies.size() > 1)
{
- /* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+ // Now merge the 2 column families
+ merge(columnFamilies);
}
+ // deserialize into column families
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
}
- else
+ catch (Exception ex)
{
- FileStruct filestruct = lfs.get(0);
- try
- {
- /* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
- }
- catch (Exception ex)
- {
- logger_.warn(LogUtil.throwableToString(ex));
- filestruct.close();
- continue;
- }
+ logger_.warn(LogUtil.throwableToString(ex));
}
- if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
+ }
+ // Now after merging all crap append to the sstable
+ columnFamily = resolveAndRemoveDeleted(columnFamilies);
+ columnFamilies.clear();
+ if (columnFamily != null)
+ {
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
+ }
+ }
+ else
+ {
+ FileStruct filestruct = lfs.get(0);
+ /* read the length although we don't need it */
+ int size = filestruct.getBufIn().readInt();
+ bufOut.write(filestruct.getBufIn(), size);
+ }
+ if (Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(lastkey), ranges))
+ {
+ if (ssTableRange == null)
+ {
+ if (target != null)
{
- if (ssTableRange == null)
- {
- if (target != null)
- {
- rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
- }
- FileUtils.createDirectory(rangeFileLocation);
- ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
- }
- try
- {
- ssTableRange.append(lastkey, bufOut);
- compactedRangeBloomFilter.add(lastkey);
- }
- catch (Exception ex)
- {
- logger_.warn(LogUtil.throwableToString(ex));
- }
+ rangeFileLocation = rangeFileLocation + System.getProperty("file.separator") + "bootstrap";
+ }
+ FileUtils.createDirectory(rangeFileLocation);
+ ssTableRange = new SSTable(rangeFileLocation, mergedFileName, StorageService.getPartitioner());
+ }
+ try
+ {
+ ssTableRange.append(lastkey, bufOut);
+ compactedRangeBloomFilter.add(lastkey);
+ }
+ catch (Exception ex)
+ {
+ logger_.warn(LogUtil.throwableToString(ex));
+ }
+ }
+ totalkeysWritten++;
+ for (FileStruct filestruct : lfs)
+ {
+ try
+ {
+ filestruct.advance();
+ if (filestruct.isExhausted())
+ {
+ continue;
}
- totalkeysWritten++;
- for (FileStruct filestruct : lfs)
+ /* keep on looping until we find a key in the range */
+ while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
{
- try
- {
- filestruct.advance();
- if (filestruct.isExhausted())
- {
- continue;
- }
- /* keep on looping until we find a key in the range */
- while (!Range.isTokenInRanges(StorageService.getPartitioner().getInitialToken(filestruct.getKey()), ranges))
- {
- filestruct.advance();
- if (filestruct.isExhausted())
- {
- break;
- }
- }
- if (!filestruct.isExhausted())
- {
- pq.add(filestruct);
- }
- totalkeysRead++;
- }
- catch (Exception ex)
+ filestruct.advance();
+ if (filestruct.isExhausted())
{
- // Ignore the exception as it might be a corrupted file
- // in any case we have read as far as possible from it
- // and it will be deleted after compaction.
- logger_.warn(LogUtil.throwableToString(ex));
- filestruct.close();
+ break;
}
}
- lfs.clear();
- lastkey = null;
- if (fs != null)
+ if (!filestruct.isExhausted())
{
- // Add back the fs since we processed the rest of
- // filestructs
- pq.add(fs);
+ pq.add(filestruct);
}
+ totalkeysRead++;
}
- }
-
- if (ssTableRange != null)
- {
- ssTableRange.closeRename(compactedRangeBloomFilter);
- if (fileList != null)
+ catch (Exception ex)
{
- fileList.add(ssTableRange.getDataFileLocation());
- }
- if (compactedBloomFilters != null)
- {
- compactedBloomFilters.add(compactedRangeBloomFilter);
+ // Ignore the exception as it might be a corrupted file
+ // in any case we have read as far as possible from it
+ // and it will be deleted after compaction.
+ logger_.warn("corrupt sstable?", ex);
+ filestruct.close();
}
}
+ lfs.clear();
+ lastkey = null;
+ if (fs != null)
+ {
+ // Add back the fs since we processed the rest of
+ // filestructs
+ pq.add(fs);
+ }
}
}
- catch (Exception ex)
+
+ if (ssTableRange != null)
{
- logger_.error(LogUtil.throwableToString(ex));
+ ssTableRange.closeRename(compactedRangeBloomFilter);
+ if (fileList != null)
+ {
+ fileList.add(ssTableRange.getDataFileLocation());
+ }
+ if (compactedBloomFilters != null)
+ {
+ compactedBloomFilters.add(compactedRangeBloomFilter);
+ }
}
+
logger_.debug("Total time taken for range split ..."
+ (System.currentTimeMillis() - startTime));
logger_.debug("Total bytes Read for range split ..." + totalBytesRead);
@@ -1302,157 +1289,154 @@
long totalkeysWritten = 0;
PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
- if (pq.size() > 0)
+ if (pq.isEmpty())
{
- String mergedFileName = getTempFileName(files);
- SSTable ssTable = null;
- String lastkey = null;
- List<FileStruct> lfs = new ArrayList<FileStruct>();
- DataOutputBuffer bufOut = new DataOutputBuffer();
- int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
- expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
- logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
- /* Create the bloom filter for the compacted file. */
- BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
- List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
-
- while (pq.size() > 0 || lfs.size() > 0)
- {
- FileStruct fs = null;
- if (pq.size() > 0)
- {
- fs = pq.poll();
- }
- if (fs != null
- && (lastkey == null || lastkey.equals(fs.getKey())))
- {
- // The keys are the same so we need to add this to the
- // ldfs list
- lastkey = fs.getKey();
- lfs.add(fs);
- }
- else
+ // TODO clean out bad files, if any
+ return 0;
+ }
+
+ String mergedFileName = getTempFileName(files);
+ SSTable ssTable = null;
+ String lastkey = null;
+ List<FileStruct> lfs = new ArrayList<FileStruct>();
+ DataOutputBuffer bufOut = new DataOutputBuffer();
+ int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+ expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize : SSTable.indexInterval();
+ logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+ /* Create the bloom filter for the compacted file. */
+ BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+ List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+
+ while (pq.size() > 0 || lfs.size() > 0)
+ {
+ FileStruct fs = null;
+ if (pq.size() > 0)
+ {
+ fs = pq.poll();
+ }
+ if (fs != null
+ && (lastkey == null || lastkey.equals(fs.getKey())))
+ {
+ // The keys are the same so we need to add this to the
+ // ldfs list
+ lastkey = fs.getKey();
+ lfs.add(fs);
+ }
+ else
+ {
+ Collections.sort(lfs, new FileStructComparator());
+ ColumnFamily columnFamily;
+ bufOut.reset();
+ if (lfs.size() > 1)
{
- Collections.sort(lfs, new FileStructComparator());
- ColumnFamily columnFamily;
- bufOut.reset();
- if (lfs.size() > 1)
- {
- for (FileStruct filestruct : lfs)
- {
- try
- {
- /* read the length although we don't need it */
- filestruct.getBufIn().readInt();
- // Skip the Index
- IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
- // We want to add only 2 and resolve them right there in order to save on memory footprint
- if (columnFamilies.size() > 1)
- {
- merge(columnFamilies);
- }
- // deserialize into column families
- columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
- }
- catch (Exception ex)
- {
- logger_.warn("error in filecompaction", ex);
- }
- }
- // Now after merging all crap append to the sstable
- columnFamily = resolveAndRemoveDeleted(columnFamilies);
- columnFamilies.clear();
- if (columnFamily != null)
- {
- /* serialize the cf with column indexes */
- ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
- }
- }
- else
+ for (FileStruct filestruct : lfs)
{
- FileStruct filestruct = lfs.get(0);
try
{
/* read the length although we don't need it */
- int size = filestruct.getBufIn().readInt();
- bufOut.write(filestruct.getBufIn(), size);
+ filestruct.getBufIn().readInt();
+ // Skip the Index
+ IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+ // We want to add only 2 and resolve them right there in order to save on memory footprint
+ if (columnFamilies.size() > 1)
+ {
+ merge(columnFamilies);
+ }
+ // deserialize into column families
+ columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
}
catch (Exception ex)
{
- logger_.error("empty sstable file " + filestruct.getFileName(), ex);
- filestruct.close();
- continue;
+ logger_.warn("error in filecompaction", ex);
}
}
-
- if (ssTable == null)
+ // Now after merging all crap append to the sstable
+ columnFamily = resolveAndRemoveDeleted(columnFamilies);
+ columnFamilies.clear();
+ if (columnFamily != null)
{
- ssTable = new SSTable(compactionFileLocation, mergedFileName, StorageService.getPartitioner());
+ /* serialize the cf with column indexes */
+ ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
}
- ssTable.append(lastkey, bufOut);
+ }
+ else
+ {
+ FileStruct filestruct = lfs.get(0);
+ /* read the length although we don't need it */
+ int size = filestruct.getBufIn().readInt();
+ bufOut.write(filestruct.getBufIn(), size);
+ }
- /* Fill the bloom filter with the key */
- doFill(compactedBloomFilter, lastkey);
- totalkeysWritten++;
- for (FileStruct filestruct : lfs)
+ if (ssTable == null)
+ {
+ ssTable = new SSTable(compactionFileLocation, mergedFileName, StorageService.getPartitioner());
+ }
+ ssTable.append(lastkey, bufOut);
+
+ /* Fill the bloom filter with the key */
+ doFill(compactedBloomFilter, lastkey);
+ totalkeysWritten++;
+ for (FileStruct filestruct : lfs)
+ {
+ try
{
- try
+ filestruct.advance();
+ if (filestruct.isExhausted())
{
- filestruct.advance();
- if (filestruct.isExhausted())
- {
- continue;
- }
- pq.add(filestruct);
- totalkeysRead++;
- }
- catch (Throwable ex)
- {
- // Ignore the exception as it might be a corrupted file
- // in any case we have read as far as possible from it
- // and it will be deleted after compaction.
- filestruct.close();
+ continue;
}
+ pq.add(filestruct);
+ totalkeysRead++;
}
- lfs.clear();
- lastkey = null;
- if (fs != null)
+ catch (Throwable ex)
{
- /* Add back the fs since we processed the rest of filestructs */
- pq.add(fs);
+ // Ignore the exception as it might be a corrupted file
+ // in any case we have read as far as possible from it
+ // and it will be deleted after compaction.
+ logger_.warn("corrupt sstable?", ex);
+ filestruct.close();
}
}
- }
- if (ssTable != null)
- {
- ssTable.closeRename(compactedBloomFilter);
- newfile = ssTable.getDataFileLocation();
- }
- lock_.writeLock().lock();
- try
- {
- for (String file : files)
+ lfs.clear();
+ lastkey = null;
+ if (fs != null)
{
- ssTables_.remove(file);
- SSTable.removeAssociatedBloomFilter(file);
- }
- if (newfile != null)
- {
- logger_.debug("Inserting bloom filter for file " + newfile);
- SSTable.storeBloomFilter(newfile, compactedBloomFilter);
- ssTables_.add(newfile);
- totalBytesWritten += (new File(newfile)).length();
+ /* Add back the fs since we processed the rest of filestructs */
+ pq.add(fs);
}
}
- finally
+ }
+ if (ssTable != null)
+ {
+ // TODO if all the keys were the same nothing will be done here
+ ssTable.closeRename(compactedBloomFilter);
+ newfile = ssTable.getDataFileLocation();
+ }
+ lock_.writeLock().lock();
+ try
+ {
+ for (String file : files)
{
- lock_.writeLock().unlock();
+ ssTables_.remove(file);
+ SSTable.removeAssociatedBloomFilter(file);
}
- for (String file : files)
+ if (newfile != null)
{
- SSTable.delete(file);
+ logger_.debug("Inserting bloom filter for file " + newfile);
+ SSTable.storeBloomFilter(newfile, compactedBloomFilter);
+ ssTables_.add(newfile);
+ totalBytesWritten += (new File(newfile)).length();
}
}
+ finally
+ {
+ lock_.writeLock().unlock();
+ }
+ for (String file : files)
+ {
+ SSTable.delete(file);
+ }
+
String format = "Compacted [%s] to %s. %d/%d bytes for %d/%d keys read/written. Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
logger_.info(String.format(format, StringUtils.join(files, ", "), newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java?rev=773729&r1=773728&r2=773729&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MemtableManager.java Mon May 11 23:52:17 2009
@@ -124,31 +124,23 @@
*/
void getColumnFamily(String key, String cfName, String cf, IFilter filter, List<ColumnFamily> columnFamilies)
{
- rwLock_.readLock().lock();
- try
- {
- /* Get all memtables associated with this column family */
- List<Memtable> memtables = history_.get(cfName);
- if ( memtables != null )
- {
- Collections.sort(memtables);
- int size = memtables.size();
- for ( int i = size - 1; i >= 0; --i )
- {
- ColumnFamily columnFamily = memtables.get(i).getLocalCopy(key, cf, filter);
- if ( columnFamily != null )
- {
- columnFamilies.add(columnFamily);
- if( filter.isDone())
- break;
- }
- }
- }
- }
- finally
- {
- rwLock_.readLock().unlock();
- }
+ List<Memtable> memtables = getUnflushedMemtables(cfName);
+ if ( memtables == null )
+ {
+ return;
+ }
+ Collections.sort(memtables);
+ int size = memtables.size();
+ for ( int i = size - 1; i >= 0; --i )
+ {
+ ColumnFamily columnFamily = memtables.get(i).getLocalCopy(key, cf, filter);
+ if ( columnFamily != null )
+ {
+ columnFamilies.add(columnFamily);
+ if( filter.isDone())
+ break;
+ }
+ }
}
public List<Memtable> getUnflushedMemtables(String cfName)
@@ -161,7 +153,7 @@
{
return new ArrayList<Memtable>(memtables);
}
- return Arrays.asList(new Memtable[0]);
+ return Arrays.asList();
}
finally
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=773729&r1=773728&r2=773729&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Mon May 11 23:52:17 2009
@@ -112,7 +112,14 @@
{
boolean result;
logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
- result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+ try
+ {
+ result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
return result;
}
@@ -149,7 +156,14 @@
public void run()
{
logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
- columnFamilyStore_.doCleanupCompaction();
+ try
+ {
+ columnFamilyStore_.doCleanupCompaction();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
}
}