You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/17 22:08:01 UTC

svn commit: r766132 - in /incubator/cassandra/trunk: src/org/apache/cassandra/db/ test/org/apache/cassandra/db/

Author: jbellis
Date: Fri Apr 17 20:08:01 2009
New Revision: 766132

URL: http://svn.apache.org/viewvc?rev=766132&view=rev
Log:
preserve tombstones until a GC grace period has elapsed.
patch by jbellis; reviewed by Eric Evans for #33

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java Fri Apr 17 20:08:01 2009
@@ -22,6 +22,7 @@
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collection;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.lang.ArrayUtils;
 
@@ -192,6 +193,11 @@
         return stringBuilder.toString().getBytes();
     }
 
+    public int getLocalDeletionTime() // local (server-side) deletion time decoded from this column's value
+    {
+        assert isMarkedForDelete; // callers are expected to check isMarkedForDelete first
+        return ByteBuffer.wrap(value).getInt(); // RowMutation writes localDeleteTime into value via ByteBuffer.putInt (4 bytes) for deleted columns
+    }
 }
 
 class ColumnSerializer implements ICompactSerializer2<IColumn>

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri Apr 17 20:08:01 2009
@@ -96,6 +96,7 @@
 
     private transient ICompactSerializer2<IColumn> columnSerializer_;
     private long markedForDeleteAt = Long.MIN_VALUE;
+    private int localDeletionTime = Integer.MIN_VALUE;
     private AtomicInteger size_ = new AtomicInteger(0);
     private EfficientBidiMap columns_;
 
@@ -156,11 +157,18 @@
         createColumnFactoryAndColumnSerializer(columnType);
     }
 
+    ColumnFamily cloneMeShallow() // new ColumnFamily with the same name, type and deletion markers; columns_ is not copied
+    {
+        ColumnFamily cf = new ColumnFamily(name_, type_);
+        cf.markedForDeleteAt = markedForDeleteAt; // deletion timestamp; removeDeleted drops columns older than this
+        cf.localDeletionTime = localDeletionTime; // local deletion time; removeDeleted compares it against gcBefore
+        return cf;
+    }
+
     ColumnFamily cloneMe()
     {
-    	ColumnFamily cf = new ColumnFamily(name_, type_);
-    	cf.markedForDeleteAt = markedForDeleteAt;
-    	cf.columns_ = columns_.cloneMe();
+        ColumnFamily cf = cloneMeShallow();
+        cf.columns_ = columns_.cloneMe();
     	return cf;
     }
 
@@ -292,8 +300,9 @@
     	columns_.remove(columnName);
     }
 
-    void delete(long timestamp)
+    void delete(int localtime, long timestamp)
     {
+        localDeletionTime = localtime;
         markedForDeleteAt = timestamp;
     }
 
@@ -413,10 +422,16 @@
     	return xorHash;
     }
 
