You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/08/06 21:15:57 UTC

[2/3] git commit: Notify indexer of columns shadowed by range tombstones (patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5614)

Notify indexer of columns shadowed by range tombstones
patch by Sam Tunnicliffe; reviewed by jbellis for CASSANDRA-5614


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ed4a0677
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ed4a0677
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ed4a0677

Branch: refs/heads/trunk
Commit: ed4a06771f555f49882c37f02ae45a58c2301105
Parents: cf62bdc
Author: Jonathan Ellis <jb...@apache.org>
Authored: Tue Aug 6 14:15:46 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Tue Aug 6 14:15:46 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../cassandra/db/AtomicSortedColumns.java       |  13 +-
 .../org/apache/cassandra/db/DeletionInfo.java   |   5 +
 .../org/apache/cassandra/db/RangeTombstone.java |   6 +-
 .../db/compaction/LazilyCompactedRow.java       |  11 +-
 .../db/compaction/PrecompactedRow.java          |  14 +-
 .../apache/cassandra/db/RangeTombstoneTest.java | 220 ++++++++++++++++++-
 7 files changed, 264 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index af94bc8..9509113 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+2.0.1
+ * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614)
+
+
 2.0.0
  * fix CAS contention timeout (CASSANDRA-5830)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
index 2393c5a..f6a6b83 100644
--- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java
@@ -22,6 +22,8 @@ import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
 import edu.stanford.ppl.concurrent.SnapTreeMap;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -65,7 +67,7 @@ public class AtomicSortedColumns extends ColumnFamily
     private AtomicSortedColumns(CFMetaData metadata, Holder holder)
     {
         super(metadata);
-        this.ref = new AtomicReference<Holder>(holder);
+        this.ref = new AtomicReference<>(holder);
     }
 
     public AbstractType<?> getComparator()
@@ -179,6 +181,15 @@ public class AtomicSortedColumns extends ColumnFamily
             DeletionInfo newDelInfo = current.deletionInfo.copy().add(cm.deletionInfo());
             modified = new Holder(current.map.clone(), newDelInfo);
 
+            if (cm.deletionInfo().hasRanges())
+            {
+                for (Column currentColumn : Iterables.concat(current.map.values(), cm))
+                {
+                    if (cm.deletionInfo().isDeleted(currentColumn))
+                        indexer.remove(currentColumn);
+                }
+            }
+
             for (Column column : cm)
             {
                 sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index d00f2bc..4e1d68d 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -230,6 +230,11 @@ public class DeletionInfo
         return size + (ranges == null ? 0 : ranges.dataSize());
     }
 
