You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/08/21 19:30:31 UTC
svn commit: r987800 - in /cassandra/trunk:
src/java/org/apache/cassandra/db/Table.java
test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Author: jbellis
Date: Sat Aug 21 17:30:30 2010
New Revision: 987800
URL: http://svn.apache.org/viewvc?rev=987800&view=rev
Log:
prevent writes of obsolete data from updating 2ary indexes. patch by jbellis
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=987800&r1=987799&r2=987800&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Sat Aug 21 17:30:30 2010
@@ -334,20 +334,19 @@ public class Table
CommitLog.instance().add(mutation, serializedMutation);
DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
- for (ColumnFamily columnFamily : mutation.getColumnFamilies())
+ for (ColumnFamily cf : mutation.getColumnFamilies())
{
- ColumnFamilyStore cfs = columnFamilyStores.get(columnFamily.id());
+ ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
if (cfs == null)
{
- logger.error("Attempting to mutate non-existant column family " + columnFamily.id());
+ logger.error("Attempting to mutate non-existant column family " + cf.id());
continue;
}
- ColumnFamily oldIndexedColumns;
SortedSet<byte[]> mutatedIndexedColumns = null;
for (byte[] column : cfs.getIndexedColumns())
{
- if (columnFamily.getColumnNames().contains(column))
+ if (cf.getColumnNames().contains(column))
{
if (mutatedIndexedColumns == null)
mutatedIndexedColumns = new TreeSet<byte[]>(FBUtilities.byteArrayComparator);
@@ -358,7 +357,7 @@ public class Table
if (mutatedIndexedColumns == null)
{
// just update the actual value, no extra synchronization
- applyCF(cfs, key, columnFamily, memtablesToFlush);
+ applyCF(cfs, key, cf, memtablesToFlush);
}
else
{
@@ -366,19 +365,33 @@ public class Table
{
// read old indexed values
QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(cfs.getColumnFamilyName()), mutatedIndexedColumns);
- oldIndexedColumns = cfs.getColumnFamily(filter);
+ ColumnFamily oldIndexedColumns = cfs.getColumnFamily(filter);
+
+ // ignore obsolete column updates
+ if (oldIndexedColumns != null)
+ {
+ for (IColumn oldColumn : oldIndexedColumns)
+ {
+ if (cfs.metadata.reconciler.reconcile((Column) oldColumn, (Column) cf.getColumn(oldColumn.name())).equals(oldColumn))
+ {
+ cf.remove(oldColumn.name());
+ mutatedIndexedColumns.remove(oldColumn.name());
+ oldIndexedColumns.remove(oldColumn.name());
+ }
+ }
+ }
// apply the mutation
- applyCF(cfs, key, columnFamily, memtablesToFlush);
+ applyCF(cfs, key, cf, memtablesToFlush);
// add new index entries
for (byte[] columnName : mutatedIndexedColumns)
{
- IColumn column = columnFamily.getColumn(columnName);
+ IColumn column = cf.getColumn(columnName);
DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
- ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
- cf.addColumn(new Column(mutation.key(), ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
- applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf, memtablesToFlush);
+ ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+ cfi.addColumn(new Column(mutation.key(), ArrayUtils.EMPTY_BYTE_ARRAY, column.clock()));
+ applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, memtablesToFlush);
}
// remove the old index entries
@@ -390,9 +403,9 @@ public class Table
byte[] columnName = entry.getKey();
IColumn column = entry.getValue();
DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(columnName, column.value());
- ColumnFamily cf = cfs.newIndexedColumnFamily(columnName);
- cf.deleteColumn(mutation.key(), localDeletionTime, column.clock());
- applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cf, memtablesToFlush);
+ ColumnFamily cfi = cfs.newIndexedColumnFamily(columnName);
+ cfi.deleteColumn(mutation.key(), localDeletionTime, column.clock());
+ applyCF(cfs.getIndexedColumnFamilyStore(columnName), valueKey, cfi, memtablesToFlush);
}
}
}
@@ -400,7 +413,7 @@ public class Table
ColumnFamily cachedRow = cfs.getRawCachedRow(key);
if (cachedRow != null)
- cachedRow.addAll(columnFamily);
+ cachedRow.addAll(cf);
}
}
finally
Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=987800&r1=987799&r2=987800&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Sat Aug 21 17:30:30 2010
@@ -220,12 +220,13 @@ public class ColumnFamilyStoreTest exten
@Test
public void testIndexUpdate() throws IOException
{
- RowMutation rm;
+ Table table = Table.open("Keyspace2");
+ // create a row and update the birthdate value, test that the index query fetches the new version
+ RowMutation rm;
rm = new RowMutation("Keyspace2", "k1".getBytes());
rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(1L), new TimestampClock(1));
rm.apply();
-
rm = new RowMutation("Keyspace2", "k1".getBytes());
rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(2L), new TimestampClock(2));
rm.apply();
@@ -235,12 +236,20 @@ public class ColumnFamilyStoreTest exten
IFilter filter = new IdentityQueryFilter();
IPartitioner p = StorageService.getPartitioner();
Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
- List<Row> rows = Table.open("Keyspace2").getColumnFamilyStore("Indexed1").scan(clause, range, filter);
+ List<Row> rows = table.getColumnFamilyStore("Indexed1").scan(clause, range, filter);
assert rows.size() == 0;
expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ, FBUtilities.toByteArray(2L));
clause = new IndexClause(Arrays.asList(expr), ArrayUtils.EMPTY_BYTE_ARRAY, 100);
- rows = Table.open("Keyspace2").getColumnFamilyStore("Indexed1").scan(clause, range, filter);
+ rows = table.getColumnFamilyStore("Indexed1").scan(clause, range, filter);
+ assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
+
+ // update the birthdate value with an OLDER timestamp, and test that the index ignores this
+ rm = new RowMutation("Keyspace2", "k1".getBytes());
+ rm.add(new QueryPath("Indexed1", null, "birthdate".getBytes("UTF8")), FBUtilities.toByteArray(3L), new TimestampClock(0));
+ rm.apply();
+
+ rows = table.getColumnFamilyStore("Indexed1").scan(clause, range, filter);
assert Arrays.equals("k1".getBytes(), rows.get(0).key.key);
}