You are viewing a plain-text version of this content. The hyperlink to the canonical version did not survive the plain-text conversion.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/01/19 22:15:03 UTC
[3/6] cassandra git commit: Use unfiltered iterator for partition indexing
Use unfiltered iterator for partition indexing
Patch by Alex Petrov; reviewed by Sergio Bossa for CASSANDRA-13075.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7a06df79
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7a06df79
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7a06df79
Branch: refs/heads/trunk
Commit: 7a06df79d829ac073264045eb9420a61c5ba939a
Parents: 48fed80
Author: Alex Petrov <ol...@gmail.com>
Authored: Thu Jan 19 09:49:23 2017 +0100
Committer: Jason Brown <ja...@gmail.com>
Committed: Thu Jan 19 10:55:10 2017 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/index/SecondaryIndexManager.java | 86 ++++++++++-----
.../service/pager/AbstractQueryPager.java | 54 ++++++++--
.../apache/cassandra/index/CustomIndexTest.java | 106 +++++++++++++++++++
.../org/apache/cassandra/index/StubIndex.java | 4 +
5 files changed, 216 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6293cfa..a85386b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.11
+ * Indexer is not correctly invoked when building indexes over sstables (CASSANDRA-13075)
* Read repair is not blocking repair to finish in foreground repair (CASSANDRA-13115)
* Stress daemon help is incorrect (CASSANDRA-12563)
* Remove ALTER TYPE support (CASSANDRA-12443)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index a6ed3ba..d39b607 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -48,8 +48,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.partitions.PartitionIterators;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.internal.CassandraIndex;
@@ -543,35 +542,64 @@ public class SecondaryIndexManager implements IndexRegistry
{
try (ReadOrderGroup readGroup = cmd.startOrderGroup();
OpOrder.Group writeGroup = Keyspace.writeOrder.start();
- RowIterator partition =
- PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup),
- cmd))
+ UnfilteredPartitionIterator page = pager.fetchPageUnfiltered(baseCfs.metadata, pageSize, readGroup))
{
- Set<Index.Indexer> indexers = indexes.stream()
- .map(index -> index.indexerFor(key,
- partition.columns(),
- nowInSec,
- writeGroup,
- IndexTransaction.Type.UPDATE))
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
-
- indexers.forEach(Index.Indexer::begin);
-
- // only process the static row once per partition
- if (!readStatic && !partition.staticRow().isEmpty())
- {
- indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
- readStatic = true;
+ if (!page.hasNext())
+ break;
+
+ try (UnfilteredRowIterator partition = page.next()) {
+ Set<Index.Indexer> indexers = indexes.stream()
+ .map(index -> index.indexerFor(key,
+ partition.columns(),
+ nowInSec,
+ writeGroup,
+ IndexTransaction.Type.UPDATE))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ // Short-circuit empty partitions if static row is processed or isn't read
+ if (!readStatic && partition.isEmpty() && partition.staticRow().isEmpty())
+ break;
+
+ indexers.forEach(Index.Indexer::begin);
+
+ if (!readStatic)
+ {
+ if (!partition.staticRow().isEmpty())
+ indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+ indexers.forEach((Index.Indexer i) -> i.partitionDelete(partition.partitionLevelDeletion()));
+ readStatic = true;
+ }
+
+ MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(partition.partitionLevelDeletion(), baseCfs.getComparator(), false);
+
+ while (partition.hasNext())
+ {
+ Unfiltered unfilteredRow = partition.next();
+
+ if (unfilteredRow.isRow())
+ {
+ Row row = (Row) unfilteredRow;
+ indexers.forEach(indexer -> indexer.insertRow(row));
+ }
+ else
+ {
+ assert unfilteredRow.isRangeTombstoneMarker();
+ RangeTombstoneMarker marker = (RangeTombstoneMarker) unfilteredRow;
+ deletionBuilder.add(marker);
+ }
+ }
+
+ MutableDeletionInfo deletionInfo = deletionBuilder.build();
+ if (deletionInfo.hasRanges())
+ {
+ Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
+ while (iter.hasNext())
+ indexers.forEach(indexer -> indexer.rangeTombstone(iter.next()));
+ }
+
+ indexers.forEach(Index.Indexer::finish);
}
-
- while (partition.hasNext())
- {
- Row row = partition.next();
- indexers.forEach(indexer -> indexer.insertRow(row));
- }
-
- indexers.forEach(Index.Indexer::finish);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 48f6c04..74ec47d 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.service.pager;
+import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
@@ -63,7 +64,7 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+ Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
}
@@ -73,14 +74,53 @@ abstract class AbstractQueryPager implements QueryPager
return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+ RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), pager);
}
- private class Pager extends Transformation<RowIterator>
+ public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadOrderGroup orderGroup)
+ {
+ if (isExhausted())
+ return EmptyIterators.unfilteredPartition(cfm, false);
+
+ pageSize = Math.min(pageSize, remaining);
+ UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());
+
+ return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(orderGroup), pager);
+ }
+
+ private class UnfilteredPager extends Pager<Unfiltered>
+ {
+
+ private UnfilteredPager(DataLimits pageLimits, int nowInSec)
+ {
+ super(pageLimits, nowInSec);
+ }
+
+ protected BaseRowIterator<Unfiltered> apply(BaseRowIterator<Unfiltered> partition)
+ {
+ return Transformation.apply(counter.applyTo((UnfilteredRowIterator) partition), this);
+ }
+ }
+
+ private class RowPager extends Pager<Row>
+ {
+
+ private RowPager(DataLimits pageLimits, int nowInSec)
+ {
+ super(pageLimits, nowInSec);
+ }
+
+ protected BaseRowIterator<Row> apply(BaseRowIterator<Row> partition)
+ {
+ return Transformation.apply(counter.applyTo((RowIterator) partition), this);
+ }
+ }
+
+ private abstract class Pager<T extends Unfiltered> extends Transformation<BaseRowIterator<T>>
{
private final DataLimits pageLimits;
- private final DataLimits.Counter counter;
+ protected final DataLimits.Counter counter;
private Row lastRow;
private boolean isFirstPartition = true;
@@ -91,7 +131,7 @@ abstract class AbstractQueryPager implements QueryPager
}
@Override
- public RowIterator applyToPartition(RowIterator partition)
+ public BaseRowIterator<T> applyToPartition(BaseRowIterator<T> partition)
{
DecoratedKey key = partition.partitionKey();
if (lastKey == null || !lastKey.equals(key))
@@ -113,9 +153,11 @@ abstract class AbstractQueryPager implements QueryPager
}
}
- return Transformation.apply(counter.applyTo(partition), this);
+ return apply(partition);
}
+ protected abstract BaseRowIterator<T> apply(BaseRowIterator<T> partition);
+
@Override
public void onClose()
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 33e7182..81e1745 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -686,6 +686,112 @@ public class CustomIndexTest extends CQLTester
index.barriers.forEach(OpOrder.Barrier::allPriorOpsAreFinished);
}
+ @Test
+ public void partitionIndexTest() throws Throwable
+ {
+ createTable("CREATE TABLE %s(k int, c int, v int, s int static, PRIMARY KEY(k,c))");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 1, 1, 1);
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 1, 2, 2);
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 1, 3, 3);
+
+ execute("INSERT INTO %s (k, c) VALUES (?, ?)", 2, 2);
+
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 1, 1);
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 2, 2);
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 3, 3, 3);
+ execute("DELETE FROM %s WHERE k = ? AND c >= ?", 3, 3);
+ execute("DELETE FROM %s WHERE k = ? AND c <= ?", 3, 1);
+
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 4, 1, 1);
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 4, 2, 2);
+ execute("DELETE FROM %s WHERE k = ? AND c = ?", 4, 1);
+
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 5, 1, 1);
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 5, 2, 2);
+ execute("INSERT INTO %s (k, c, v) VALUES (?, ?, ?)", 5, 3, 3);
+ execute("DELETE FROM %s WHERE k = ?", 5);
+
+ cfs.forceBlockingFlush();
+
+ String indexName = "partition_index_test_idx";
+ createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(v) USING '%s'",
+ indexName, StubIndex.class.getName()));
+
+ SecondaryIndexManager indexManager = cfs.indexManager;
+ StubIndex index = (StubIndex) indexManager.getIndexByName(indexName);
+
+ DecoratedKey targetKey;
+ for (int pageSize = 1; pageSize <= 5; pageSize++)
+ {
+ targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(1));
+ indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+ assertEquals(3, index.rowsInserted.size());
+ assertEquals(0, index.rangeTombstones.size());
+ assertTrue(index.partitionDeletions.get(0).isLive());
+ index.reset();
+ }
+
+ for (int pageSize = 1; pageSize <= 5; pageSize++)
+ {
+ targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(2));
+ indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+ assertEquals(1, index.rowsInserted.size());
+ assertEquals(0, index.rangeTombstones.size());
+ assertTrue(index.partitionDeletions.get(0).isLive());
+ index.reset();
+ }
+
+ for (int pageSize = 1; pageSize <= 5; pageSize++)
+ {
+ targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(3));
+ indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+ assertEquals(1, index.rowsInserted.size());
+ assertEquals(2, index.rangeTombstones.size());
+ assertTrue(index.partitionDeletions.get(0).isLive());
+ index.reset();
+ }
+
+ for (int pageSize = 1; pageSize <= 5; pageSize++)
+ {
+ targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(5));
+ indexManager.indexPartition(targetKey, Collections.singleton(index), pageSize);
+ assertEquals(1, index.partitionDeletions.size());
+ assertFalse(index.partitionDeletions.get(0).isLive());
+ index.reset();
+ }
+ }
+
+ @Test
+ public void partitionIsNotOverIndexed() throws Throwable
+ {
+ createTable("CREATE TABLE %s(k int, c int, v int, PRIMARY KEY(k,c))");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ SecondaryIndexManager indexManager = cfs.indexManager;
+
+ int totalRows = 1;
+
+ // Insert a single row partition to be indexed
+ for (int i = 0; i < totalRows; i++)
+ execute("INSERT INTO %s (k, c, v) VALUES (0, ?, ?)", i, i);
+ cfs.forceBlockingFlush();
+
+ // Create the index, which won't automatically start building
+ String indexName = "partition_overindex_test_idx";
+ createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(v) USING '%s'",
+ indexName, StubIndex.class.getName()));
+ StubIndex index = (StubIndex) indexManager.getIndexByName(indexName);
+
+ // Index the partition
+ DecoratedKey targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(0));
+ indexManager.indexPartition(targetKey, Collections.singleton(index), totalRows);
+
+ // Assert only one partition is counted
+ assertEquals(1, index.beginCalls);
+ assertEquals(1, index.finishCalls);
+ }
+
// Used for index creation above
public static class BrokenCustom2I extends StubIndex
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7a06df79/test/unit/org/apache/cassandra/index/StubIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/StubIndex.java b/test/unit/org/apache/cassandra/index/StubIndex.java
index 28ea097..569ce97 100644
--- a/test/unit/org/apache/cassandra/index/StubIndex.java
+++ b/test/unit/org/apache/cassandra/index/StubIndex.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
*/
public class StubIndex implements Index
{
+ public volatile int beginCalls;
+ public volatile int finishCalls;
public List<DeletionTime> partitionDeletions = new ArrayList<>();
public List<RangeTombstone> rangeTombstones = new ArrayList<>();
public List<Row> rowsInserted = new ArrayList<>();
@@ -105,6 +107,7 @@ public class StubIndex implements Index
{
public void begin()
{
+ beginCalls++;
}
public void partitionDelete(DeletionTime deletionTime)
@@ -134,6 +137,7 @@ public class StubIndex implements Index
public void finish()
{
+ finishCalls++;
}
};
}