You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/17 22:08:01 UTC
svn commit: r766132 - in /incubator/cassandra/trunk:
src/org/apache/cassandra/db/ test/org/apache/cassandra/db/
Author: jbellis
Date: Fri Apr 17 20:08:01 2009
New Revision: 766132
URL: http://svn.apache.org/viewvc?rev=766132&view=rev
Log:
preserve tombstones until a GC grace period has elapsed.
patch by jbellis; reviewed by Eric Evans for #33
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Column.java Fri Apr 17 20:08:01 2009
@@ -22,6 +22,7 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
+import java.nio.ByteBuffer;
import org.apache.commons.lang.ArrayUtils;
@@ -192,6 +193,11 @@
return stringBuilder.toString().getBytes();
}
+ public int getLocalDeletionTime()
+ {
+ assert isMarkedForDelete;
+ return ByteBuffer.wrap(value).getInt();
+ }
}
class ColumnSerializer implements ICompactSerializer2<IColumn>
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamily.java Fri Apr 17 20:08:01 2009
@@ -96,6 +96,7 @@
private transient ICompactSerializer2<IColumn> columnSerializer_;
private long markedForDeleteAt = Long.MIN_VALUE;
+ private int localDeletionTime = Integer.MIN_VALUE;
private AtomicInteger size_ = new AtomicInteger(0);
private EfficientBidiMap columns_;
@@ -156,11 +157,18 @@
createColumnFactoryAndColumnSerializer(columnType);
}
+ ColumnFamily cloneMeShallow()
+ {
+ ColumnFamily cf = new ColumnFamily(name_, type_);
+ cf.markedForDeleteAt = markedForDeleteAt;
+ cf.localDeletionTime = localDeletionTime;
+ return cf;
+ }
+
ColumnFamily cloneMe()
{
- ColumnFamily cf = new ColumnFamily(name_, type_);
- cf.markedForDeleteAt = markedForDeleteAt;
- cf.columns_ = columns_.cloneMe();
+ ColumnFamily cf = cloneMeShallow();
+ cf.columns_ = columns_.cloneMe();
return cf;
}
@@ -292,8 +300,9 @@
columns_.remove(columnName);
}
- void delete(long timestamp)
+ void delete(int localtime, long timestamp)
{
+ localDeletionTime = localtime;
markedForDeleteAt = timestamp;
}
@@ -413,10 +422,16 @@
return xorHash;
}
- public long getMarkedForDeleteAt() {
+ public long getMarkedForDeleteAt()
+ {
return markedForDeleteAt;
}
+ public int getLocalDeletionTime()
+ {
+ return localDeletionTime;
+ }
+
public String type()
{
return type_;
@@ -452,15 +467,11 @@
{
Collection<IColumn> columns = columnFamily.getAllColumns();
- /* write the column family id */
dos.writeUTF(columnFamily.name());
- /* write if this cf is marked for delete */
- dos.writeLong(columnFamily.getMarkedForDeleteAt());
+ dos.writeInt(columnFamily.localDeletionTime);
+ dos.writeLong(columnFamily.markedForDeleteAt);
- /* write the size is the number of columns */
dos.writeInt(columns.size());
-
- /* write the column data */
for ( IColumn column : columns )
{
columnFamily.getColumnSerializer().serialize(column, dos);
@@ -475,7 +486,7 @@
{
String name = dis.readUTF();
ColumnFamily cf = new ColumnFamily(name, DatabaseDescriptor.getColumnFamilyType(name));
- cf.delete(dis.readLong());
+ cf.delete(dis.readInt(), dis.readLong());
return cf;
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Fri Apr 17 20:08:01 2009
@@ -23,10 +23,6 @@
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.MBeanRegistrationException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.MalformedObjectNameException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -594,14 +590,15 @@
// start from nothing so that we don't include potential deleted columns from the first instance
ColumnFamily cf0 = columnFamilies.get(0);
- ColumnFamily cf = new ColumnFamily(cf0.name(), cf0.type());
+ ColumnFamily cf = cf0.cloneMeShallow();
// merge
for (ColumnFamily cf2 : columnFamilies)
{
assert cf.name().equals(cf2.name());
cf.addColumns(cf2);
- cf.delete(Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
+ cf.delete(Math.max(cf.getLocalDeletionTime(), cf2.getLocalDeletionTime()),
+ Math.max(cf.getMarkedForDeleteAt(), cf2.getMarkedForDeleteAt()));
}
return cf;
}
@@ -619,29 +616,60 @@
return removeDeleted(cf);
}
- static ColumnFamily removeDeleted(ColumnFamily cf) {
- if (cf == null) {
+ static final int GC_GRACE_IN_SECONDS = 10 * 24 * 3600; // 10 days
+
+ /*
+ This is complicated because we need to preserve deleted columns, supercolumns, and columnfamilies
+ until they have been deleted for at least GC_GRACE_IN_SECONDS. However, we do not need to preserve
+ their contents, only the object itself as a "tombstone" that can be used to repair other
+ replicas that do not know about the deletion.
+ */
+ static ColumnFamily removeDeleted(ColumnFamily cf)
+ {
+ return removeDeleted(cf, (int)(System.currentTimeMillis() / 1000) - GC_GRACE_IN_SECONDS);
+ }
+
+ static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore)
+ {
+ if (cf == null)
return null;
- }
- for (String cname : new ArrayList<String>(cf.getColumns().keySet())) {
+
+ for (String cname : new ArrayList<String>(cf.getColumns().keySet()))
+ {
IColumn c = cf.getColumns().get(cname);
- if (c instanceof SuperColumn) {
- long min_timestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
+ if (c instanceof SuperColumn)
+ {
+ long minTimestamp = Math.max(c.getMarkedForDeleteAt(), cf.getMarkedForDeleteAt());
// don't operate directly on the supercolumn, it could be the one in the memtable
cf.remove(cname);
- IColumn sc = new SuperColumn(cname);
- for (IColumn subColumn : c.getSubColumns()) {
- if (!subColumn.isMarkedForDelete() && subColumn.timestamp() >= min_timestamp) {
- sc.addColumn(subColumn.name(), subColumn);
+ SuperColumn sc = new SuperColumn(cname);
+ sc.markForDeleteAt(c.getLocalDeletionTime(), c.getMarkedForDeleteAt());
+ for (IColumn subColumn : c.getSubColumns())
+ {
+ if (subColumn.timestamp() >= minTimestamp)
+ {
+ if (!subColumn.isMarkedForDelete() || subColumn.getLocalDeletionTime() > gcBefore)
+ {
+ sc.addColumn(subColumn.name(), subColumn);
+ }
}
}
- if (sc.getSubColumns().size() > 0) {
+ if (sc.getSubColumns().size() > 0 || sc.getLocalDeletionTime() > gcBefore)
+ {
cf.addColumn(sc);
}
- } else if (c.isMarkedForDelete() || c.timestamp() < cf.getMarkedForDeleteAt()) {
+ }
+ else if ((c.isMarkedForDelete() && c.getLocalDeletionTime() <= gcBefore)
+ || c.timestamp() < cf.getMarkedForDeleteAt())
+ {
cf.remove(cname);
}
}
+
+ if (cf.getColumnCount() == 0 && cf.getLocalDeletionTime() <= gcBefore)
+ {
+ return null;
+ }
return cf;
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/IColumn.java Fri Apr 17 20:08:01 2009
@@ -43,4 +43,5 @@
public IColumn diff(IColumn column);
public int getObjectCount();
public byte[] digest();
+ public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/Memtable.java Fri Apr 17 20:08:01 2009
@@ -276,7 +276,8 @@
int newObjectCount = oldCf.getColumnCount();
resolveSize(oldSize, newSize);
resolveCount(oldObjectCount, newObjectCount);
- oldCf.delete(Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
+ oldCf.delete(Math.max(oldCf.getLocalDeletionTime(), columnFamily.getLocalDeletionTime()),
+ Math.max(oldCf.getMarkedForDeleteAt(), columnFamily.getMarkedForDeleteAt()));
}
else
{
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowMutation.java Fri Apr 17 20:08:01 2009
@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import java.nio.ByteBuffer;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
@@ -175,14 +176,16 @@
{
String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
String cfName = values[0];
+
if (modifications_.containsKey(cfName))
{
throw new IllegalArgumentException("ColumnFamily " + cfName + " is already being modified");
}
-
if (values.length == 0 || values.length > 3)
throw new IllegalArgumentException("Column Family " + columnFamilyColumn + " in invalid format. Must be in <column family>:<column> format.");
+ int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
+
ColumnFamily columnFamily = modifications_.get(cfName);
if (columnFamily == null)
columnFamily = new ColumnFamily(cfName, DatabaseDescriptor.getColumnType(cfName));
@@ -191,22 +194,26 @@
if (columnFamily.isSuper())
{
SuperColumn sc = new SuperColumn(values[1]);
- sc.markForDeleteAt(timestamp);
+ sc.markForDeleteAt(localDeleteTime, timestamp);
columnFamily.addColumn(sc);
}
else
{
- columnFamily.addColumn(values[1], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+ ByteBuffer bytes = ByteBuffer.allocate(4);
+ bytes.putInt(localDeleteTime);
+ columnFamily.addColumn(values[1], bytes.array(), timestamp, true);
}
}
else if (values.length == 3)
{
- columnFamily.addColumn(values[1] + ":" + values[2], ArrayUtils.EMPTY_BYTE_ARRAY, timestamp, true);
+ ByteBuffer bytes = ByteBuffer.allocate(4);
+ bytes.putInt(localDeleteTime);
+ columnFamily.addColumn(values[1] + ":" + values[2], bytes.array(), timestamp, true);
}
else
{
assert values.length == 1;
- columnFamily.delete(timestamp);
+ columnFamily.delete(localDeleteTime, timestamp);
}
modifications_.put(cfName, columnFamily);
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SuperColumn.java Fri Apr 17 20:08:01 2009
@@ -49,6 +49,7 @@
private String name_;
private EfficientBidiMap columns_ = new EfficientBidiMap(ColumnComparatorFactory.getComparator(ColumnComparatorFactory.ComparatorType.TIMESTAMP));
+ private int localDeletionTime = Integer.MIN_VALUE;
private long markedForDeleteAt = Long.MIN_VALUE;
private AtomicInteger size_ = new AtomicInteger(0);
@@ -289,7 +290,14 @@
return sb.toString();
}
- public void markForDeleteAt(long timestamp) {
+ public int getLocalDeletionTime()
+ {
+ return localDeletionTime;
+ }
+
+ public void markForDeleteAt(int localDeleteTime, long timestamp)
+ {
+ this.localDeletionTime = localDeleteTime;
this.markedForDeleteAt = timestamp;
}
}
@@ -300,20 +308,14 @@
{
SuperColumn superColumn = (SuperColumn)column;
dos.writeUTF(superColumn.name());
+ dos.writeInt(superColumn.getLocalDeletionTime());
dos.writeLong(superColumn.getMarkedForDeleteAt());
Collection<IColumn> columns = column.getSubColumns();
int size = columns.size();
dos.writeInt(size);
- /*
- * Add the total size of the columns. This is useful
- * to skip over all the columns in this super column
- * if we are not interested in this super column.
- */
dos.writeInt(superColumn.getSizeOfAllColumns());
- // dos.writeInt(superColumn.size());
-
for ( IColumn subColumn : columns )
{
Column.serializer().serialize(subColumn, dos);
@@ -328,7 +330,7 @@
{
String name = dis.readUTF();
SuperColumn superColumn = new SuperColumn(name);
- superColumn.markForDeleteAt(dis.readLong());
+ superColumn.markForDeleteAt(dis.readInt(), dis.readLong());
return superColumn;
}
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=766132&r1=766131&r2=766132&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Fri Apr 17 20:08:01 2009
@@ -19,6 +19,8 @@
import org.apache.cassandra.ServerTest;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
public class ColumnFamilyStoreTest extends ServerTest
{
@@ -207,7 +209,8 @@
rm.apply();
ColumnFamily retrieved = store.getColumnFamily("key1", "Standard1", new IdentityFilter());
- assert retrieved.getColumnCount() == 0;
+ assert retrieved.getColumn("Column1").isMarkedForDelete();
+ assertNull(ColumnFamilyStore.removeDeleted(retrieved, Integer.MAX_VALUE));
}
@Test
@@ -229,7 +232,8 @@
rm.apply();
ColumnFamily retrieved = store.getColumnFamily("key1", "Super1:SC1", new IdentityFilter());
- assert retrieved.getColumnCount() == 0;
+ assert retrieved.getColumn("SC1").getSubColumn("Column1").isMarkedForDelete();
+ assertNull(ColumnFamilyStore.removeDeleted(retrieved, Integer.MAX_VALUE));
}
@Test
@@ -258,7 +262,7 @@
Collection<IColumn> subColumns = resolved.getAllColumns().first().getSubColumns();
assert subColumns.size() == 1;
assert subColumns.iterator().next().timestamp() == 0;
- assert ColumnFamilyStore.removeDeleted(resolved).getColumnCount() == 0;
+ assertNull(ColumnFamilyStore.removeDeleted(resolved, Integer.MAX_VALUE));
}
@Test
@@ -281,7 +285,9 @@
rm.apply();
ColumnFamily retrieved = store.getColumnFamily("key1", "Standard1", new IdentityFilter());
- assert retrieved.getColumnCount() == 0;
+ assert retrieved.isMarkedForDelete();
+ assertEquals(retrieved.getColumnCount(), 0);
+ assertNull(ColumnFamilyStore.removeDeleted(retrieved, Integer.MAX_VALUE));
}
@Test