You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/01/19 22:15:03 UTC

[3/6] cassandra git commit: Use unfiltered iterator for partition indexing

Use unfiltered iterator for partition indexing

Patch by Alex Petrov; reviewed by Sergio Bossa for CASSANDRA-13075


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a06df79
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a06df79
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a06df79

Branch: refs/heads/trunk
Commit: 7a06df79d829ac073264045eb9420a61c5ba939a
Parents: 48fed80
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Jan 19 09:49:23 2017 +0100
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jan 19 10:55:10 2017 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/index/SecondaryIndexManager.java  |  86 ++++++++++-----
 .../service/pager/AbstractQueryPager.java       |  54 ++++++++--
 .../apache/cassandra/index/CustomIndexTest.java | 106 +++++++++++++++++++
 .../org/apache/cassandra/index/StubIndex.java   |   4 +
 5 files changed, 216 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6293cfa..a85386b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
  * Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
  * Stress daemon help is incorrect (CASSANDRA-12563)
  * Remove ALTER TYPE support (CASSANDRA-12443)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index a6ed3ba..d39b607 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -48,8 +48,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.partitions.PartitionIterators;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.internal.CassandraIndex;
@@ -543,35 +542,64 @@ public class SecondaryIndexManager implements IndexRegistry
             {
                 try (ReadOrderGroup readGroup = cmd.startOrderGroup();
                      OpOrder.Group writeGroup = Keyspace.writeOrder.start();
-                     RowIterator partition =
-                        PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup),
-                                                          cmd))
+                     UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata, pageSize, readGroup))
                 {
-                    Set<Index.Indexer> indexers = indexes.stream()
-                                                         .map(index -> index.indexerFor(key,
-                                                                                        partition.columns(),
-                                                                                        nowInSec,
-                                                                                        writeGroup,
-                                                                                        IndexTransaction.Type.UPDATE))
-                                                         .filter(Objects::nonNull)
-                                                         .collect(Collectors.toSet());
-
-                    indexers.forEach(Index.Indexer::begin);
-
-                    // only process the static row once per partition
-                    if (!readStatic && !partition.staticRow().isEmpty())
-                    {
-                        indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
-                        readStatic = true;
+                    if (!page.hasNext())
+                        break;
+
+                    try (UnfilteredRowIterator partition = page.next()) {
+                        Set<Index.Indexer> indexers = indexes.stream()
+                                                             .map(index -> index.indexerFor(key,
+                                                                                            partition.columns(),
+                                                                                            nowInSec,
+                                                                                            writeGroup,
+                                                                                            IndexTransaction.Type.UPDATE))
+                                                             .filter(Objects::nonNull)
+                                                             .collect(Collectors.toSet());
+
+                        // Short-circuit empty partitions if static row is processed or isn't read
+                        if (!readStatic && partition.isEmpty() && partition.staticRow().isEmpty())
+                            break;
+
+                        indexers.forEach(Index.Indexer::begin);
+
+                        if (!readStatic)
+                        {
+                            if (!partition.staticRow().isEmpty())
+                                indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+                            indexers.forEach((Index.Indexer i) -> i.partitionDelete(partition.partitionLevelDeletion()));
+                            readStatic = true;
+                        }
+
+                        MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(partition.partitionLevelDeletion(), baseCfs.getComparator(), false);
+
+                        while (partition.hasNext())
+                        {
+                            Unfiltered unfilteredRow = partition.next();
+
+                            if (unfilteredRow.isRow())
+                            {
+                                Row row = (Row) unfilteredRow;
+                                indexers.forEach(indexer -> indexer.insertRow(row));
+                            }
+                            else
+                            {
+                                assert unfilteredRow.isRangeTombstoneMarker();
+                                RangeTombstoneMarker marker = (RangeTombstoneMarker) unfilteredRow;
+                                deletionBuilder.add(marker);
+                            }
+                        }
+
+                        MutableDeletionInfo deletionInfo = deletionBuilder.build();
+                        if (deletionInfo.hasRanges())
+                        {
+                            Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
+                            while (iter.hasNext())
+                                indexers.forEach(indexer -> indexer.rangeTombstone(iter.next()));
+                        }
+
+                        indexers.forEach(Index.Indexer::finish);
                     }
-
-                    while (partition.hasNext())
-                    {
-                        Row row = partition.next();
-                        indexers.forEach(indexer -> indexer.insertRow(row));
-                    }
-
-                    indexers.forEach(Index.Indexer::finish);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 48f6c04..74ec47d 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service.pager;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
@@ -63,7 +64,7 @@ abstract class AbstractQueryPager implements QueryPager
             return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
-        Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+        Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
         return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
     }
 
@@ -73,14 +74,53 @@ abstract class AbstractQueryPager implements QueryPager
             return EmptyIterators.partition();
 
         pageSize = Math.min(pageSize, remaining);
-        Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+        RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
         return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), pager);
     }
 
