You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/21 19:30:31 UTC

svn commit: r987800 - in /cassandra/trunk: src/java/org/apache/cassandra/db/Table.java test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Author: jbellis
Date: Sat Aug 21 17:30:30 2010
New Revision: 987800

URL: http://svn.apache.org/viewvc?rev=987800&view=rev
Log:
prevent writes of obsolete data from updating 2ary indexes.  patch by jbellis

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=987800&r1=987799&r2=987800&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sat Aug 21 17:30:30 2010
@@ -334,20 +334,19 @@ public class Table
                 CommitLog.instance().add(mutation, serializedMutation);
         
             DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
-            for (ColumnFamily columnFamily : mutation.getColumnFamilies())
+            for (ColumnFamily cf : mutation.getColumnFamilies())
             {
-                ColumnFamilyStore cfs = columnFamilyStores.get(columnFamily.id());
+                ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
                 if (cfs == null)
                 {
-                    logger.error("Attempting to mutate non-existant column family " + columnFamily.id());
+                    logger.error("Attempting to mutate non-existant column family " + cf.id());
                     continue;
                 }
 
-                ColumnFamily oldIndexedColumns;
                 SortedSet<byte[]> mutatedIndexedColumns = null;
                 for (byte[] column : cfs.getIndexedColumns())
                 {
-                    if (columnFamily.getColumnNames().contains(column))
+                    if (cf.getColumnNames().contains(column))
                     {
                         if (mutatedIndexedColumns == null)
                             mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator);
@@ -358,7 +357,7 @@ public class Table
                 if (mutatedIndexedColumns == null)
                 {
                     // just update the actual value, no extra synchronization
-                    applyCF(cfs, key, columnFamily, memtablesToFlush);
+                    applyCF(cfs, key, cf, memtablesToFlush);
                 }
                 else
                 {
@@ -366,19 +365,33 @@ public class Table
                     {
                         // read old indexed values
                         QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
-                        oldIndexedColumns = cfs.getColumnFamily(filter);
+                        ColumnFamily oldIndexedColumns = cfs.getColumnFamily(filter);
+
+                        // ignore obsolete column updates
+                        if (oldIndexedColumns != null)
+                        {
+                            for (IColumn oldColumn : oldIndexedColumns)
+                            {
+                                if (cfs.metadata.reconciler.reconcile((Column) oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn))
+                                {
+                                    cf.remove(oldColumn.name());
+                                    mutatedIndexedColumns.remove(oldColumn.name());
+                                    oldIndexedColumns.remove(oldColumn.name());
+                                }
+                            }
+                        }
 
                         // apply the mutation
-                        applyCF(cfs, key, columnFamily, memtablesToFlush);
+                        applyCF(cfs, key, cf, memtablesToFlush);
 
                         // add new index entries
                         for (byte[] columnName : mutatedIndexedColumns)
                         {
-                            IColumn column = columnFamily.getColumn(columnName);
+                            IColumn column = cf.getColumn(columnName);
                             DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
-                            ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
-                            cf.addColumn(new Column(mutation.key(), ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
-                            applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf, memtablesToFlush);
+                            ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+                            cfi.addColumn(new Column(mutation.key(), ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
+                            applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, memtablesToFlush);
                         }
 
                         // remove the old index entries
@@ -390,9 +403,9 @@ public class Table
                                 byte[] columnName = entry.getKey();
                                 IColumn column = entry.getValue();
                                 DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
-                                ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
-                                cf.deleteColumn(mutation.key(), localDeletionTime, column.clock());
-                                applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf, memtablesToFlush);
+                                ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+                                cfi.deleteColumn(mutation.key(), localDeletionTime, column.clock());
+                                applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, memtablesToFlush);
                             }
                         }
                     }
@@ -400,7 +413,7 @@ public class Table
 
                 ColumnFamily cachedRow = cfs.getRawCachedRow(key);
                 if (cachedRow != null)
-                    cachedRow.addAll(columnFamily);
+                    cachedRow.addAll(cf);
             }
         }
         finally

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=987800&r1=987799&r2=987800&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Sat Aug 21 17:30:30 2010
@@ -220,12 +220,13 @@ public class ColumnFamilyStoreTest exten
     @Test
     public void testIndexUpdate() throws IOException
     {
-        RowMutation rm;
+        Table table = Table.open("Keyspace2");
 
+        // create a row and update the birthdate value, test that the index query fetches the new version
+        RowMutation rm;
         rm = new RowMutation("Keyspace2", "k1".getBytes());
         rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(1));
         rm.apply();
-
         rm = new RowMutation("Keyspace2", "k1".getBytes());
         rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(2));
         rm.apply();
@@ -235,12 +236,20 @@ public class ColumnFamilyStoreTest exten
         IFilter filter = new IdentityQueryFilter();
         IPartitioner p = StorageService.getPartitioner();
         Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
-        List<Row> rows = Table.open("Keyspace2").getColumnFamilyStore("Indexed1").scan(clause, range, filter);
+        List<Row> rows = table.getColumnFamilyStore("Indexed1").scan(clause, range, filter);
         assert rows.size() == 0;
 
         expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(2L));
         clause = new IndexClause(Arrays.asList(expr), ArrayUtils.EMPTY_BYTE_ARRAY, 100);
-        rows = Table.open("Keyspace2").getColumnFamilyStore("Indexed1").scan(clause, range, filter);
+        rows = table.getColumnFamilyStore("Indexed1").scan(clause, range, filter);
+        assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+
+        // update the birthdate value with an OLDER timestamp, and test that the index ignores this
+        rm = new RowMutation("Keyspace2", "k1".getBytes());
+        rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(3L), new TimestampClock(0));
+        rm.apply();
+
+        rows = table.getColumnFamilyStore("Indexed1").scan(clause, range, filter);
         assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
     }