You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/05/12 01:48:20 UTC
svn commit: r773725 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
src/java/org/apache/cassandra/db/MinorCompactionManager.java
test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Author: jbellis
Date: Mon May 11 23:48:20 2009
New Revision: 773725
URL: http://svn.apache.org/viewvc?rev=773725&view=rev
Log:
Add a unit test to expose the bug that the system test is running into. Patch by jbellis; reviewed by Jun Rao for CASSANDRA-153.
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=773725&r1=773724&r2=773725&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon May 11 23:48:20 2009
@@ -806,22 +806,23 @@
return buckets.keySet();
}
- public void doCompaction() throws IOException
+ public int doCompaction() throws IOException
{
- doCompaction(COMPACTION_THRESHOLD);
+ return doCompaction(COMPACTION_THRESHOLD);
}
/*
* Break the files into buckets and then compact.
*/
- public void doCompaction(int threshold) throws IOException
+ public int doCompaction(int threshold) throws IOException
{
isCompacting_.set(true);
List<String> files = new ArrayList<String>(ssTables_);
+ int filesCompacted = 0;
try
{
- int count;
- for (List<String> fileList : getCompactionBuckets(files, 50L * 1024L * 1024L))
+ Set<List<String>> buckets = getCompactionBuckets(files, 50L * 1024L * 1024L);
+ for (List<String> fileList : buckets)
{
Collections.sort(fileList, new FileNameComparator(FileNameComparator.Ascending));
if (fileList.size() < threshold)
@@ -831,14 +832,14 @@
// For each bucket if it has crossed the threshhold do the compaction
// In case of range compaction merge the counting bloom filters also.
files.clear();
- count = 0;
+ int count = 0;
for (String file : fileList)
{
files.add(file);
count++;
if (count == threshold)
{
- doFileCompaction(files, BUFSIZE);
+ filesCompacted += doFileCompaction(files, BUFSIZE);
break;
}
}
@@ -848,6 +849,7 @@
{
isCompacting_.set(false);
}
+ return filesCompacted;
}
void doMajorCompaction(long skip)
@@ -1237,7 +1239,7 @@
* to get the latest data.
*
*/
- private void doFileCompaction(List<String> files, int minBufferSize) throws IOException
+ private int doFileCompaction(List<String> files, int minBufferSize) throws IOException
{
String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(getExpectedCompactedFileSize(files));
// If the compaction file path is null that means we have no space left for this compaction.
@@ -1246,8 +1248,7 @@
{
String maxFile = getMaxSizeFile( files );
files.remove( maxFile );
- doFileCompaction(files , minBufferSize);
- return;
+ return doFileCompaction(files , minBufferSize);
}
String newfile = null;
@@ -1412,6 +1413,7 @@
String format = "Compacted [%s] to %s. %d/%d bytes for %d/%d keys read/written. Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
logger_.info(String.format(format, StringUtils.join(files, ", "), newfile, totalBytesRead, totalBytesWritten, totalkeysRead, totalkeysWritten, dTime));
+ return files.size();
}
public boolean isSuper()
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=773725&r1=773724&r2=773725&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java Mon May 11 23:48:20 2009
@@ -66,7 +66,7 @@
return instance_;
}
- class FileCompactor implements Runnable
+ class FileCompactor implements Callable<Integer>
{
private ColumnFamilyStore columnFamilyStore_;
@@ -75,18 +75,21 @@
columnFamilyStore_ = columnFamilyStore;
}
- public void run()
+ public Integer call()
{
logger_.debug("Started compaction ..." + columnFamilyStore_.columnFamily_);
try
{
- columnFamilyStore_.doCompaction();
+ return columnFamilyStore_.doCompaction();
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- logger_.debug("Finished compaction ..." + columnFamilyStore_.columnFamily_);
+ finally
+ {
+ logger_.debug("Finished compaction ..." + columnFamilyStore_.columnFamily_);
+ }
}
}
@@ -164,13 +167,20 @@
compactor_.shutdownNow();
}
- public void submitPeriodicCompaction(ColumnFamilyStore columnFamilyStore)
- {
- compactor_.scheduleWithFixedDelay(new FileCompactor(columnFamilyStore), MinorCompactionManager.intervalInMins_,
+ public void submitPeriodicCompaction(final ColumnFamilyStore columnFamilyStore)
+ {
+ Runnable runnable = new Runnable() // the Callable has to be wrapped in a Runnable because scheduleWithFixedDelay only accepts a Runnable.
+ {
+ public void run()
+ {
+ new FileCompactor(columnFamilyStore).call();
+ }
+ };
+ compactor_.scheduleWithFixedDelay(runnable, MinorCompactionManager.intervalInMins_,
MinorCompactionManager.intervalInMins_, TimeUnit.MINUTES);
}
- public Future submit(ColumnFamilyStore columnFamilyStore)
+ public Future<Integer> submit(ColumnFamilyStore columnFamilyStore)
{
return compactor_.submit(new FileCompactor(columnFamilyStore));
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=773725&r1=773724&r2=773725&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Mon May 11 23:48:20 2009
@@ -3,14 +3,7 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.SortedSet;
+import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -393,23 +386,56 @@
}
@Test
- public void testCompaction() throws IOException, ExecutionException, InterruptedException
+ public void testOneCompaction() throws IOException, ExecutionException, InterruptedException
{
Table table = Table.open("Table1");
ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
- for (int j = 0; j < 5; j++) {
- for (int i = 0; i < 10; i++) {
- long epoch = System.currentTimeMillis() / 1000;
- String key = String.format("%s.%s.%s", epoch, 1, i);
+ Set<String> inserted = new HashSet<String>();
+ for (int j = 0; j < 2; j++) {
+ String key = "0";
+ RowMutation rm = new RowMutation("Table1", key);
+ rm.add("Standard1:0", new byte[0], j);
+ rm.apply();
+ inserted.add(key);
+ store.forceBlockingFlush();
+ assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
+ }
+ store.doCompaction(2);
+ assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
+ }
+
+ @Test
+ public void testCompactions() throws IOException, ExecutionException, InterruptedException
+ {
+ // this test does enough rows to force multiple block indexes to be used
+ Table table = Table.open("Table1");
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+ final int ROWS_PER_SSTABLE = 10;
+ Set<String> inserted = new HashSet<String>();
+ for (int j = 0; j < (SSTable.indexInterval() * 3) / ROWS_PER_SSTABLE; j++) {
+ for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
+ String key = String.valueOf(i % 2);
RowMutation rm = new RowMutation("Table1", key);
- rm.add("Standard1:A", new byte[0], epoch);
+ rm.add("Standard1:" + (i / 2), new byte[0], j * ROWS_PER_SSTABLE + i);
rm.apply();
+ inserted.add(key);
}
store.forceBlockingFlush();
+ assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
+ }
+ while (true)
+ {
+ Future<Integer> ft = MinorCompactionManager.instance().submit(store);
+ if (ft.get() == 0)
+ break;
+ }
+ if (store.getSSTableFilenames().size() > 1)
+ {
+ store.doCompaction(store.getSSTableFilenames().size());
}
- Future ft = MinorCompactionManager.instance().submit(store);
- ft.get();
+ assertEquals(table.getKeyRange("", "", 10000).size(), inserted.size());
}
@Test