-    private class Pager extends Transformation<RowIterator>
+    public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadOrderGroup orderGroup)
+    {
+        if (isExhausted())
+            return EmptyIterators.unfilteredPartition(cfm, false);
+
+        pageSize = Math.min(pageSize, remaining);
+        UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());
+
+        return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(orderGroup), pager);
+    }
+
+    private class UnfilteredPager extends Pager<Unfiltered>
+    {
+
+        private UnfilteredPager(DataLimits pageLimits, int nowInSec)
+        {
+            super(pageLimits, nowInSec);
+        }
+
+        protected BaseRowIterator<Unfiltered> apply(BaseRowIterator<Unfiltered> partition)
+        {
+            return Transformation.apply(counter.applyTo((UnfilteredRowIterator) partition), this);
+        }
+    }
+
+    private class RowPager extends Pager<Row>
+    {
+
+        private RowPager(DataLimits pageLimits, int nowInSec)
+        {
+            super(pageLimits, nowInSec);
+        }
+
+        protected BaseRowIterator<Row> apply(BaseRowIterator<Row> partition)
+        {
+            return Transformation.apply(counter.applyTo((RowIterator) partition), this);
+        }
+    }
+
+    private abstract class Pager<T extends Unfiltered> extends Transformation<BaseRowIterator<T>>
     {
         private final DataLimits pageLimits;
-        private final DataLimits.Counter counter;
+        protected final DataLimits.Counter counter;
         private Row lastRow;
         private boolean isFirstPartition = true;
 
@@ -91,7 +131,7 @@ abstract class AbstractQueryPager implements QueryPager
         }
 
         @Override
-        public RowIterator applyToPartition(RowIterator partition)
+        public BaseRowIterator<T> applyToPartition(BaseRowIterator<T> partition)
         {
             DecoratedKey key = partition.partitionKey();
             if (lastKey == null || !lastKey.equals(key))
@@ -113,9 +153,11 @@ abstract class AbstractQueryPager implements QueryPager
                 }
             }
 
-            return Transformation.apply(counter.applyTo(partition), this);
+            return apply(partition);
         }
 
+        protected abstract BaseRowIterator<T> apply(BaseRowIterator<T> partition);
+
         @Override
         public void onClose()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 33e7182..81e1745 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -686,6 +686,112 @@ public class CustomIndexTest extends CQLTester
         index.barriers.forEach(OpOrder.Barrier::allPriorOpsAreFinished);
     }
 
