You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/11 16:26:17 UTC
svn commit: r773573 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/ColumnFamilyStore.java db/DBManager.java db/FileStruct.java
db/MinorCompactionManager.java io/Coordinate.java io/SSTable.java
Author: jbellis
Date: Mon May 11 14:26:16 2009
New Revision: 773573
URL: http://svn.apache.org/viewvc?rev=773573&view=rev
Log:
clean up compaction code and refactor to allow user-specified threshold (minimum number of CFs to compact). patch by jbellis
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 14:26:16 2009
@@ -55,6 +55,7 @@
import org.apache.cassandra.utils.FileUtils;
import org.apache.cassandra.utils.LogUtil;
import org.apache.cassandra.utils.TimedStatsDeque;
+import org.apache.commons.lang.StringUtils;
/**
* Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
@@ -62,9 +63,9 @@
public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
- private static int threshHold_ = 4;
- private static final int bufSize_ = 128*1024*1024;
- private static int compactionMemoryThreshold_ = 1 << 30;
+ private static int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
+ private static final int BUFSIZE = 128*1024*1024;
+ private static final int COMPACTION_MEMORY_THRESHOLD = 1 << 30;
private static Logger logger_ = Logger.getLogger(ColumnFamilyStore.class);
private final String table_;
@@ -717,8 +718,8 @@
lock_.writeLock().unlock();
}
- if ((ssTableSize >= threshHold_ && !isCompacting_.get())
- || (isCompacting_.get() && ssTableSize % threshHold_ == 0))
+ if ((ssTableSize >= COMPACTION_THRESHOLD && !isCompacting_.get())
+ || (isCompacting_.get() && ssTableSize % COMPACTION_THRESHOLD == 0))
{
logger_.debug("Submitting for compaction ...");
MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
@@ -731,7 +732,7 @@
PriorityQueue<FileStruct> pq = new PriorityQueue<FileStruct>();
if (files.size() > 1 || (ranges != null && files.size() > 0))
{
- int bufferSize = Math.min( (ColumnFamilyStore.compactionMemoryThreshold_ / files.size()), minBufferSize ) ;
+ int bufferSize = Math.min( (ColumnFamilyStore.COMPACTION_MEMORY_THRESHOLD / files.size()), minBufferSize ) ;
FileStruct fs = null;
for (String file : files)
{
@@ -805,40 +806,47 @@
return buckets.keySet();
}
+ public void doCompaction() throws IOException
+ {
+ doCompaction(COMPACTION_THRESHOLD);
+ }
+
/*
* Break the files into buckets and then compact.
*/
- void doCompaction() throws IOException
+ public void doCompaction(int threshold) throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
try
{
- int count;
- for(List<String> fileList : getCompactionBuckets(files, 50L*1024L*1024L))
+ int count;
+ for (List<String> fileList : getCompactionBuckets(files, 50L * 1024L * 1024L))
{
- Collections.sort( fileList , new FileNameComparator( FileNameComparator.Ascending));
- if(fileList.size() >= threshHold_ )
- {
- files.clear();
- count = 0;
- for(String file : fileList)
- {
- files.add(file);
- count++;
- if( count == threshHold_ )
- break;
- }
- // For each bucket if it has crossed the threshhold do the compaction
- // In case of range compaction merge the counting bloom filters also.
- if( count == threshHold_)
- doFileCompaction(files, bufSize_);
- }
- }
+ Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
+ if (fileList.size() < threshold)
+ {
+ continue;
+ }
+ // For each bucket if it has crossed the threshhold do the compaction
+ // In case of range compaction merge the counting bloom filters also.
+ files.clear();
+ count = 0;
+ for (String file : fileList)
+ {
+ files.add(file);
+ count++;
+ if (count == threshold)
+ {
+ doFileCompaction(files, BUFSIZE);
+ break;
+ }
+ }
+ }
}
finally
{
- isCompacting_.set(false);
+ isCompacting_.set(false);
}
}
@@ -876,11 +884,11 @@
{
files = filesInternal;
}
- doFileCompaction(files, bufSize_);
+ doFileCompaction(files, BUFSIZE);
}
- catch ( Exception ex)
+ catch (IOException ex)
{
- ex.printStackTrace();
+ logger_.error(ex);
}
finally
{
@@ -932,10 +940,6 @@
{
result = doFileAntiCompaction(files, ranges, target, fileList, null);
}
- catch ( Exception ex)
- {
- ex.printStackTrace();
- }
finally
{
isCompacting_.set(false);
@@ -958,18 +962,17 @@
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
- for(String file: files)
+ try
{
- try
- {
- doCleanup(file);
- }
- catch ( Exception ex)
- {
- ex.printStackTrace();
- }
+ for(String file: files)
+ {
+ doCleanup(file);
+ }
+ }
+ finally
+ {
+ isCompacting_.set(false);
}
- isCompacting_.set(false);
}
/**
* cleans up one particular file by removing keys that this node is not responsible for.
@@ -1048,7 +1051,7 @@
+ expectedRangeFileSize + " is greater than the safe limit of the disk space available.");
return result;
}
- PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.bufSize_);
+ PriorityQueue<FileStruct> pq = initializePriorityQueue(files, ranges, ColumnFamilyStore.BUFSIZE);
if (pq.size() > 0)
{
mergedFileName = getTempFileName();
@@ -1234,18 +1237,11 @@
* to get the latest data.
*
*/
- void doFileCompaction(List<String> files, int minBufferSize) throws IOException
+ private void doFileCompaction(List<String> files, int minBufferSize) throws IOException
{
- String newfile = null;
- long startTime = System.currentTimeMillis();
- long totalBytesRead = 0;
- long totalBytesWritten = 0;
- long totalkeysRead = 0;
- long totalkeysWritten = 0;
- // Calculate the expected compacted filesize
- long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
- String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
+ String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
// If the compaction file path is null that means we have no space left for this compaction.
+ // try again w/o the largest one.
if( compactionFileLocation == null )
{
String maxFile = getMaxSizeFile( files );
@@ -1253,7 +1249,15 @@
doFileCompaction(files , minBufferSize);
return;
}
+
+ String newfile = null;
+ long startTime = System.currentTimeMillis();
+ long totalBytesRead = 0;
+ long totalBytesWritten = 0;
+ long totalkeysRead = 0;
+ long totalkeysWritten = 0;
PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
+
if (pq.size() > 0)
{
String mergedFileName = getTempFileName( files );
@@ -1331,7 +1335,7 @@
}
catch ( Exception ex)
{
- ex.printStackTrace();
+ logger_.error("empty sstable file " + filestruct.getFileName(), ex);
filestruct.close();
continue;
}
@@ -1390,10 +1394,10 @@
}
if ( newfile != null )
{
- ssTables_.add(newfile);
logger_.debug("Inserting bloom filter for file " + newfile);
SSTable.storeBloomFilter(newfile, compactedBloomFilter);
- totalBytesWritten = (new File(newfile)).length();
+ ssTables_.add(newfile);
+ totalBytesWritten += (new File(newfile)).length();
}
}
finally
@@ -1405,11 +1409,9 @@
SSTable.delete(file);
}
}
- logger_.debug("Total time taken for compaction ..."
- + (System.currentTimeMillis() - startTime));
- logger_.debug("Total bytes Read for compaction ..." + totalBytesRead);
- logger_.debug("Total bytes written for compaction ..."
- + totalBytesWritten + " Total keys read ..." + totalkeysRead);
+ String format = "Compacted [%s] to %s. %d/%d bytes for %d/%d keys read/written. Time: %dms.";
+ long dTime = System.currentTimeMillis() - startTime;
+ logger_.info(String.format(format, StringUtils.join(files, ", "), newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
}
public boolean isSuper()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/DBManager.java Mon May 11 14:26:16 2009
@@ -154,9 +154,4 @@
}
return storageMetadata;
}
-
- public static void main(String[] args) throws Throwable
- {
- DBManager.instance().start();
- }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/FileStruct.java Mon May 11 14:26:16 2009
@@ -27,10 +27,13 @@
import org.apache.cassandra.io.SSTable;
import org.apache.cassandra.io.Coordinate;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.log4j.Logger;
public class FileStruct implements Comparable<FileStruct>, Iterator<String>
{
+ private static Logger logger = Logger.getLogger(FileStruct.class);
+
private String key = null; // decorated!
private boolean exhausted = false;
private IFileReader reader;
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Mon May 11 14:26:16 2009
@@ -72,12 +72,12 @@
FileCompactor(ColumnFamilyStore columnFamilyStore)
{
- columnFamilyStore_ = columnFamilyStore;
+ columnFamilyStore_ = columnFamilyStore;
}
public void run()
{
- logger_.debug("Started compaction ..."+columnFamilyStore_.columnFamily_);
+ logger_.debug("Started compaction ..." + columnFamilyStore_.columnFamily_);
try
{
columnFamilyStore_.doCompaction();
@@ -86,7 +86,7 @@
{
throw new RuntimeException(e);
}
- logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+ logger_.debug("Finished compaction ..." + columnFamilyStore_.columnFamily_);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/Coordinate.java Mon May 11 14:26:16 2009
@@ -31,4 +31,12 @@
start_ = start;
end_ = end;
}
+
+ public String toString()
+ {
+ return "Coordinate(" +
+ "start_=" + start_ +
+ ", end_=" + end_ +
+ ')';
+ }
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=773573&r1=773572&r2=773573&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Mon May 11 14:26:16 2009
@@ -221,17 +221,11 @@
}
File file = new File(dataFile);
- if ( file.exists() )
+ assert file.exists();
+ /* delete the data file */
+ if (!file.delete())
{
- /* delete the data file */
- if (file.delete())
- {
- logger_.info("** Deleted " + file.getName() + " **");
- }
- else
- {
- logger_.error("Failed to delete " + file.getName());
- }
+ logger_.error("Failed to delete " + file.getName());
}
}
@@ -600,9 +594,8 @@
* @throws IOException
*/
private void dumpBlockIndexes() throws IOException
- {
- long position = dataWriter_.getCurrentPosition();
- firstBlockPosition_ = position;
+ {
+ firstBlockPosition_ = dataWriter_.getCurrentPosition();
for( SortedMap<String, BlockMetadata> block : blockIndexes_ )
{
dumpBlockIndex( block );
@@ -660,12 +653,16 @@
afterAppend(decoratedKey, currentPosition, value.length );
}
+ /*
+ TODO only the end_ part of the returned Coordinate is ever used. Apparently this code works, but it's definitely due for some cleanup
+ since the code fooling about with start_ appears to be irrelevant.
+ */
public static Coordinate getCoordinates(String decoratedKey, IFileReader dataReader, IPartitioner partitioner) throws IOException
{
List<KeyPositionInfo> indexInfo = indexMetadataMap_.get(dataReader.getFileName());
int size = (indexInfo == null) ? 0 : indexInfo.size();
long start = 0L;
- long end = dataReader.getEOF();
+ long end;
if ( size > 0 )
{
int index = Collections.binarySearch(indexInfo, new KeyPositionInfo(decoratedKey, partitioner));