-    public long getMarkedForDeleteAt() {
+    public long getMarkedForDeleteAt()
+    {
         return markedForDeleteAt;
     }
 
+    public int getLocalDeletionTime()
+    {
+        return localDeletionTime;
+    }
+
     public String type()
     {
         return type_;
@@ -452,15 +467,11 @@
         {
             Collection<IColumn> columns = columnFamily.getAllColumns();
 
-            /* write the column family id */
             dos.writeUTF(columnFamily.name());
-            /* write if this cf is marked for delete */
-            dos.writeLong(columnFamily.getMarkedForDeleteAt());
+            dos.writeInt(columnFamily.localDeletionTime);
+            dos.writeLong(columnFamily.markedForDeleteAt);
 
-            /* write the size is the number of columns */
             dos.writeInt(columns.size());
-
-            /* write the column data */
             for ( IColumn column : columns )
             {
                 columnFamily.getColumnSerializer().serialize(column, dos);
@@ -475,7 +486,7 @@
         {
             String name = dis.readUTF();
             ColumnFamily cf = new ColumnFamily(name, DatabaseDescriptor.getColumnFamilyType(name));
-            cf.delete(dis.readLong());
+            cf.delete(dis.readInt(), dis.readLong());
             return cf;
         }
 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Apr 17 20:08:01 2009
@@ -23,10 +23,6 @@
 import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.MBeanRegistrationException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.MalformedObjectNameException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -594,14 +590,15 @@
 
         // start from nothing so that we don't include potential deleted columns from the first instance
         ColumnFamily cf0 = columnFamilies.get(0);
-        ColumnFamily cf = new ColumnFamily(cf0.name(), cf0.type());
+        ColumnFamily cf = cf0.cloneMeShallow();
 
         // merge
         for (ColumnFamily cf2 : columnFamilies)
         {
             assert cf.name().equals(cf2.name());
             cf.addColumns(cf2);
-            cf.delete(Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
+            cf.delete(Math.max(cf.getLocalDeletionTime(), cf2.getLocalDeletionTime()),
+                      Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
         }
         return cf;
     }
@@ -619,29 +616,60 @@
         return removeDeleted(cf);
     }
 
-    static ColumnFamily removeDeleted(ColumnFamily cf) {
-        if (cf == null) {
+    static final int GC_GRACE_IN_SECONDS = 10 * 24 * 3600; // 10 days
+
+    /*
+     This is complicated because we need to preserve deleted columns, supercolumns, and columnfamilies
+     until they have been deleted for at least GC_GRACE_IN_SECONDS.  But, we do not need to preserve
+     their contents; just the object itself as a "tombstone" that can be used to repair other
+     replicas that do not know about the deletion.
+     */
+    static ColumnFamily removeDeleted(ColumnFamily cf) // overload that derives the GC cutoff from the current wall-clock time
+    {
+        return removeDeleted(cf, (int)(System.currentTimeMillis() / 1000) - GC_GRACE_IN_SECONDS); // gcBefore = now in seconds minus the grace period
+    }
+
+    static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
+    {
+        if (cf == null)
             return null;
-        }
-        for (String cname : new ArrayList<String>(cf.getColumns().keySet())) {
+
+        for (String cname : new ArrayList<String>(cf.getColumns().keySet()))
+        {
             IColumn c = cf.getColumns().get(cname);
-            if (c instanceof SuperColumn) {
-                long min_timestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
+            if (c instanceof SuperColumn)
+            {
+                long minTimestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
                 // don't operate directly on the supercolumn, it could be the one in the memtable
                 cf.remove(cname);
-                IColumn sc = new SuperColumn(cname);
-                for (IColumn subColumn : c.getSubColumns()) {
-                    if (!subColumn.isMarkedForDelete() && subColumn.timestamp() >= min_timestamp) {
-                        sc.addColumn(subColumn.name(), subColumn);
+                SuperColumn sc = new SuperColumn(cname);
+                sc.markForDeleteAt(c.getLocalDeletionTime(), c.getMarkedForDeleteAt());
+                for (IColumn subColumn : c.getSubColumns())
+                {
+                    if (subColumn.timestamp() >= minTimestamp)
+                    {
+                        if (!subColumn.isMarkedForDelete() || subColumn.getLocalDeletionTime() > gcBefore)
+                        {
+                            sc.addColumn(subColumn.name(), subColumn);
+                        }
                     }
                 }
-                if (sc.getSubColumns().size() > 0) {
+                if (sc.getSubColumns().size() > 0 || sc.getLocalDeletionTime() > gcBefore)
+                {
                     cf.addColumn(sc);
                 }
-            } else if (c.isMarkedForDelete() || c.timestamp() < cf.getMarkedForDeleteAt()) {
+            }
+            else if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore)
+                     || c.timestamp() < cf.getMarkedForDeleteAt())
+            {
                 cf.remove(cname);
             }
         }
+
+        if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <= gcBefore)
+        {
+            return null;
+        }
         return cf;
     }
 

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java Fri Apr 17 20:08:01 2009
@@ -43,4 +43,5 @@
     public IColumn diff(IColumn column);
     public int getObjectCount();
     public byte[] digest();
+    public int getLocalDeletionTime(); // local deletion time in seconds (not millis); only used for tombstone GC, so int is sufficient granularity
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Apr 17 20:08:01 2009
@@ -276,7 +276,8 @@
             int newObjectCount = oldCf.getColumnCount();
             resolveSize(oldSize, newSize);
             resolveCount(oldObjectCount, newObjectCount);
-            oldCf.delete(Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
+            oldCf.delete(Math.max(oldCf.getLocalDeletionTime(), columnFamily.getLocalDeletionTime()),
+                         Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
         }
         else
         {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Apr 17 20:08:01 2009
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.nio.ByteBuffer;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -175,14 +176,16 @@
     {
         String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
         String cfName = values[0];
+
         if (modifications_.containsKey(cfName))
         {
             throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
         }
-
         if (values.length == 0 || values.length > 3)
             throw new IllegalArgumentException("Column Family " + columnFamilyColumn + " in invalid format. Must be in <column family>:<column> format.");
 
+        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+
         ColumnFamily columnFamily = modifications_.get(cfName);
         if (columnFamily == null)
             columnFamily = new ColumnFamily(cfName, DatabaseDescriptor.getColumnType(cfName));
@@ -191,22 +194,26 @@
             if (columnFamily.isSuper())
             {
                 SuperColumn sc = new SuperColumn(values[1]);
-                sc.markForDeleteAt(timestamp);
+                sc.markForDeleteAt(localDeleteTime, timestamp);
                 columnFamily.addColumn(sc);
             }
             else
             {
-                columnFamily.addColumn(values[1], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+                ByteBuffer bytes = ByteBuffer.allocate(4);
+                bytes.putInt(localDeleteTime);
+                columnFamily.addColumn(values[1], bytes.array(), timestamp, true);
             }
         }
         else if (values.length == 3)
         {
-            columnFamily.addColumn(values[1] + ":" + values[2], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+            ByteBuffer bytes = ByteBuffer.allocate(4);
+            bytes.putInt(localDeleteTime);
+            columnFamily.addColumn(values[1] + ":" + values[2], bytes.array(), timestamp, true);
         }
         else
         {
             assert values.length == 1;
-            columnFamily.delete(timestamp);
+            columnFamily.delete(localDeleteTime, timestamp);
         }
         modifications_.put(cfName, columnFamily);
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java Fri Apr 17 20:08:01 2009
@@ -49,6 +49,7 @@
 
 	private String name_;
     private EfficientBidiMap columns_ = new EfficientBidiMap(ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP));
+    private int localDeletionTime = Integer.MIN_VALUE;
 	private long markedForDeleteAt = Long.MIN_VALUE;
     private AtomicInteger size_ = new AtomicInteger(0);
 
@@ -289,7 +290,14 @@
         return sb.toString();
     }
 
-    public void markForDeleteAt(long timestamp) {
+    public int getLocalDeletionTime()
+    {
+        return localDeletionTime;
+    }
+
+    public void markForDeleteAt(int localDeleteTime, long timestamp)
+    {
+        this.localDeletionTime = localDeleteTime;
         this.markedForDeleteAt = timestamp;
     }
 }
@@ -300,20 +308,14 @@
     {
     	SuperColumn superColumn = (SuperColumn)column;
         dos.writeUTF(superColumn.name());
+        dos.writeInt(superColumn.getLocalDeletionTime());
         dos.writeLong(superColumn.getMarkedForDeleteAt());
 
         Collection<IColumn> columns  = column.getSubColumns();
         int size = columns.size();
         dos.writeInt(size);
 
-        /*
-         * Add the total size of the columns. This is useful
-         * to skip over all the columns in this super column
-         * if we are not interested in this super column.
-        */
         dos.writeInt(superColumn.getSizeOfAllColumns());
-        // dos.writeInt(superColumn.size());
-
         for ( IColumn subColumn : columns )
         {
             Column.serializer().serialize(subColumn, dos);
@@ -328,7 +330,7 @@
     {
         String name = dis.readUTF();
         SuperColumn superColumn = new SuperColumn(name);
-        superColumn.markForDeleteAt(dis.readLong());
+        superColumn.markForDeleteAt(dis.readInt(), dis.readLong());
         return superColumn;
     }
 

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Fri Apr 17 20:08:01 2009
@@ -19,6 +19,8 @@
 
 import org.apache.cassandra.ServerTest;
 import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
 
 public class ColumnFamilyStoreTest extends ServerTest
 {
@@ -207,7 +209,8 @@
         rm.apply();
 
         ColumnFamily retrieved = store.getColumnFamily("key1", "Standard1", new IdentityFilter());
-        assert retrieved.getColumnCount() == 0;
+        assert retrieved.getColumn("Column1").isMarkedForDelete();
+        assertNull(ColumnFamilyStore.removeDeleted(retrieved, Integer.MAX_VALUE));
     }
 
     @Test
@@ -229,7 +232,8 @@
         rm.apply();
 
         ColumnFamily retrieved = store.getColumnFamily("key1", "Super1:SC1", new IdentityFilter());
-        assert retrieved.getColumnCount() == 0;
+        assert retrieved.getColumn("SC1").getSubColumn("Column1").isMarkedForDelete();
+        assertNull(ColumnFamilyStore.removeDeleted(retrieved, Integer.MAX_VALUE));
     }
 
     @Test
@@ -258,7 +262,7 @@
         Collection<IColumn> subColumns = resolved.getAllColumns().first().getSubColumns();
         assert subColumns.size() == 1;
         assert subColumns.iterator().next().timestamp() == 0;
-        assert ColumnFamilyStore.removeDeleted(resolved).getColumnCount() == 0;
+        assertNull(ColumnFamilyStore.removeDeleted(resolved, Integer.MAX_VALUE));
     }
 
     @Test
@@ -281,7 +285,9 @@
         rm.apply();
 
         ColumnFamily retrieved = store.getColumnFamily("key1", "Standard1", new IdentityFilter());
-        assert retrieved.getColumnCount() == 0;
+        assert retrieved.isMarkedForDelete();
+        assertEquals(retrieved.getColumnCount(), 0);
+        assertNull(ColumnFamilyStore.removeDeleted(retrieved, Integer.MAX_VALUE));
     }
 
     @Test