You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/02 04:02:31 UTC
svn commit: r810335 - in /incubator/cassandra/trunk: ./
interface/gen-java/org/apache/cassandra/service/ src/java/org/
src/java/org/apache/cassandra/db/ test/unit/org/
test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Wed Sep 2 02:02:30 2009
New Revision: 810335
URL: http://svn.apache.org/viewvc?rev=810335&view=rev
Log:
merge from 0.4 branch
Modified:
incubator/cassandra/trunk/ (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java (props changed)
incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java (props changed)
incubator/cassandra/trunk/src/java/org/ (props changed)
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/test/unit/org/ (props changed)
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
Propchange: incubator/cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,2 +1,2 @@
/incubator/cassandra/branches/cassandra-0.3:774578-796573
-/incubator/cassandra/branches/cassandra-0.4:810145-810281
+/incubator/cassandra/branches/cassandra-0.4:810145-810334
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-810334
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Cassandra.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,4 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-810281
-/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/Column.java:749219-794428
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-810334
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/column_t.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-810334
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-810334
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:749219-768588
Propchange: incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,4 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
-/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-810281
-/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:749219-794428
+/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-810334
/incubator/cassandra/trunk/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:749219-768588
Propchange: incubator/cassandra/trunk/src/java/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3/src/java/org:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/src/java/org:810145-810334
/incubator/cassandra/trunk/src/java/org:749219-769885
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 2 02:02:30 2009
@@ -175,16 +175,17 @@
for (File file : sstableFiles)
{
String filename = file.getAbsolutePath();
+ SSTableReader sstable;
try
{
- SSTableReader sstable = SSTableReader.open(filename);
- ssTables_.put(filename, sstable);
+ sstable = SSTableReader.open(filename);
}
catch (IOException ex)
{
- logger_.error("Corrupt file " + filename, ex);
- FileUtils.delete(filename);
+ logger_.error("Corrupt file " + filename + "; skipped", ex);
+ continue;
}
+ ssTables_.put(filename, sstable);
}
// submit initial check-for-compaction request
@@ -637,7 +638,7 @@
/* it's ok if compaction gets submitted multiple times while one is already in process.
worst that happens is, compactor will count the sstable files and decide there are
not enough to bother with. */
- if (ssTableCount >= MinorCompactionManager.COMPACTION_THRESHOLD)
+ if (ssTableCount >= MinorCompactionManager.MINCOMPACTION_THRESHOLD)
{
if (logger_.isDebugEnabled())
logger_.debug("Submitting " + columnFamily_ + " for compaction");
@@ -668,7 +669,7 @@
/*
* Group files of similar size into buckets.
*/
- static Set<List<String>> getCompactionBuckets(List<String> files, long min)
+ static Set<List<String>> getCompactionBuckets(Iterable<String> files, long min)
{
Map<List<String>, Long> buckets = new ConcurrentHashMap<List<String>, Long>();
for (String fname : files)
@@ -710,32 +711,17 @@
/*
* Break the files into buckets and then compact.
*/
- int doCompaction(int threshold) throws IOException
+ int doCompaction(int minThreshold, int maxThreshold) throws IOException
{
- List<String> files = new ArrayList<String>(ssTables_.keySet());
int filesCompacted = 0;
- Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
- for (List<String> fileList : buckets)
+ for (List<String> files : getCompactionBuckets(ssTables_.keySet(), 50L * 1024L * 1024L))
{
- Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
- if (fileList.size() < threshold)
+ if (files.size() < minThreshold)
{
continue;
}
- // For each bucket if it has crossed the threshhold do the compaction
- // In case of range compaction merge the counting bloom filters also.
- files.clear();
- int count = 0;
- for (String file : fileList)
- {
- files.add(file);
- count++;
- if (count == threshold)
- {
- filesCompacted += doFileCompaction(files, BUFSIZE);
- break;
- }
- }
+ Collections.sort(files, new FileNameComparator(FileNameComparator.Ascending));
+ filesCompacted += doFileCompaction(files.subList(0, Math.min(files.size(), maxThreshold)), BUFSIZE);
}
return filesCompacted;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Wed Sep 2 02:02:30 2009
@@ -21,11 +21,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -33,7 +30,7 @@
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.service.StorageService;
+
import org.apache.log4j.Logger;
class MinorCompactionManager
@@ -42,7 +39,8 @@
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(MinorCompactionManager.class);
private static final long intervalInMins_ = 5;
- static final int COMPACTION_THRESHOLD = 4; // compact this many sstables at a time
+ static final int MINCOMPACTION_THRESHOLD = 4; // compact this many sstables min at a time
+ static final int MAXCOMPACTION_THRESHOLD = 32; // compact this many sstables max at a time
public static MinorCompactionManager instance()
{
@@ -155,16 +153,16 @@
public Future<Integer> submit(final ColumnFamilyStore columnFamilyStore)
{
- return submit(columnFamilyStore, COMPACTION_THRESHOLD);
+ return submit(columnFamilyStore, MINCOMPACTION_THRESHOLD, MAXCOMPACTION_THRESHOLD);
}
- Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int threshold)
+ Future<Integer> submit(final ColumnFamilyStore columnFamilyStore, final int minThreshold, final int maxThreshold)
{
Callable<Integer> callable = new Callable<Integer>()
{
public Integer call() throws IOException
{
- return columnFamilyStore.doCompaction(threshold);
+ return columnFamilyStore.doCompaction(minThreshold, maxThreshold);
}
};
return compactor_.submit(callable);
Propchange: incubator/cassandra/trunk/test/unit/org/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 2 02:02:30 2009
@@ -1,3 +1,3 @@
/incubator/cassandra/branches/cassandra-0.3/test/unit/org:774578-796573
-/incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-810281
+/incubator/cassandra/branches/cassandra-0.4/test/unit/org:810145-810334
/incubator/cassandra/trunk/test/unit/org:749219-768583
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Wed Sep 2 02:02:30 2009
@@ -23,7 +23,6 @@
import java.util.concurrent.Future;
import java.util.Set;
import java.util.HashSet;
-import java.util.Arrays;
import org.junit.Test;
@@ -62,7 +61,7 @@
}
if (store.getSSTables().size() > 1)
{
- store.doCompaction(store.getSSTables().size());
+ store.doCompaction(2, store.getSSTables().size());
}
assertEquals(table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size(), inserted.size());
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/OneCompactionTest.java Wed Sep 2 02:02:30 2009
@@ -23,7 +23,6 @@
import java.util.concurrent.Future;
import java.util.Set;
import java.util.HashSet;
-import java.util.Arrays;
import org.junit.Test;
@@ -47,7 +46,7 @@
store.forceBlockingFlush();
assertEquals(inserted.size(), table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size());
}
- Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
assertEquals(table.getColumnFamilyStore(columnFamilyName).getKeyRange("", "", 10000).keys.size(), inserted.size());
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/RemoveSuperColumnTest.java Wed Sep 2 02:02:30 2009
@@ -57,7 +57,7 @@
store.forceBlockingFlush();
validateRemoveTwoSources();
- Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
validateRemoveCompacted();
@@ -109,7 +109,7 @@
store.forceBlockingFlush();
validateRemoveWithNewData();
- Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2);
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store, 2, 32);
ft.get();
assertEquals(1, store.getSSTables().size());
validateRemoveWithNewData();
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=810335&r1=810334&r2=810335&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Wed Sep 2 02:02:30 2009
@@ -304,7 +304,7 @@
// compact so we have a big row with more than the minimum index count
if (cfStore.getSSTables().size() > 1)
{
- cfStore.doCompaction(cfStore.getSSTables().size());
+ cfStore.doCompaction(2, cfStore.getSSTables().size());
}
SSTableReader sstable = cfStore.getSSTables().iterator().next();
long position = sstable.getPosition(key);