You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/20 16:58:39 UTC
svn commit: r776720 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
db/ColumnFamilyStore.java db/MinorCompactionManager.java io/SSTable.java
Author: jbellis
Date: Wed May 20 14:58:39 2009
New Revision: 776720
URL: http://svn.apache.org/viewvc?rev=776720&view=rev
Log:
more cleanup of compaction code.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-184
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=776720&r1=776719&r2=776720&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed May 20 14:58:39 2009
@@ -88,9 +88,6 @@
/* Modification lock used for protecting reads from compactions. */
private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
- /* Flag indicates if a compaction is in process */
- private AtomicBoolean isCompacting_ = new AtomicBoolean(false);
-
private TimedStatsDeque readStats_ = new TimedStatsDeque(60000);
private TimedStatsDeque diskReadStats_ = new TimedStatsDeque(60000);
@@ -756,25 +753,26 @@
*/
void storeLocation(String filename, BloomFilter bf)
{
- int ssTableSize = 0;
+ int ssTableCount;
lock_.writeLock().lock();
try
{
ssTables_.add(filename);
SSTable.storeBloomFilter(filename, bf);
- ssTableSize = ssTables_.size();
+ ssTableCount = ssTables_.size();
}
finally
{
lock_.writeLock().unlock();
}
- if ((ssTableSize >= MinorCompactionManager.COMPACTION_THRESHOLD && !isCompacting_.get())
- || (isCompacting_.get() && ssTableSize % MinorCompactionManager.COMPACTION_THRESHOLD == 0))
+ /* It's ok if compaction gets submitted multiple times while one is already in progress.
+ The worst that happens is that the compactor counts the sstable files and decides there are
+ not enough to bother with. */
+ if (ssTableCount >= MinorCompactionManager.COMPACTION_THRESHOLD)
{
- logger_.debug("Submitting for compaction ...");
- MinorCompactionManager.instance().submit(ColumnFamilyStore.this);
- logger_.debug("Submitted for compaction ...");
+ logger_.debug("Submitting " + columnFamily_ + " for compaction");
+ MinorCompactionManager.instance().submit(this);
}
}
@@ -862,45 +860,37 @@
/*
* Break the files into buckets and then compact.
*/
- public int doCompaction(int threshold) throws IOException
+ int doCompaction(int threshold) throws IOException
{
- isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
int filesCompacted = 0;
- try
+ Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
+ for (List<String> fileList : buckets)
{
- Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
- for (List<String> fileList : buckets)
+ Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
+ if (fileList.size() < threshold)
{
- Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
- if (fileList.size() < threshold)
- {
- continue;
- }
- // For each bucket if it has crossed the threshhold do the compaction
- // In case of range compaction merge the counting bloom filters also.
- files.clear();
- int count = 0;
- for (String file : fileList)
+ continue;
+ }
+ // For each bucket, if it has crossed the threshold, do the compaction
+ // In case of range compaction merge the counting bloom filters also.
+ files.clear();
+ int count = 0;
+ for (String file : fileList)
+ {
+ files.add(file);
+ count++;
+ if (count == threshold)
{
- files.add(file);
- count++;
- if (count == threshold)
- {
- filesCompacted += doFileCompaction(files, BUFSIZE);
- break;
- }
+ filesCompacted += doFileCompaction(files, BUFSIZE);
+ break;
}
}
}
- finally
- {
- isCompacting_.set(false);
- }
return filesCompacted;
}
- void doMajorCompaction(long skip)
+ void doMajorCompaction(long skip) throws IOException
{
doMajorCompactionInternal(skip);
}
@@ -911,39 +901,27 @@
* all files greater than skip GB are skipped for this compaction.
* Except if skip is 0 , in that case this is ignored and all files are taken.
*/
- void doMajorCompactionInternal(long skip)
+ void doMajorCompactionInternal(long skip) throws IOException
{
- isCompacting_.set(true);
List<String> filesInternal = new ArrayList<String>(ssTables_);
List<String> files;
- try
+ if (skip > 0L)
{
- if (skip > 0L)
+ files = new ArrayList<String>();
+ for (String file : filesInternal)
{
- files = new ArrayList<String>();
- for (String file : filesInternal)
+ File f = new File(file);
+ if (f.length() < skip * 1024L * 1024L * 1024L)
{
- File f = new File(file);
- if (f.length() < skip * 1024L * 1024L * 1024L)
- {
- files.add(file);
- }
+ files.add(file);
}
}
- else
- {
- files = filesInternal;
- }
- doFileCompaction(files, BUFSIZE);
}
- catch (IOException ex)
- {
- logger_.error(ex);
- }
- finally
+ else
{
- isCompacting_.set(false);
+ files = filesInternal;
}
+ doFileCompaction(files, BUFSIZE);
}
/*
@@ -983,19 +961,8 @@
boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String> fileList) throws IOException
{
- isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
- boolean result = true;
- try
- {
- result = doFileAntiCompaction(files, ranges, target, fileList, null);
- }
- finally
- {
- isCompacting_.set(false);
- }
- return result;
-
+ return doFileAntiCompaction(files, ranges, target, fileList, null);
}
void forceCleanup()
@@ -1011,18 +978,10 @@
*/
void doCleanupCompaction() throws IOException
{
- isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
- try
- {
- for (String file : files)
- {
- doCleanup(file);
- }
- }
- finally
+ for (String file : files)
{
- isCompacting_.set(false);
+ doCleanup(file);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=776720&r1=776719&r2=776720&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Wed May 20 14:58:39 2009
@@ -113,7 +113,14 @@
public void run()
{
logger_.debug("Started Major compaction for " + columnFamilyStore_.columnFamily_);
- columnFamilyStore_.doMajorCompaction(skip_);
+ try
+ {
+ columnFamilyStore_.doMajorCompaction(skip_);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
logger_.debug("Finished Major compaction for " + columnFamilyStore_.columnFamily_);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=776720&r1=776719&r2=776720&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Wed May 20 14:58:39 2009
@@ -203,28 +203,28 @@
* This method deletes both the specified data file
* and the associated index file
*
- * @param dataFile - data file associated with the SSTable
+ * @param dataFileName - data file associated with the SSTable
*/
- public static void delete(String dataFile)
+ public static void delete(String dataFileName)
{
/* remove the cached index table from memory */
- indexMetadataMap_.remove(dataFile);
+ indexMetadataMap_.remove(dataFileName);
/* Delete the checksum file associated with this data file */
try
{
- ChecksumManager.onFileDelete(dataFile);
+ ChecksumManager.onFileDelete(dataFileName);
}
catch (IOException ex)
{
logger_.info(LogUtil.throwableToString(ex));
}
- File file = new File(dataFile);
- assert file.exists() : "attempted to delete non-existing file " + dataFile;
+ File file = new File(dataFileName);
+ assert file.exists() : "attempted to delete non-existing file " + dataFileName;
/* delete the data file */
if (!file.delete())
{
- logger_.error("Failed to delete " + file.getName());
+ logger_.error("Failed to delete " + dataFileName);
}
}