You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/10/06 19:19:27 UTC

svn commit: r1005174 - in /cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/db/Table.java test/conf/cassandra.yaml test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Author: jbellis
Date: Wed Oct  6 17:19:26 2010
New Revision: 1005174

URL: http://svn.apache.org/viewvc?rev=1005174&view=rev
Log:
fix 2ary index support for deletions
patch by jbellis; reviewed by Stu Hood for CASSANDRA-1546

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/test/conf/cassandra.yaml
    cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Oct  6 17:19:26 2010
@@ -12,6 +12,7 @@ dev
  * fix unbootstrap when no data is present in a transfer range (CASSANDRA-1573)
  * take advantage of AVRO-495 to simplify our avro IDL (CASSANDRA-1436)
  * extend authorization hierarchy to column family (CASSANDRA-1554)
+ * deletion support in secondary indexes (CASSANDRA-1571)
 
 
 0.7-beta2

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Oct  6 17:19:26 2010
@@ -354,7 +354,7 @@ public class Table
                 SortedSet<byte[]> mutatedIndexedColumns = null;
                 for (byte[] column : cfs.getIndexedColumns())
                 {
-                    if (cf.getColumnNames().contains(column))
+                    if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete())
                     {
                         if (mutatedIndexedColumns == null)
                             mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator);
@@ -367,8 +367,12 @@ public class Table
                     ColumnFamily oldIndexedColumns = null;
                     if (mutatedIndexedColumns != null)
                     {
+                        // with the raw data CF, we can just apply every update in any order and let
+                        // read-time resolution throw out obsolete versions, thus avoiding read-before-write.
+                        // but for indexed data we need to make sure that we're not creating index entries
+                        // for obsolete writes.
                         oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns);
-                        ignoreObsoleteMutations(cf, cfs.metadata.reconciler, mutatedIndexedColumns, oldIndexedColumns);
+                        ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns);
                     }
 
                     Memtable fullMemtable = cfs.apply(key, cf);
@@ -402,14 +406,22 @@ public class Table
         return memtablesToFlush;
     }
 
