You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/20 20:41:20 UTC
svn commit: r827761 - in /incubator/cassandra/trunk:
src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/
test/unit/org/apache/cassandra/db/
Author: jbellis
Date: Tue Oct 20 18:41:19 2009
New Revision: 827761
URL: http://svn.apache.org/viewvc?rev=827761&view=rev
Log:
add test catching regression; call removeDeleted during compaction.
patch by jbellis; reviewed by eevans for CASSANDRA-500
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Oct 20 18:41:19 2009
@@ -47,6 +47,8 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.FilterIterator;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -473,7 +475,7 @@
return (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds();
}
- static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
+ public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
{
if (cf == null)
{
@@ -664,7 +666,9 @@
{
sstables = ssTables_.getSSTables();
}
- doFileCompaction(sstables);
+
+ if (sstables.size() > 1)
+ doFileCompaction(sstables);
}
/*
@@ -773,7 +777,7 @@
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer = null;
- CompactionIterator ci = new CompactionIterator(sstables);
+ CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore());
try
{
@@ -819,6 +823,11 @@
return results;
}
+ private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
+ {
+ return doFileCompaction(sstables, getDefaultGCBefore());
+ }
+
/*
* This function does the actual compaction for files.
* It maintains a priority queue of with the first key from each file
@@ -830,7 +839,7 @@
* to get the latest data.
*
*/
- private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
+ int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore) throws IOException
{
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
Table.open(table_).snapshot("compact-" + columnFamily_);
@@ -855,7 +864,8 @@
logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
SSTableWriter writer;
- CompactionIterator ci = new CompactionIterator(sstables);
+ CompactionIterator ci = new CompactionIterator(sstables, gcBefore); // retain a handle so we can call close()
+ Iterator nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
try
{
@@ -868,9 +878,9 @@
String newFilename = new File(compactionFileLocation, getTempSSTableFileName()).getAbsolutePath();
writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
- while (ci.hasNext())
+ while (nni.hasNext())
{
- CompactionIterator.CompactedRow row = ci.next();
+ CompactionIterator.CompactedRow row = (CompactionIterator.CompactedRow) nni.next();
writer.append(row.key, row.buffer);
totalkeysWritten++;
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Tue Oct 20 18:41:19 2009
@@ -11,15 +11,18 @@
import org.apache.cassandra.utils.ReducingIterator;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
public class CompactionIterator extends ReducingIterator<IteratingRow, CompactionIterator.CompactedRow> implements Closeable
{
private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
+ private final int gcBefore;
@SuppressWarnings("unchecked")
- public CompactionIterator(Iterable<SSTableReader> sstables) throws IOException
+ public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore) throws IOException
{
super(getCollatingIterator(sstables));
+ this.gcBefore = gcBefore;
}
@SuppressWarnings("unchecked")
@@ -81,7 +84,10 @@
cf.addAll(row.getColumnFamily());
}
}
- ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+ ColumnFamily cfPurged = ColumnFamilyStore.removeDeleted(cf, gcBefore);
+ if (cfPurged == null)
+ return null;
+ ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
}
else
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Tue Oct 20 18:41:19 2009
@@ -173,4 +173,12 @@
return key + ":" + position;
}
}
+
+ @Override
+ public String toString()
+ {
+ return getClass().getName() + "(" +
+ "path='" + path + '\'' +
+ ')';
+ }
}
Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Tue Oct 20 18:41:19 2009
@@ -29,6 +29,7 @@
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.IdentityQueryFilter;
import static junit.framework.Assert.assertEquals;
public class CompactionsTest extends CleanupHelper
@@ -65,4 +66,46 @@
}
assertEquals(inserted.size(), table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
}
+
+ @Test
+ public void testCompactionPurge() throws IOException, ExecutionException, InterruptedException
+ {
+ Table table = Table.open("Keyspace1");
+ ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+ String key = "key1";
+ RowMutation rm;
+
+ CompactionManager.instance().disableCompactions();
+
+ // inserts
+ rm = new RowMutation("Keyspace1", key);
+ for (int i = 0; i < 10; i++)
+ {
+ rm.add(new QueryPath("Standard1", null, String.valueOf(i).getBytes()), new byte[0], 0);
+ }
+ rm.apply();
+ store.forceBlockingFlush();
+
+ // deletes
+ for (int i = 0; i < 10; i++)
+ {
+ rm = new RowMutation("Keyspace1", key);
+ rm.delete(new QueryPath("Standard1", null, String.valueOf(i).getBytes()), 1);
+ rm.apply();
+ }
+ store.forceBlockingFlush();
+
+ // resurrect one row
+ rm = new RowMutation("Keyspace1", key);
+ rm.add(new QueryPath("Standard1", null, String.valueOf(5).getBytes()), new byte[0], 2);
+ rm.apply();
+ store.forceBlockingFlush();
+
+ // compact and test that all columns but the resurrected one is completely gone
+ store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+ ColumnFamily cf = table.getColumnFamilyStore("Standard1").getColumnFamily(new IdentityQueryFilter(key, new QueryPath("Standard1")));
+ assert cf.getColumnCount() == 1;
+ assert cf.getColumn(String.valueOf(5).getBytes()) != null;
+ }
}