+    @Test
+    public void partitionIndexTest() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v int, s int static, PRIMARY KEY(k,c))");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 1, 1, 1);
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 1, 2, 2);
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 1, 3, 3);
+
+        execute("INSERT INTO %s (k, c) VALUES (?, ?)", 2, 2);
+
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 1, 1);
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 2, 2);
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 3, 3);
+        execute("DELETE FROM %s WHERE k = ? AND c >= ?", 3, 3);
+        execute("DELETE FROM %s WHERE k = ? AND c <= ?", 3, 1);
+
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 4, 1, 1);
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 4, 2, 2);
+        execute("DELETE FROM %s WHERE k = ? AND c = ?", 4, 1);
+
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 5, 1, 1);
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 5, 2, 2);
+        execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 5, 3, 3);
+        execute("DELETE FROM %s WHERE k = ?", 5);
+
+        cfs.forceBlockingFlush();
+
+        String indexName = "partition_index_test_idx";
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(v) USING '%s'",
+                                  indexName, StubIndex.class.getName()));
+
+        SecondaryIndexManager indexManager = cfs.indexManager;
+        StubIndex index = (StubIndex) indexManager.getIndexByName(indexName);
+
+        DecoratedKey targetKey;
+        for (int pageSize = 1; pageSize <= 5; pageSize++)
+        {
+            targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(1));
+            indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+            assertEquals(3, index.rowsInserted.size());
+            assertEquals(0, index.rangeTombstones.size());
+            assertTrue(index.partitionDeletions.get(0).isLive());
+            index.reset();
+        }
+
+        for (int pageSize = 1; pageSize <= 5; pageSize++)
+        {
+            targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(2));
+            indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+            assertEquals(1, index.rowsInserted.size());
+            assertEquals(0, index.rangeTombstones.size());
+            assertTrue(index.partitionDeletions.get(0).isLive());
+            index.reset();
+        }
+
+        for (int pageSize = 1; pageSize <= 5; pageSize++)
+        {
+            targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(3));
+            indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+            assertEquals(1, index.rowsInserted.size());
+            assertEquals(2, index.rangeTombstones.size());
+            assertTrue(index.partitionDeletions.get(0).isLive());
+            index.reset();
+        }
+
+        for (int pageSize = 1; pageSize <= 5; pageSize++)
+        {
+            targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(5));
+            indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+            assertEquals(1, index.partitionDeletions.size());
+            assertFalse(index.partitionDeletions.get(0).isLive());
+            index.reset();
+        }
+    }
+
+    @Test
+    public void partitionIsNotOverIndexed() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v int, PRIMARY KEY(k,c))");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        SecondaryIndexManager indexManager = cfs.indexManager;
+
+        int totalRows = 1;
+
+        // Insert a single row partition to be indexed
+        for (int i = 0; i < totalRows; i++)
+            execute("INSERT INTO %s (k, c, v) VALUES (0, ?, ?)", i, i);
+        cfs.forceBlockingFlush();
+
+        // Create the index, which won't automatically start building
+        String indexName = "partition_overindex_test_idx";
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(v) USING '%s'",
+                                  indexName, StubIndex.class.getName()));
+        StubIndex index = (StubIndex) indexManager.getIndexByName(indexName);
+
+        // Index the partition
+        DecoratedKey targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(0));
+        indexManager.indexPartition(targetKey, Collections.singleton(index), totalRows);
+
+        // Assert only one partition is counted
+        assertEquals(1, index.beginCalls);
+        assertEquals(1, index.finishCalls);
+    }
+
     // Used for index creation above
     public static class BrokenCustom2I extends StubIndex
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java
index 28ea097..569ce97 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
  */
 public class StubIndex implements Index
 {
+    public volatile int beginCalls;
+    public volatile int finishCalls;
     public List<DeletionTime> partitionDeletions = new ArrayList<>();
     public List<RangeTombstone> rangeTombstones = new ArrayList<>();
     public List<Row> rowsInserted = new ArrayList<>();
@@ -105,6 +107,7 @@ public class StubIndex implements Index
         {
             public void begin()
             {
+                beginCalls++;
             }
 
             public void partitionDelete(DeletionTime deletionTime)
@@ -134,6 +137,7 @@ public class StubIndex implements Index
 
             public void finish()
             {
+                finishCalls++;
             }
         };
     }