-    private static void ignoreObsoleteMutations(ColumnFamily cf, AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
+    private static void ignoreObsoleteMutations(ColumnFamily cf, SortedSet<byte[]> mutatedIndexedColumns, ColumnFamily oldIndexedColumns)
     {
         if (oldIndexedColumns == null)
             return;
 
+        ColumnFamily cf2 = cf.cloneMe();
         for (IColumn oldColumn : oldIndexedColumns)
         {
-            if (reconciler.reconcile((Column) oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn))
+            cf2.addColumn(oldColumn);
+        }
+        ColumnFamily resolved = ColumnFamilyStore.removeDeleted(cf2, Integer.MAX_VALUE);
+
+        for (IColumn oldColumn : oldIndexedColumns)
+        {
+            IColumn resolvedColumn = resolved == null ? null : resolved.getColumn(oldColumn.name());
+            if (resolvedColumn != null && resolvedColumn.equals(oldColumn))
             {
                 cf.remove(oldColumn.name());
                 mutatedIndexedColumns.remove(oldColumn.name());
@@ -424,6 +436,10 @@ public class Table
         return cfs.getColumnFamily(filter);
     }
 
+    /**
+     * removes obsolete index entries and creates new ones for the given row key and mutated columns.
+     * @return list of full (index CF) memtables
+     */
     private static List<Memtable> applyIndexUpdates(byte[] key,
                                                     ColumnFamily cf,
                                                     ColumnFamilyStore cfs,
@@ -436,6 +452,9 @@ public class Table
         for (byte[] columnName : mutatedIndexedColumns)
         {
             IColumn column = cf.getColumn(columnName);
+            if (column == null || column.isMarkedForDelete())
+                continue; // null column == row deletion
+
             DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
             ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
             if (column instanceof ExpiringColumn)
@@ -444,7 +463,9 @@ public class Table
                 cfi.addColumn(new ExpiringColumn(key, ArrayUtils.EMPTY_BYTE_ARRAY, ec.clock(), ec.getTimeToLive(), ec.getLocalDeletionTime()));
             }
             else
+            {
                 cfi.addColumn(new Column(key, ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
+            }
             Memtable fullMemtable = cfs.getIndexedColumnFamilyStore(columnName).apply(valueKey, cfi);
             if (fullMemtable != null)
                 fullMemtables = addFullMemtable(fullMemtables, fullMemtable);
@@ -458,6 +479,8 @@ public class Table
             {
                 byte[] columnName = entry.getKey();
                 IColumn column = entry.getValue();
+                if (column.isMarkedForDelete())
+                    continue;
                 DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
                 ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
                 cfi.deleteColumn(key, localDeletionTime, column.clock());

Modified: cassandra/trunk/test/conf/cassandra.yaml
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/conf/cassandra.yaml?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/test/conf/cassandra.yaml (original)
+++ cassandra/trunk/test/conf/cassandra.yaml Wed Oct  6 17:19:26 2010
@@ -105,14 +105,18 @@ keyspaces:
       replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
       replication_factor: 5
       column_families:
-
         - name: Standard1
 
+        - name: Indexed1
+          column_metadata:
+            - name: birthdate
+              validator_class: LongType
+              index_type: KEYS
+
     - name: Keyspace4
       replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
       replication_factor: 3
       column_families:
-
         - name: Standard1
 
         - name: Standard3
@@ -128,5 +132,4 @@ keyspaces:
       replica_placement_strategy: org.apache.cassandra.locator.SimpleStrategy
       replication_factor: 2
       column_families:
-
         - name: Standard1

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1005174&r1=1005173&r2=1005174&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Wed Oct  6 17:19:26 2010
@@ -177,7 +177,7 @@ public class ColumnFamilyStoreTest exten
         rm.add(new QueryPath("Indexed1", null, "notbirthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(0));
         rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0));
         rm.apply();
-        
+
         rm = new RowMutation("Keyspace1", "k4aaaa".getBytes());
         rm.add(new QueryPath("Indexed1", null, "notbirthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(0));
         rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(3L), new TimestampClock(0));
@@ -223,6 +223,55 @@ public class ColumnFamilyStoreTest exten
     }
 
     @Test
+    public void testIndexDeletions() throws IOException
+    {
+        ColumnFamilyStore cfs = Table.open("Keyspace3").getColumnFamilyStore("Indexed1");
+        RowMutation rm;
+
+        rm = new RowMutation("Keyspace3", "k1".getBytes());
+        rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(0));
+        rm.apply();
+
+        IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(1L));
+        IndexClause clause = new IndexClause(Arrays.asList(expr), ArrayUtils.EMPTY_BYTE_ARRAY, 100);
+        IFilter filter = new IdentityQueryFilter();
+        IPartitioner p = StorageService.getPartitioner();
+        Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+        List<Row> rows = cfs.scan(clause, range, filter);
+        assert rows.size() == 1 : StringUtils.join(rows, ",");
+        assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+
+        // delete the column directly
+        rm = new RowMutation("Keyspace3", "k1".getBytes());
+        rm.delete(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), new TimestampClock(1));
+        rm.apply();
+        rows = cfs.scan(clause, range, filter);
+        assert rows.isEmpty();
+
+        // verify that it's not being indexed under the deletion column value either
+        IColumn deletion = rm.getColumnFamilies().iterator().next().iterator().next();
+        IndexExpression expr0 = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, deletion.value());
+        IndexClause clause0 = new IndexClause(Arrays.asList(expr0), ArrayUtils.EMPTY_BYTE_ARRAY, 100);
+        rows = cfs.scan(clause0, range, filter);
+        assert rows.isEmpty();
+
+        // resurrect w/ a newer timestamp
+        rm = new RowMutation("Keyspace3", "k1".getBytes());
+        rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(2));
+        rm.apply();
+        rows = cfs.scan(clause, range, filter);
+        assert rows.size() == 1 : StringUtils.join(rows, ",");
+        assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+
+        // delete the entire row
+        rm = new RowMutation("Keyspace3", "k1".getBytes());
+        rm.delete(new QueryPath("Indexed1"), new TimestampClock(3));
+        rm.apply();
+        rows = cfs.scan(clause, range, filter);
+        assert rows.isEmpty() : StringUtils.join(rows, ",");
+    }
+
+    @Test
     public void testIndexUpdate() throws IOException
     {
         Table table = Table.open("Keyspace2");