+    public boolean hasRanges()
+    {
+        return ranges != null && !ranges.isEmpty();
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/RangeTombstone.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java
index 02dddb2..c943ad7 100644
--- a/src/java/org/apache/cassandra/db/RangeTombstone.java
+++ b/src/java/org/apache/cassandra/db/RangeTombstone.java
@@ -241,8 +241,12 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement
         {
             for (RangeTombstone tombstone : ranges)
             {
-                if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.data.isDeleted(column))
+                if (comparator.compare(column.name(), tombstone.min) >= 0
+                    && comparator.compare(column.name(), tombstone.max) <= 0
+                    && tombstone.maxTimestamp() >= column.timestamp())
+                {
                     return true;
+                }
             }
             return false;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 6390b14..7d7c5a4 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -208,8 +208,12 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
             {
                 Column column = (Column) current;
                 container.addColumn(column);
-                if (indexer != SecondaryIndexManager.nullUpdater
-                    && !column.isMarkedForDelete(System.currentTimeMillis())
+
+                // skip the index-update checks when no indexing is needed, since they are somewhat expensive
+                if (indexer == SecondaryIndexManager.nullUpdater)
+                    return;
+
+                if (!column.isMarkedForDelete(System.currentTimeMillis())
                     && !container.getColumn(column.name()).equals(column))
                 {
                     indexer.remove(column);
@@ -247,7 +251,10 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
                 // PrecompactedRow.removeDeletedAndOldShards have only checked the top-level CF deletion times,
                 // not the range tombstone. For that we use the columnIndexer tombstone tracker.
                 if (indexBuilder.tombstoneTracker().isDeleted(reduced))
+                {
+                    indexer.remove(reduced);
                     return null;
+                }
 
                 columns++;
                 minTimestampSeen = Math.min(minTimestampSeen, reduced.minTimestamp());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 66f2ecb..15ae0b8 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -106,7 +106,7 @@ public class PrecompactedRow extends AbstractCompactedRow
         final ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(controller.cfs.metadata);
 
         // transform into iterators that MergeIterator will like, and apply row-level tombstones
-        List<CloseableIterator<Column>> data = new ArrayList<CloseableIterator<Column>>(rows.size());
+        List<CloseableIterator<Column>> data = new ArrayList<>(rows.size());
         for (SSTableIdentityIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamilyWithColumns(ArrayBackedSortedColumns.factory);
@@ -132,9 +132,15 @@ public class PrecompactedRow extends AbstractCompactedRow
             public void reduce(Column column)
             {
                 container.addColumn(column);
-                if (indexer != SecondaryIndexManager.nullUpdater
-                    && !column.isMarkedForDelete(System.currentTimeMillis())
-                    && !container.getColumn(column.name()).equals(column))
+
+                // skip the index-update checks when no indexing is needed, since they are somewhat expensive
+                if (indexer == SecondaryIndexManager.nullUpdater)
+                    return;
+
+                // notify the index that the column has been overwritten if the value being reduced has been
+                // superseded, either directly by another column or indirectly by a range tombstone
+                if ((!column.isMarkedForDelete(System.currentTimeMillis()) && !container.getColumn(column.name()).equals(column))
+                    || returnCF.deletionInfo().isDeleted(column.name(), CompactionManager.NO_GC))
                 {
                     indexer.remove(column);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ed4a0677/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index b240a5f..731b364 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -21,6 +21,16 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
+import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
+import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.thrift.IndexType;
+
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -29,6 +39,8 @@ import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.dk;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class RangeTombstoneTest extends SchemaLoader
 {
@@ -73,7 +85,7 @@ public class RangeTombstoneTest extends SchemaLoader
         // Queries by name
         int[] live = new int[]{ 4, 9, 11, 17, 28 };
         int[] dead = new int[]{ 12, 19, 21, 24, 27 };
-        SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(cfs.getComparator());
+        SortedSet<ByteBuffer> columns = new TreeSet<>(cfs.getComparator());
         for (int i : live)
             columns.add(b(i));
         for (int i : dead)
@@ -182,6 +194,166 @@ public class RangeTombstoneTest extends SchemaLoader
         assert last == 1 : "Last column should be column 1 since column 2 has been deleted";
     }
 
+    @Test
+    public void testPreCompactedRowWithRangeTombstonesUpdatesSecondaryIndex() throws Exception
+    {
+        // nothing special to do here, just run the test
+        runCompactionWithRangeTombstoneAndCheckSecondaryIndex();
+    }
+
+    @Test
+    public void testLazilyCompactedRowWithRangeTombstonesUpdatesSecondaryIndex() throws Exception
+    {
+        // make sure we use LazilyCompactedRow by exceeding in_memory_compaction_limit
+        DatabaseDescriptor.setInMemoryCompactionLimit(0);
+        runCompactionWithRangeTombstoneAndCheckSecondaryIndex();
+    }
+
+    @Test
+    public void testLazilyCompactedRowGeneratesSameSSTablesAsPreCompactedRow() throws Exception
+    {
+        Keyspace table = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+        ByteBuffer key = ByteBufferUtil.bytes("k4");
+
+        // remove any existing sstables before starting
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+
+        RowMutation rm = new RowMutation(KSNAME, key);
+        for (int i = 0; i < 10; i += 2)
+            add(rm, i, 0);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        rm = new RowMutation(KSNAME, key);
+        ColumnFamily cf = rm.addOrGet(CFNAME);
+        for (int i = 0; i < 10; i += 2)
+            delete(cf, 0, 7, 0);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // there should be 2 sstables
+        assertEquals(2, cfs.getSSTables().size());
+
+        // compact down to single sstable
+        CompactionManager.instance.performMaximal(cfs);
+        assertEquals(1, cfs.getSSTables().size());
+
+        // test the physical structure of the sstable i.e. rt & columns on disk
+        SSTableReader sstable = cfs.getSSTables().iterator().next();
+        OnDiskAtomIterator iter = sstable.getScanner().next();
+        int cnt = 0;
+        // after compaction, the first element should be an RT followed by the remaining non-deleted columns
+        while(iter.hasNext())
+        {
+            OnDiskAtom atom = iter.next();
+            if (cnt == 0)
+                assertTrue(atom instanceof RangeTombstone);
+            if (cnt > 0)
+                assertTrue(atom instanceof Column);
+            cnt++;
+        }
+        assertEquals(2, cnt);
+    }
+
+    @Test
+    public void testMemtableUpdateWithRangeTombstonesUpdatesSecondaryIndex() throws Exception
+    {
+        Keyspace table = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+        ByteBuffer key = ByteBufferUtil.bytes("k5");
+        ByteBuffer indexedColumnName = ByteBufferUtil.bytes(1);
+
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+        if (cfs.indexManager.getIndexForColumn(indexedColumnName) == null)
+        {
+            ColumnDefinition cd = new ColumnDefinition(indexedColumnName,
+                                                       cfs.getComparator(),
+                                                       IndexType.CUSTOM,
+                                                       ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()),
+                                                       "test_index",
+                                                       0,
+                                                       null);
+            cfs.indexManager.addIndexedColumn(cd);
+        }
+
+        TestIndex index = ((TestIndex)cfs.indexManager.getIndexForColumn(indexedColumnName));
+        index.resetCounts();
+
+        RowMutation rm = new RowMutation(KSNAME, key);
+        for (int i = 0; i < 10; i++)
+            add(rm, i, 0);
+        rm.apply();
+
+        // We should have indexed 1 column
+        assertEquals(1, index.inserts.size());
+
+        rm = new RowMutation(KSNAME, key);
+        ColumnFamily cf = rm.addOrGet(CFNAME);
+        for (int i = 0; i < 10; i += 2)
+            delete(cf, 0, 7, 0);
+        rm.apply();
+
+        // verify that the 1 indexed column was removed from the index
+        assertEquals(1, index.deletes.size());
+        assertEquals(index.deletes.get(0), index.inserts.get(0));
+    }
+
+    private void runCompactionWithRangeTombstoneAndCheckSecondaryIndex() throws Exception
+    {
+        Keyspace table = Keyspace.open(KSNAME);
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CFNAME);
+        ByteBuffer key = ByteBufferUtil.bytes("k5");
+        ByteBuffer indexedColumnName = ByteBufferUtil.bytes(1);
+
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+        cfs.setCompactionStrategyClass(SizeTieredCompactionStrategy.class.getCanonicalName());
+        if (cfs.indexManager.getIndexForColumn(indexedColumnName) == null)
+        {
+            ColumnDefinition cd = new ColumnDefinition(indexedColumnName,
+                                                       cfs.getComparator(),
+                                                       IndexType.CUSTOM,
+                                                       ImmutableMap.of(SecondaryIndex.CUSTOM_INDEX_OPTION_NAME, TestIndex.class.getName()),
+                                                       "test_index",
+                                                       0,
+                                                       null);
+            cfs.indexManager.addIndexedColumn(cd);
+        }
+
+        TestIndex index = ((TestIndex)cfs.indexManager.getIndexForColumn(indexedColumnName));
+        index.resetCounts();
+
+        RowMutation rm = new RowMutation(KSNAME, key);
+        for (int i = 0; i < 10; i++)
+            add(rm, i, 0);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        rm = new RowMutation(KSNAME, key);
+        ColumnFamily cf = rm.addOrGet(CFNAME);
+        for (int i = 0; i < 10; i += 2)
+            delete(cf, 0, 7, 0);
+        rm.apply();
+        cfs.forceBlockingFlush();
+
+        // We should have indexed 1 column
+        assertEquals(1, index.inserts.size());
+
+        CompactionManager.instance.performMaximal(cfs);
+
+        // compacted down to single sstable
+        assertEquals(1, cfs.getSSTables().size());
+
+        // verify that the 1 indexed column was removed from the index
+        assertEquals(1, index.deletes.size());
+        assertEquals(index.deletes.get(0), index.inserts.get(0));
+    }
+
     private static boolean isLive(ColumnFamily cf, Column c)
     {
         return c != null && !c.isMarkedForDelete(System.currentTimeMillis()) && !cf.deletionInfo().isDeleted(c);
@@ -210,4 +382,50 @@ public class RangeTombstoneTest extends SchemaLoader
                                    timestamp,
                                    (int)(System.currentTimeMillis() / 1000)));
     }
+
+    public static class TestIndex extends PerColumnSecondaryIndex
+    {
+        public List<Column> inserts = new ArrayList<>();
+        public List<Column> deletes = new ArrayList<>();
+
+        public void resetCounts()
+        {
+            inserts.clear();
+            deletes.clear();
+        }
+
+        public void delete(ByteBuffer rowKey, Column col)
+        {
+            deletes.add(col);
+        }
+
+        public void insert(ByteBuffer rowKey, Column col)
+        {
+            inserts.add(col);
+        }
+
+        public void update(ByteBuffer rowKey, Column col){}
+
+        public void init(){}
+
+        public void reload(){}
+
+        public void validateOptions() throws ConfigurationException{}
+
+        public String getIndexName(){ return "TestIndex";}
+
+        protected SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns){ return null; }
+
+        public void forceBlockingFlush(){}
+
+        public long getLiveSize(){ return 0; }
+
+        public ColumnFamilyStore getIndexCfs(){ return null; }
+
+        public void removeIndex(ByteBuffer columnName){}
+
+        public void invalidate(){}
+
+        public void truncateBlocking(long truncatedAt) { }
+    }
 }