You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/02 04:01:04 UTC
svn commit: r810334 - in /incubator/cassandra/branches/cassandra-0.4:
src/java/org/apache/cassandra/db/ test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Wed Sep 2 02:01:03 2009
New Revision: 810334
URL: http://svn.apache.org/viewvc?rev=810334&view=rev
Log:
update non-major compaction logic to use two threshold values: minimum sstables to compact (4) and maximum to compact at once (32). before, the same threshold (4) was used for both. the new logic lets the system recover faster from a bulk load that generates sstables faster than compaction can combine them. patch by Chris Goffinet and jbellis for CASSANDRA-407
Modified:
incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java
incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java
incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java
Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 2 02:01:03 2009
@@ -638,7 +638,7 @@
/* it's ok if compaction gets submitted multiple times while one is already in process.
worst that happens is, compactor will count the sstable files and decide there are
not enough to bother with. */
- if (ssTableCount >= MinorCompactionManager.COMPACTION_THRESHOLD)
+ if (ssTableCount >= MinorCompactionManager.MINCOMPACTION_THRESHOLD)
{
if (logger_.isDebugEnabled())
logger_.debug("Submitting " + columnFamily_ + " for compaction");
@@ -669,7 +669,7 @@
/*
* Group files of similar size into buckets.
*/
- static Set<List<String>> getCompactionBuckets(List<String> files, long min)
+ static Set<List<String>> getCompactionBuckets(Iterable<String> files, long min)
{
Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
for (String fname : files)
@@ -711,32 +711,17 @@
/*
* Break the files into buckets and then compact.
*/
- int doCompaction(int threshold) throws IOException
+ int doCompaction(int minThreshold, int maxThreshold) throws IOException
{
- List<String> files = new ArrayList<String>(ssTables_.keySet());
int filesCompacted = 0;
- Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
- for (List<String> fileList : buckets)
+ for (List<String> files : getCompactionBuckets(ssTables_.keySet(), 50L * 1024L * 1024L))
{
- Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
- if (fileList.size() < threshold)
+ if (files.size() < minThreshold)
{
continue;
}
- // For each bucket if it has crossed the threshhold do the compaction
- // In case of range compaction merge the counting bloom filters also.
- files.clear();
- int count = 0;
- for (String file : fileList)
- {
- files.add(file);
- count++;
- if (count == threshold)
- {
- filesCompacted += doFileCompaction(files, BUFSIZE);
- break;
- }
- }
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+ filesCompacted += doFileCompaction(files.subList(0, Math.min(files.size(), maxThreshold)), BUFSIZE);
}
return filesCompacted;
}
Modified: incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/src/java/org/apache/cassandra/db/MinorCompactionManager.java Wed Sep 2 02:01:03 2009
@@ -21,11 +21,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -33,7 +30,7 @@
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
+
import org.apache.log4j.Logger;
class MinorCompactionManager
@@ -42,7 +39,8 @@
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
private static final long intervalInMins_ = 5;
- static final int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
+ static final int MINCOMPACTION_THRESHOLD = 4; // compact this many sstables min at a time
+ static final int MAXCOMPACTION_THRESHOLD = 32; // compact this many sstables max at a time
public static MinorCompactionManager instance()
{
@@ -155,16 +153,16 @@
public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
{
- return submit(columnFamilyStore, COMPACTION_THRESHOLD);
+ return submit(columnFamilyStore, MINCOMPACTION_THRESHOLD, MAXCOMPACTION_THRESHOLD);
}
- Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int threshold)
+ Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
{
Callable<Integer> callable = new Callable<Integer>()
{
public Integer call() throws IOException
{
- return columnFamilyStore.doCompaction(threshold);
+ return columnFamilyStore.doCompaction(minThreshold, maxThreshold);
}
};
return compactor_.submit(callable);
Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/CompactionsTest.java Wed Sep 2 02:01:03 2009
@@ -23,7 +23,6 @@
import java.util.concurrent.Future;
import java.util.Set;
import java.util.HashSet;
-import java.util.Arrays;
import org.junit.Test;
@@ -62,7 +61,7 @@
}
if (store.getSSTables().size() > 1)
{
- store.doCompaction(store.getSSTables().size());
+ store.doCompaction(2, store.getSSTables().size());
}
assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
}
Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/OneCompactionTest.java Wed Sep 2 02:01:03 2009
@@ -23,7 +23,6 @@
import java.util.concurrent.Future;
import java.util.Set;
import java.util.HashSet;
-import java.util.Arrays;
import org.junit.Test;
@@ -47,7 +46,7 @@
store.forceBlockingFlush();
assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
}
- Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());
Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Wed Sep 2 02:01:03 2009
@@ -57,7 +57,7 @@
store.forceBlockingFlush();
validateRemoveTwoSources();
- Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
validateRemoveCompacted();
@@ -109,7 +109,7 @@
store.forceBlockingFlush();
validateRemoveWithNewData();
- Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
validateRemoveWithNewData();
Modified: incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java?rev=810334&r1=810333&r2=810334&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/branches/cassandra-0.4/test/unit/org/apache/cassandra/db/TableTest.java Wed Sep 2 02:01:03 2009
@@ -304,7 +304,7 @@
// compact so we have a big row with more than the minimum index count
if (cfStore.getSSTables().size() > 1)
{
- cfStore.doCompaction(cfStore.getSSTables().size());
+ cfStore.doCompaction(2, cfStore.getSSTables().size());
}
SSTableReader sstable = cfStore.getSSTables().iterator().next();
long position = sstable.getPosition(key);