You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/10/22 17:41:11 UTC

svn commit: r828755 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/db/

Author: jbellis
Date: Thu Oct 22 15:41:11 2009
New Revision: 828755

URL: http://svn.apache.org/viewvc?rev=828755&view=rev
Log:
all rows go through deserialize/removeDeleted so we can GC tombstones.
patch by jbellis; reviewed by junrao for CASSANDRA-507

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Oct 22 15:41:11 2009
@@ -667,8 +667,7 @@
             sstables = ssTables_.getSSTables();
         }
 
-        if (sstables.size() > 1)
-            doFileCompaction(sstables);
+        doFileCompaction(sstables);
     }
 
     /*
@@ -838,6 +837,8 @@
     * that occur in multiple files and are the same then a resolution is done
     * to get the latest data.
     *
+    * The collection of sstables passed may be empty (but not null); even if
+    * it is not empty, it may compact down to nothing if all rows are deleted.
     */
     int doFileCompaction(Collection<SSTableReader> sstables, int gcBefore) throws IOException
     {
@@ -869,9 +870,12 @@
 
         try
         {
-            if (!ci.hasNext())
+            if (!nni.hasNext())
             {
-                logger_.warn("Nothing to compact (all files empty or corrupt). This should not happen.");
+                // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                // we need to sync it (via closeAndOpen) first, so there is no period during which
+                // a crash could cause data loss.
+                ssTables_.markCompacted(sstables);
                 return 0;
             }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Oct 22 15:41:11 2009
@@ -203,7 +203,6 @@
     public SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) throws IOException
     {
         logger_.info("Writing " + this);
-        IPartitioner<?> partitioner = StorageService.getPartitioner();
         ColumnFamilyStore cfStore = Table.open(table_).getColumnFamilyStore(cfName_);
         SSTableWriter writer = new SSTableWriter(cfStore.getTempSSTablePath(), columnFamilies_.size(), StorageService.getPartitioner());
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Thu Oct 22 15:41:11 2009
@@ -70,31 +70,26 @@
     {
         DataOutputBuffer buffer = new DataOutputBuffer();
         DecoratedKey key = rows.get(0).getKey();
-        if (rows.size() > 1)
+        assert rows.size() > 0;
+
+        ColumnFamily cf = null;
+        for (IteratingRow row : rows)
         {
-            ColumnFamily cf = null;
-            for (IteratingRow row : rows)
+            if (cf == null)
             {
-                if (cf == null)
-                {
-                    cf = row.getColumnFamily();
-                }
-                else
-                {
-                    cf.addAll(row.getColumnFamily());
-                }
+                cf = row.getColumnFamily();
+            }
+            else
+            {
+                cf.addAll(row.getColumnFamily());
             }
-            ColumnFamily cfPurged = ColumnFamilyStore.removeDeleted(cf, gcBefore);
-            if (cfPurged == null)
-                return null;
-            ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
-        }
-        else
-        {
-            assert rows.size() == 1;
-            rows.get(0).echoData(buffer);
         }
         rows.clear();
+
+        ColumnFamily cfPurged = ColumnFamilyStore.removeDeleted(cf, gcBefore);
+        if (cfPurged == null)
+            return null;
+        ColumnFamily.serializer().serializeWithIndexes(cfPurged, buffer);
         return new CompactedRow(key, buffer);
     }
 

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java?rev=828755&r1=828754&r2=828755&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/CompactionsTest.java Thu Oct 22 15:41:11 2009
@@ -70,19 +70,20 @@
     @Test
     public void testCompactionPurge() throws IOException, ExecutionException, InterruptedException
     {
+        CompactionManager.instance().disableCompactions();
+
         Table table = Table.open("Keyspace1");
-        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        String cfName = "Standard1";
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
 
         String key = "key1";
         RowMutation rm;
 
-        CompactionManager.instance().disableCompactions();
-
         // inserts
         rm = new RowMutation("Keyspace1", key);
         for (int i = 0; i < 10; i++)
         {
-            rm.add(new QueryPath("Standard1", null, String.valueOf(i).getBytes()), new byte[0], 0);
+            rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), new byte[0], 0);
         }
         rm.apply();
         store.forceBlockingFlush();
@@ -91,21 +92,59 @@
         for (int i = 0; i < 10; i++)
         {
             rm = new RowMutation("Keyspace1", key);
-            rm.delete(new QueryPath("Standard1", null, String.valueOf(i).getBytes()), 1);
+            rm.delete(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 1);
             rm.apply();
         }
         store.forceBlockingFlush();
 
         // resurrect one row
         rm = new RowMutation("Keyspace1", key);
-        rm.add(new QueryPath("Standard1", null, String.valueOf(5).getBytes()), new byte[0], 2);
+        rm.add(new QueryPath(cfName, null, String.valueOf(5).getBytes()), new byte[0], 2);
         rm.apply();
         store.forceBlockingFlush();
 
         // compact and test that all columns but the resurrected one is completely gone
         store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
-        ColumnFamily cf = table.getColumnFamilyStore("Standard1").getColumnFamily(new IdentityQueryFilter(key, new QueryPath("Standard1")));
+        ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
         assert cf.getColumnCount() == 1;
         assert cf.getColumn(String.valueOf(5).getBytes()) != null;
     }
+
+    @Test
+    public void testCompactionPurgeOneFile() throws IOException, ExecutionException, InterruptedException
+    {
+        CompactionManager.instance().disableCompactions();
+
+        Table table = Table.open("Keyspace1");
+        String cfName = "Standard2";
+        ColumnFamilyStore store = table.getColumnFamilyStore(cfName);
+
+        String key = "key1";
+        RowMutation rm;
+
+        // inserts
+        rm = new RowMutation("Keyspace1", key);
+        for (int i = 0; i < 5; i++)
+        {
+            rm.add(new QueryPath(cfName, null, String.valueOf(i).getBytes()), new byte[0], 0);
+        }
+        rm.apply();
+
+        // deletes
+        for (int i = 0; i < 5; i++)
+        {
+            rm = new RowMutation("Keyspace1", key);
+            rm.delete(new QueryPath(cfName, null, String.valueOf(i).getBytes()), 1);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+
+        assert store.getSSTables().size() == 1 : store.getSSTables(); // inserts & deletes were in the same memtable -> only deletes in sstable
+
+        // compact and test that the row is completely gone
+        store.doFileCompaction(store.getSSTables(), Integer.MAX_VALUE);
+        assert store.getSSTables().isEmpty();
+        ColumnFamily cf = table.getColumnFamilyStore(cfName).getColumnFamily(new IdentityQueryFilter(key, new QueryPath(cfName)));
+        assert cf == null : cf;
+    }
 }