You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/08/06 21:15:57 UTC
[2/3] git commit: Notify indexer of columns shadowed by range tombstones
patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5614
Notify indexer of columns shadowed by range tombstones
patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5614
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ed4a0677
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ed4a0677
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ed4a0677
Branch: refs/heads/trunk
Commit: ed4a06771f555f49882c37f02ae45a58c2301105
Parents: cf62bdc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Aug 6 14:15:46 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Aug 6 14:15:46 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../cassandra/db/AtomicSortedColumns.java | 13 +-
.../org/apache/cassandra/db/DeletionInfo.java | 5 +
.../org/apache/cassandra/db/RangeTombstone.java | 6 +-
.../db/compaction/LazilyCompactedRow.java | 11 +-
.../db/compaction/PrecompactedRow.java | 14 +-
.../apache/cassandra/db/RangeTombstoneTest.java | 220 ++++++++++++++++++-
7 files changed, 264 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af94bc8..9509113 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+2.0.1
+ * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
+
+
2.0.0
* fix CAS contention timeout (CASSANDRA-5830)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 2393c5a..f6a6b83 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -22,6 +22,8 @@ import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
import edu.stanford.ppl.concurrent.SnapTreeMap;
import org.apache.cassandra.config.CFMetaData;
@@ -65,7 +67,7 @@ public class AtomicSortedColumns extends ColumnFamily
private AtomicSortedColumns(CFMetaData metadata, Holder holder)
{
super(metadata);
- this.ref = new AtomicReference<Holder>(holder);
+ this.ref = new AtomicReference<>(holder);
}
public AbstractType<?> getComparator()
@@ -179,6 +181,15 @@ public class AtomicSortedColumns extends ColumnFamily
DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo());
modified = new Holder(current.map.clone(), newDelInfo);
+ if (cm.deletionInfo().hasRanges())
+ {
+ for (Column currentColumn : Iterables.concat(current.map.values(), cm))
+ {
+ if (cm.deletionInfo().isDeleted(currentColumn))
+ indexer.remove(currentColumn);
+ }
+ }
+
for (Column column : cm)
{
sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index d00f2bc..4e1d68d 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -230,6 +230,11 @@ public class DeletionInfo
return size + (ranges == null ? 0 : ranges.dataSize());
}
+ public boolean hasRanges()
+ {
+ return ranges != null && !ranges.isEmpty();
+ }
+
@Override
public String toString()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 02dddb2..c943ad7 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -241,8 +241,12 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
{
for (RangeTombstone tombstone : ranges)
{
- if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column))
+ if (comparator.compare(column.name(), tombstone.min) >= 0
+ && comparator.compare(column.name(), tombstone.max) <= 0
+ && tombstone.maxTimestamp() >= column.timestamp())
+ {
return true;
+ }
}
return false;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 6390b14..7d7c5a4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -208,8 +208,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
{
Column column = (Column) current;
container.addColumn(column);
- if (indexer != SecondaryIndexManager.nullUpdater
- && !column.isMarkedForDelete(System.currentTimeMillis())
+
+ // skip the index-update checks if there is no indexing needed since they are a bit expensive
+ if (indexer == SecondaryIndexManager.nullUpdater)
+ return;
+
+ if (!column.isMarkedForDelete(System.currentTimeMillis())
&& !container.getColumn(column.name()).equals(column))
{
indexer.remove(column);
@@ -247,7 +251,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
// PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
// not the range tombstone. For that we use the columnIndexer tombstone tracker.
if (indexBuilder.tombstoneTracker().isDeleted(reduced))
+ {
+ indexer.remove(reduced);
return null;
+ }
columns++;
minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 66f2ecb..15ae0b8 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -106,7 +106,7 @@ public class PrecompactedRow extends AbstractCompactedRow
final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
// transform into iterators that MergeIterator will like, and apply row-level tombstones
- List<CloseableIterator<Column>> data = new ArrayList<CloseableIterator<Column>>(rows.size());
+ List<CloseableIterator<Column>> data = new ArrayList<>(rows.size());
for (SSTableIdentityIterator row : rows)
{
ColumnFamily cf = row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory);
@@ -132,9 +132,15 @@ public class PrecompactedRow extends AbstractCompactedRow
public void reduce(Column column)
{
container.addColumn(column);
- if (indexer != SecondaryIndexManager.nullUpdater
- && !column.isMarkedForDelete(System.currentTimeMillis())
- && !container.getColumn(column.name()).equals(column))
+
+ // skip the index-update checks if there is no indexing needed since they are a bit expensive
+ if (indexer == SecondaryIndexManager.nullUpdater)
+ return;
+
+ // notify the index that the column has been overwritten if the value being reduced has been
+ // superseded by another directly, or indirectly by a range tombstone
+ if ((!column.isMarkedForDelete(System.currentTimeMillis()) && !container.getColumn(column.name()).equals(column))
+ || returnCF.deletionInfo().isDeleted(column.name(), CompactionManager.NO_GC))
{
indexer.remove(column);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index b240a5f..731b364 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -21,6 +21,16 @@ package org.apache.cassandra.db;
import java.nio.ByteBuffer;
import java.util.*;
+import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
+import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.thrift.IndexType;
+
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
@@ -29,6 +39,8 @@ import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.Util.dk;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class RangeTombstoneTest extends SchemaLoader
{
@@ -73,7 +85,7 @@ public class RangeTombstoneTest extends SchemaLoader
// Queries by name
int[] live = new int[]{ 4, 9, 11, 17, 28 };
int[] dead = new int[]{ 12, 19, 21, 24, 27 };
- SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+ SortedSet<ByteBuffer> columns = new TreeSet<>(cfs.getComparator());
for (int i : live)
columns.add(b(i));
for (int i : dead)
@@ -182,6 +194,166 @@ public class RangeTombstoneTest extends SchemaLoader
assert last == 1 : "Last column should be column 1 since column 2 has been deleted";
}
+ @Test
+ public void testPreCompactedRowWithRangeTombstonesUpdatesSecondaryIndex() throws Exception
+ {
+ // nothing special to do here, just run the test
+ runCompactionWithRangeTombstoneAndCheckSecondaryIndex();
+ }
+
+ @Test
+ public void testLazilyCompactedRowWithRangeTombstonesUpdatesSecondaryIndex() throws Exception
+ {
+ // make sure we use LazilyCompactedRow by exceeding in_memory_compaction_limit
+ DatabaseDescriptor.setInMemoryCompactionLimit(0);
+ runCompactionWithRangeTombstoneAndCheckSecondaryIndex();
+ }
+
+ @Test
+ public void testLazilyCompactedRowGeneratesSameSSTablesAsPreCompactedRow() throws Exception
+ {
+ Keyspace table = Keyspace.open(KSNAME);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+ ByteBuffer key = ByteBufferUtil.bytes("k4");
+
+ // remove any existing sstables before starting
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+
+ RowMutation rm = new RowMutation(KSNAME, key);
+ for (int i = 0; i < 10; i += 2)
+ add(rm, i, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ rm = new RowMutation(KSNAME, key);
+ ColumnFamily cf = rm.addOrGet(CFNAME);
+ for (int i = 0; i < 10; i += 2)
+ delete(cf, 0, 7, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // there should be 2 sstables
+ assertEquals(2, cfs.getSSTables().size());
+
+ // compact down to single sstable
+ CompactionManager.instance.performMaximal(cfs);
+ assertEquals(1, cfs.getSSTables().size());
+
+ // test the physical structure of the sstable i.e. rt & columns on disk
+ SSTableReader sstable = cfs.getSSTables().iterator().next();
+ OnDiskAtomIterator iter = sstable.getScanner().next();
+ int cnt = 0;
+ // after compaction, the first element should be an RT followed by the remaining non-deleted columns
+ while(iter.hasNext())
+ {
+ OnDiskAtom atom = iter.next();
+ if (cnt == 0)
+ assertTrue(atom instanceof RangeTombstone);
+ if (cnt > 0)
+ assertTrue(atom instanceof Column);
+ cnt++;
+ }
+ assertEquals(2, cnt);
+ }
+
+ @Test
+ public void testMemtableUpdateWithRangeTombstonesUpdatesSecondaryIndex() throws Exception
+ {
+ Keyspace table = Keyspace.open(KSNAME);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+ ByteBuffer key = ByteBufferUtil.bytes("k5");
+ ByteBuffer indexedColumnName = ByteBufferUtil.bytes(1);
+
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+ if (cfs.indexManager.getIndexForColumn(indexedColumnName) == null)
+ {
+ ColumnDefinition cd = new ColumnDefinition(indexedColumnName,
+ cfs.getComparator(),
+ IndexType.CUSTOM,
+ ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()),
+ "test_index",
+ 0,
+ null);
+ cfs.indexManager.addIndexedColumn(cd);
+ }
+
+ TestIndex index = ((TestIndex)cfs.indexManager.getIndexForColumn(indexedColumnName));
+ index.resetCounts();
+
+ RowMutation rm = new RowMutation(KSNAME, key);
+ for (int i = 0; i < 10; i++)
+ add(rm, i, 0);
+ rm.apply();
+
+ // We should have indexed 1 column
+ assertEquals(1, index.inserts.size());
+
+ rm = new RowMutation(KSNAME, key);
+ ColumnFamily cf = rm.addOrGet(CFNAME);
+ for (int i = 0; i < 10; i += 2)
+ delete(cf, 0, 7, 0);
+ rm.apply();
+
+ // verify that the 1 indexed column was removed from the index
+ assertEquals(1, index.deletes.size());
+ assertEquals(index.deletes.get(0), index.inserts.get(0));
+ }
+
+ private void runCompactionWithRangeTombstoneAndCheckSecondaryIndex() throws Exception
+ {
+ Keyspace table = Keyspace.open(KSNAME);
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+ ByteBuffer key = ByteBufferUtil.bytes("k5");
+ ByteBuffer indexedColumnName = ByteBufferUtil.bytes(1);
+
+ cfs.truncateBlocking();
+ cfs.disableAutoCompaction();
+ cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+ if (cfs.indexManager.getIndexForColumn(indexedColumnName) == null)
+ {
+ ColumnDefinition cd = new ColumnDefinition(indexedColumnName,
+ cfs.getComparator(),
+ IndexType.CUSTOM,
+ ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()),
+ "test_index",
+ 0,
+ null);
+ cfs.indexManager.addIndexedColumn(cd);
+ }
+
+ TestIndex index = ((TestIndex)cfs.indexManager.getIndexForColumn(indexedColumnName));
+ index.resetCounts();
+
+ RowMutation rm = new RowMutation(KSNAME, key);
+ for (int i = 0; i < 10; i++)
+ add(rm, i, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ rm = new RowMutation(KSNAME, key);
+ ColumnFamily cf = rm.addOrGet(CFNAME);
+ for (int i = 0; i < 10; i += 2)
+ delete(cf, 0, 7, 0);
+ rm.apply();
+ cfs.forceBlockingFlush();
+
+ // We should have indexed 1 column
+ assertEquals(1, index.inserts.size());
+
+ CompactionManager.instance.performMaximal(cfs);
+
+ // compacted down to single sstable
+ assertEquals(1, cfs.getSSTables().size());
+
+ // verify that the 1 indexed column was removed from the index
+ assertEquals(1, index.deletes.size());
+ assertEquals(index.deletes.get(0), index.inserts.get(0));
+ }
+
private static boolean isLive(ColumnFamily cf, Column c)
{
return c != null && !c.isMarkedForDelete(System.currentTimeMillis()) && !cf.deletionInfo().isDeleted(c);
@@ -210,4 +382,50 @@ public class RangeTombstoneTest extends SchemaLoader
timestamp,
(int)(System.currentTimeMillis() / 1000)));
}
+
+ public static class TestIndex extends PerColumnSecondaryIndex
+ {
+ public List<Column> inserts = new ArrayList<>();
+ public List<Column> deletes = new ArrayList<>();
+
+ public void resetCounts()
+ {
+ inserts.clear();
+ deletes.clear();
+ }
+
+ public void delete(ByteBuffer rowKey, Column col)
+ {
+ deletes.add(col);
+ }
+
+ public void insert(ByteBuffer rowKey, Column col)
+ {
+ inserts.add(col);
+ }
+
+ public void update(ByteBuffer rowKey, Column col){}
+
+ public void init(){}
+
+ public void reload(){}
+
+ public void validateOptions() throws ConfigurationException{}
+
+ public String getIndexName(){ return "TestIndex";}
+
+ protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns){ return null; }
+
+ public void forceBlockingFlush(){}
+
+ public long getLiveSize(){ return 0; }
+
+ public ColumnFamilyStore getIndexCfs(){ return null; }
+
+ public void removeIndex(ByteBuffer columnName){}
+
+ public void invalidate(){}
+
+ public void truncateBlocking(long truncatedAt) { }
+ }
}