You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/20 20:41:20 UTC

svn commit: r827761 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Tue Oct 20 18:41:19 2009
New Revision: 827761

URL: http://svn.apache.org/viewvc?rev=827761&view=rev
Log:
add test catching regression; call removeDeleted during compaction.
patch by jbellis; reviewed by eevans for CASSANDRA-500

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Oct 20 18:41:19 2009
@@ -47,6 +47,8 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.FilterIterator;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -473,7 +475,7 @@
         return (int)(System.currentTimeMillis() / 1000) - DatabaseDescriptor.getGcGraceInSeconds();
     }
 
-    static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
+    public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
     {
         if (cf == null)
         {
@@ -664,7 +666,9 @@
         {
             sstables = ssTables_.getSSTables();
         }
-        doFileCompaction(sstables);
+
+        if (sstables.size() > 1)
+            doFileCompaction(sstables);
     }
 
     /*
@@ -773,7 +777,7 @@
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer = null;
-        CompactionIterator ci = new CompactionIterator(sstables);
+        CompactionIterator ci = new CompactionIterator(sstables, getDefaultGCBefore());
 
         try
         {
@@ -819,6 +823,11 @@
         return results;
     }
 
+    private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
+    {
+        return doFileCompaction(sstables, getDefaultGCBefore());
+    }
+
     /*
     * This function does the actual compaction for files.
     * It maintains a priority queue of with the first key from each file
@@ -830,7 +839,7 @@
     * to get the latest data.
     *
     */
-    private int doFileCompaction(Collection<SSTableReader> sstables) throws IOException
+    int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore) throws IOException
     {
         if (DatabaseDescriptor.isSnapshotBeforeCompaction())
             Table.open(table_).snapshot("compact-" + columnFamily_);
@@ -855,7 +864,8 @@
           logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
 
         SSTableWriter writer;
-        CompactionIterator ci = new CompactionIterator(sstables);
+        CompactionIterator ci = new CompactionIterator(sstables, gcBefore); // retain a handle so we can call close()
+        Iterator nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
 
         try
         {
@@ -868,9 +878,9 @@
             String newFilename = new File(compactionFileLocation, getTempSSTableFileName()).getAbsolutePath();
             writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
 
-            while (ci.hasNext())
+            while (nni.hasNext())
             {
-                CompactionIterator.CompactedRow row = ci.next();
+                CompactionIterator.CompactedRow row = (CompactionIterator.CompactedRow) nni.next();
                 writer.append(row.key, row.buffer);
                 totalkeysWritten++;
             }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Tue Oct 20 18:41:19 2009
@@ -11,15 +11,18 @@
 import org.apache.cassandra.utils.ReducingIterator;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ColumnFamilyStore;
 
 public class CompactionIterator extends ReducingIterator<IteratingRow, CompactionIterator.CompactedRow> implements Closeable
 {
     private final List<IteratingRow> rows = new ArrayList<IteratingRow>();
+    private final int gcBefore;
 
     @SuppressWarnings("unchecked")
-    public CompactionIterator(Iterable<SSTableReader> sstables) throws IOException
+    public CompactionIterator(Iterable<SSTableReader> sstables, int gcBefore) throws IOException
     {
         super(getCollatingIterator(sstables));
+        this.gcBefore = gcBefore;
     }
 
     @SuppressWarnings("unchecked")
@@ -81,7 +84,10 @@
                     cf.addAll(row.getColumnFamily());
                 }
             }
-            ColumnFamily.serializer().serializeWithIndexes(cf, buffer);
+            ColumnFamily cfPurged = ColumnFamilyStore.removeDeleted(cf, gcBefore);
+            if (cfPurged == null)
+                return null;
+            ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
         }
         else
         {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/SSTable.java Tue Oct 20 18:41:19 2009
@@ -173,4 +173,12 @@
             return key + ":" + position;
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return getClass().getName() + "(" +
+               "path='" + path + '\'' +
+               ')';
+    }
 }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=827761&r1=827760&r2=827761&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Tue Oct 20 18:41:19 2009
@@ -29,6 +29,7 @@
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.db.filter.IdentityQueryFilter;
 import static junit.framework.Assert.assertEquals;
 
 public class CompactionsTest extends CleanupHelper
@@ -65,4 +66,46 @@
         }
         assertEquals(inserted.size(), table.getColumnFamilyStore("Standard1").getKeyRange("", "", 10000).keys.size());
     }
+
+    @Test
+    public void testCompactionPurge() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+        String key = "key1";
+        RowMutation rm;
+
+        CompactionManager.instance().disableCompactions();
+
+        // inserts
+        rm = new RowMutation("Keyspace1", key);
+        for (int i = 0; i < 10; i++)
+        {
+            rm.add(new QueryPath("Standard1", null, String.valueOf(i).getBytes()), new byte[0], 0);
+        }
+        rm.apply();
+        store.forceBlockingFlush();
+
+        // deletes
+        for (int i = 0; i < 10; i++)
+        {
+            rm = new RowMutation("Keyspace1", key);
+            rm.delete(new QueryPath("Standard1", null, String.valueOf(i).getBytes()), 1);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+
+        // resurrect one row
+        rm = new RowMutation("Keyspace1", key);
+        rm.add(new QueryPath("Standard1", null, String.valueOf(5).getBytes()), new byte[0], 2);
+        rm.apply();
+        store.forceBlockingFlush();
+
+        // compact and test that all columns but the resurrected one is completely gone
+        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+        ColumnFamily cf = table.getColumnFamilyStore("Standard1").getColumnFamily(new IdentityQueryFilter(key, new QueryPath("Standard1")));
+        assert cf.getColumnCount() == 1;
+        assert cf.getColumn(String.valueOf(5).getBytes()) != null;
+    }
 }