You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/06 19:19:27 UTC
svn commit: r1005174 - in /cassandra/trunk: CHANGES.txt
src/java/org/apache/cassandra/db/Table.java test/conf/cassandra.yaml
test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Author: jbellis
Date: Wed Oct 6 17:19:26 2010
New Revision: 1005174
URL: http://svn.apache.org/viewvc?rev=1005174&view=rev
Log:
fix 2ary index support for deletions
patch by jbellis; reviewed by Stu Hood for CASSANDRA-1546
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/test/conf/cassandra.yaml
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Oct 6 17:19:26 2010
@@ -12,6 +12,7 @@ dev
* fix unbootstrap when no data is present in a transfer range (CASSANDRA-1573)
* take advantage of AVRO-495 to simplify our avro IDL (CASSANDRA-1436)
* extend authorization hierarchy to column family (CASSANDRA-1554)
+ * deletion support in secondary indexes (CASSANDRA-1571)
0.7-beta2
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Oct 6 17:19:26 2010
@@ -354,7 +354,7 @@ public class Table
SortedSet<byte[]> mutatedIndexedColumns = null;
for (byte[] column : cfs.getIndexedColumns())
{
- if (cf.getColumnNames().contains(column))
+ if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
{
if (mutatedIndexedColumns == null)
mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator);
@@ -367,8 +367,12 @@ public class Table
ColumnFamily oldIndexedColumns = null;
if (mutatedIndexedColumns != null)
{
+ // with the raw data CF, we can just apply every update in any order and let
+ // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
+ // but for indexed data we need to make sure that we're not creating index entries
+ // for obsolete writes.
oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
- ignoreObsoleteMutations(cf, cfs.metadata.reconciler, mutatedIndexedColumns, oldIndexedColumns);
+ ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
}
Memtable fullMemtable = cfs.apply(key, cf);
@@ -402,14 +406,22 @@ public class Table
return memtablesToFlush;
}
- private static void ignoreObsoleteMutations(ColumnFamily cf, AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
+ private static void ignoreObsoleteMutations(ColumnFamily cf, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
{
if (oldIndexedColumns == null)
return;
+ ColumnFamily cf2 = cf.cloneMe();
for (IColumn oldColumn : oldIndexedColumns)
{
- if (reconciler.reconcile((Column) oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn))
+ cf2.addColumn(oldColumn);
+ }
+ ColumnFamily resolved = ColumnFamilyStore.removeDeleted(cf2, Integer.MAX_VALUE);
+
+ for (IColumn oldColumn : oldIndexedColumns)
+ {
+ IColumn resolvedColumn = resolved == null ? null : resolved.getColumn(oldColumn.name());
+ if (resolvedColumn != null && resolvedColumn.equals(oldColumn))
{
cf.remove(oldColumn.name());
mutatedIndexedColumns.remove(oldColumn.name());
@@ -424,6 +436,10 @@ public class Table
return cfs.getColumnFamily(filter);
}
+ /**
+ * removes obsolete index entries and creates new ones for the given row key and mutated columns.
+ * @return list of full (index CF) memtables
+ */
private static List<Memtable> applyIndexUpdates(byte[] key,
ColumnFamily cf,
ColumnFamilyStore cfs,
@@ -436,6 +452,9 @@ public class Table
for (byte[] columnName : mutatedIndexedColumns)
{
IColumn column = cf.getColumn(columnName);
+ if (column == null || column.isMarkedForDelete())
+ continue; // null column == row deletion
+
DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
if (column instanceof ExpiringColumn)
@@ -444,7 +463,9 @@ public class Table
cfi.addColumn(new ExpiringColumn(key, ArrayUtils.EMPTY_BYTE_ARRAY, ec.clock(), ec.getTimeToLive(), ec.getLocalDeletionTime()));
}
else
+ {
cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
+ }
Memtable fullMemtable = cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi);
if (fullMemtable != null)
fullMemtables = addFullMemtable(fullMemtables, fullMemtable);
@@ -458,6 +479,8 @@ public class Table
{
byte[] columnName = entry.getKey();
IColumn column = entry.getValue();
+ if (column.isMarkedForDelete())
+ continue;
DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
cfi.deleteColumn(key, localDeletionTime, column.clock());
Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Wed Oct 6 17:19:26 2010
@@ -105,14 +105,18 @@ keyspaces:
replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
replication_factor: 5
column_families:
-
- name: Standard1
+ - name: Indexed1
+ column_metadata:
+ - name: birthdate
+ validator_class: LongType
+ index_type: KEYS
+
- name: Keyspace4
replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
replication_factor: 3
column_families:
-
- name: Standard1
- name: Standard3
@@ -128,5 +132,4 @@ keyspaces:
replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
replication_factor: 2
column_families:
-
- name: Standard1
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Oct 6 17:19:26 2010
@@ -177,7 +177,7 @@ public class ColumnFamilyStoreTest exten
rm.add(new QueryPath("Indexed1", null, "notbirthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(0));
rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0));
rm.apply();
-
+
rm = new RowMutation("Keyspace1", "k4aaaa".getBytes());
rm.add(new QueryPath("Indexed1", null, "notbirthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(0));
rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(3L), new TimestampClock(0));
@@ -223,6 +223,55 @@ public class ColumnFamilyStoreTest exten
}
@Test
+ public void testIndexDeletions() throws IOException
+ {
+ ColumnFamilyStore cfs = Table.open("Keyspace3").getColumnFamilyStore("Indexed1");
+ RowMutation rm;
+
+ rm = new RowMutation("Keyspace3", "k1".getBytes());
+ rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0));
+ rm.apply();
+
+ IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(1L));
+ IndexClause clause = new IndexClause(Arrays.asList(expr), ArrayUtils.EMPTY_BYTE_ARRAY, 100);
+ IFilter filter = new IdentityQueryFilter();
+ IPartitioner p = StorageService.getPartitioner();
+ Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+ List<Row> rows = cfs.scan(clause, range, filter);
+ assert rows.size() == 1 : StringUtils.join(rows, ",");
+ assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+
+ // delete the column directly
+ rm = new RowMutation("Keyspace3", "k1".getBytes());
+ rm.delete(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), new TimestampClock(1));
+ rm.apply();
+ rows = cfs.scan(clause, range, filter);
+ assert rows.isEmpty();
+
+ // verify that it's not being indexed under the deletion column value either
+ IColumn deletion = rm.getColumnFamilies().iterator().next().iterator().next();
+ IndexExpression expr0 = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, deletion.value());
+ IndexClause clause0 = new IndexClause(Arrays.asList(expr0), ArrayUtils.EMPTY_BYTE_ARRAY, 100);
+ rows = cfs.scan(clause0, range, filter);
+ assert rows.isEmpty();
+
+ // resurrect w/ a newer timestamp
+ rm = new RowMutation("Keyspace3", "k1".getBytes());
+ rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(2));
+ rm.apply();
+ rows = cfs.scan(clause, range, filter);
+ assert rows.size() == 1 : StringUtils.join(rows, ",");
+ assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+
+ // delete the entire row
+ rm = new RowMutation("Keyspace3", "k1".getBytes());
+ rm.delete(new QueryPath("Indexed1"), new TimestampClock(3));
+ rm.apply();
+ rows = cfs.scan(clause, range, filter);
+ assert rows.isEmpty() : StringUtils.join(rows, ",");
+ }
+
+ @Test
public void testIndexUpdate() throws IOException
{
Table table = Table.open("Keyspace2");