You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/12/13 10:37:49 UTC
[13/19] cassandra git commit: Reduce granuality of OpOrder.Group
during index build
Reduce granuality of OpOrder.Group during index build
3.0+ version
Patch by Sam Tunnicliffe; reviewed by Milan Majercik
and Jeremiah Jordan for CASSANDRA-12796
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36ce4e02
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36ce4e02
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36ce4e02
Branch: refs/heads/trunk
Commit: 36ce4e02b429b1297d71c5c8a963623c62d9e159
Parents: 6434e88
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Dec 13 10:02:25 2016 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Tue Dec 13 10:18:09 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 23 ----
.../cassandra/index/SecondaryIndexBuilder.java | 5 +-
.../cassandra/index/SecondaryIndexManager.java | 108 +++++++++++----
.../apache/cassandra/index/CustomIndexTest.java | 130 ++++++++++++++++++-
5 files changed, 216 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5bc30be..a65a147 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
* Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
* Reenable HeapPool (CASSANDRA-12900)
Merged from 2.2:
+ * Reduce granuality of OpOrder.Group during index build (CASSANDRA-12796)
* Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
* Do not specify local address on outgoing connection when listen_on_broadcast_address is set (CASSANDRA-12673)
* Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 75aab8f..ec5102b 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.view.ViewManager;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.index.Index;
@@ -569,28 +568,6 @@ public class Keyspace
return replicationStrategy;
}
- /**
- * @param key row to index
- * @param cfs ColumnFamily to index partition in
- * @param indexes the indexes to submit the row to
- */
- public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set<Index> indexes)
- {
- if (logger.isTraceEnabled())
- logger.trace("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
-
- SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata,
- FBUtilities.nowInSeconds(),
- key);
-
- try (OpOrder.Group writeGroup = cfs.keyspace.writeOrder.start();
- OpOrder.Group readGroup = cfs.readOrdering.start();
- UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, readGroup))
- {
- cfs.indexManager.indexPartition(partition, writeGroup, indexes, cmd.nowInSec());
- }
- }
-
public List<Future<?>> flush()
{
List<Future<?>> futures = new ArrayList<>(columnFamilyStores.size());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index e66f0a3..c627b2d 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@ -17,13 +17,11 @@
*/
package org.apache.cassandra.index;
-import java.io.IOException;
import java.util.Set;
import java.util.UUID;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.OperationType;
@@ -61,12 +59,13 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
{
try
{
+ int pageSize = cfs.indexManager.calculateIndexingPageSize();
while (iter.hasNext())
{
if (isStopRequested())
throw new CompactionInterruptedException(getCompactionInfo());
DecoratedKey key = iter.next();
- Keyspace.indexPartition(key, cfs, indexers);
+ cfs.indexManager.indexPartition(key, indexers, pageSize);
}
}
finally
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 6dfdeee..003b624 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -56,7 +57,9 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.service.pager.SinglePartitionPager;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -98,6 +101,9 @@ public class SecondaryIndexManager implements IndexRegistry
{
private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
+ // default page size (in rows) when rebuilding the index for a whole partition
+ public static final int DEFAULT_PAGE_SIZE = 10000;
+
private Map<String, Index> indexes = Maps.newConcurrentMap();
/**
@@ -517,39 +523,97 @@ public class SecondaryIndexManager implements IndexRegistry
/**
* When building an index against existing data in sstables, add the given partition to the index
*/
- public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
+ public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
{
+ if (logger.isTraceEnabled())
+ logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
+
if (!indexes.isEmpty())
{
- DecoratedKey key = partition.partitionKey();
- Set<Index.Indexer> indexers = indexes.stream()
- .map(index -> index.indexerFor(key,
- partition.columns(),
- nowInSec,
- opGroup,
- IndexTransaction.Type.UPDATE))
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
-
- indexers.forEach(Index.Indexer::begin);
-
- try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
+ FBUtilities.nowInSeconds(),
+ key);
+ int nowInSec = cmd.nowInSec();
+ boolean readStatic = false;
+
+ SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
+ while (!pager.isExhausted())
{
- if (!filtered.staticRow().isEmpty())
- indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
-
- while (filtered.hasNext())
+ try (ReadOrderGroup readGroup = cmd.startOrderGroup();
+ OpOrder.Group writeGroup = Keyspace.writeOrder.start();
+ RowIterator partition =
+ PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup),
+ cmd))
{
- Row row = filtered.next();
- indexers.forEach(indexer -> indexer.insertRow(row));
+ Set<Index.Indexer> indexers = indexes.stream()
+ .map(index -> index.indexerFor(key,
+ partition.columns(),
+ nowInSec,
+ writeGroup,
+ IndexTransaction.Type.UPDATE))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+
+ indexers.forEach(Index.Indexer::begin);
+
+ // only process the static row once per partition
+ if (!readStatic && !partition.staticRow().isEmpty())
+ {
+ indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+ readStatic = true;
+ }
+
+ while (partition.hasNext())
+ {
+ Row row = partition.next();
+ indexers.forEach(indexer -> indexer.insertRow(row));
+ }
+
+ indexers.forEach(Index.Indexer::finish);
}
}
-
- indexers.forEach(Index.Indexer::finish);
}
}
/**
+ * Return the page size (in rows) used when indexing an entire partition: DEFAULT_PAGE_SIZE if forced via the cassandra.force_default_indexing_page_size system property or if the mean partition size, mean cell count or regular column count is not positive; otherwise the number of mean-sized rows that fit in 32MiB, clamped to [1, DEFAULT_PAGE_SIZE]
+ */
+ public int calculateIndexingPageSize()
+ {
+ if (Boolean.getBoolean("cassandra.force_default_indexing_page_size"))
+ return DEFAULT_PAGE_SIZE;
+
+ double targetPageSizeInBytes = 32 * 1024 * 1024;
+ double meanPartitionSize = baseCfs.getMeanPartitionSize();
+ if (meanPartitionSize <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ int meanCellsPerPartition = baseCfs.getMeanColumns();
+ if (meanCellsPerPartition <= 0)
+ return DEFAULT_PAGE_SIZE;
+
+ int columnsPerRow = baseCfs.metadata.partitionColumns().regulars.size();
+ if (columnsPerRow <= 0) // no regular columns: the integer division below would throw ArithmeticException
+ return DEFAULT_PAGE_SIZE;
+
+ int meanRowsPerPartition = meanCellsPerPartition / columnsPerRow;
+ double meanRowSize = meanPartitionSize / meanRowsPerPartition; // floating-point division: a zero row estimate gives Infinity, which the clamp below turns into a page size of 1
+
+ int pageSize = (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, targetPageSizeInBytes / meanRowSize));
+
+ logger.trace("Calculated page size {} for indexing {}.{} ({}/{}/{}/{})",
+ pageSize,
+ baseCfs.metadata.ksName,
+ baseCfs.metadata.cfName,
+ meanPartitionSize,
+ meanCellsPerPartition,
+ meanRowsPerPartition,
+ meanRowSize);
+
+ return pageSize;
+ }
+
+ /**
* Delete all data from all indexes for this partition.
* For when cleanup rips a partition out entirely.
*
http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 6930d13..33e7182 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -39,22 +39,25 @@ import org.apache.cassandra.cql3.restrictions.IndexRestrictions;
import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
import org.apache.cassandra.cql3.statements.IndexTarget;
import org.apache.cassandra.cql3.statements.ModificationStatement;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
import static org.apache.cassandra.Util.throwAssert;
import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -647,6 +650,42 @@ public class CustomIndexTest extends CQLTester
assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
}
+ @Test
+ public void indexBuildingPagesLargePartitions() throws Throwable
+ {
+ createTable("CREATE TABLE %s(k int, c int, v int, PRIMARY KEY(k,c))");
+ ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+ SecondaryIndexManager indexManager = cfs.indexManager;
+ int totalRows = SimulateConcurrentFlushingIndex.ROWS_IN_PARTITION;
+ // Insert a single wide partition to be indexed
+ for (int i = 0; i < totalRows; i++)
+ execute("INSERT INTO %s (k, c, v) VALUES (0, ?, ?)", i, i);
+ cfs.forceBlockingFlush();
+
+ // Create the index, which won't automatically start building
+ String indexName = "build_single_partition_idx";
+ createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(v) USING '%s'",
+ indexName, SimulateConcurrentFlushingIndex.class.getName()));
+ SimulateConcurrentFlushingIndex index = (SimulateConcurrentFlushingIndex) indexManager.getIndexByName(indexName);
+
+ // Index the partition with an Indexer which artificially simulates additional concurrent
+ // flush activity by periodically issuing barriers on the read & write op groupings
+ DecoratedKey targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(0));
+ indexManager.indexPartition(targetKey, Collections.singleton(index), totalRows / 10);
+
+ // When indexing is done check that:
+ // * The base table's read ordering at finish was > the one at the start (i.e. that
+ // we didn't hold a single read OpOrder.Group for the whole operation).
+ // * That multiple write OpOrder.Groups were used to perform the writes to the index
+ // * That all operations are complete, that none of the relevant OpOrder.Groups are
+ // marked as blocking progress and that all the barriers' ops are considered done.
+ assertTrue(index.readOrderingAtFinish.compareTo(index.readOrderingAtStart) > 0);
+ assertTrue(index.writeGroups.size() > 1);
+ assertFalse(index.readOrderingAtFinish.isBlocking());
+ index.writeGroups.forEach(group -> assertFalse(group.isBlocking()));
+ index.barriers.forEach(barrier -> assertTrue(barrier.allPriorOpsAreFinished()));
+ }
+
// Used for index creation above
public static class BrokenCustom2I extends StubIndex
{
@@ -868,4 +907,89 @@ public class CustomIndexTest extends CQLTester
return new HashMap<>();
}
}
+
+ public static final class SimulateConcurrentFlushingIndex extends StubIndex
+ {
+ ColumnFamilyStore baseCfs;
+ AtomicInteger indexedRowCount = new AtomicInteger(0);
+
+ OpOrder.Group readOrderingAtStart = null;
+ OpOrder.Group readOrderingAtFinish = null;
+ Set<OpOrder.Group> writeGroups = new HashSet<>();
+ List<OpOrder.Barrier> barriers = new ArrayList<>();
+
+ static final int ROWS_IN_PARTITION = 1000;
+
+ public SimulateConcurrentFlushingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+ {
+ super(baseCfs, metadata);
+ this.baseCfs = baseCfs;
+ }
+
+ // When indexing an entire partition, 2 potential problems can be caused by
+ // holding a single read & a single write OpOrder.Group for the whole operation:
+ // * By holding a write group too long, flushes are blocked
+ // * Holding a read group for too long prevents the memory from flushed memtables
+ // from being reclaimed.
+ // See CASSANDRA-12796 for details.
+ // To test that the index builder pages through a large partition, using
+ // finer grained OpOrder.Groups we write a "large" partition to disk, then
+ // kick off an index build on it, using this indexer.
+ // To simulate concurrent flush activity, we periodically issue barriers on
+ // the current read and write groups.
+ // When we're done indexing the partition, the test checks the states of the
+ // various OpOrder.Groups, which it can obtain from this index.
+
+ public Indexer indexerFor(final DecoratedKey key,
+ PartitionColumns columns,
+ int nowInSec,
+ OpOrder.Group opGroup,
+ IndexTransaction.Type transactionType)
+ {
+ if (readOrderingAtStart == null)
+ readOrderingAtStart = baseCfs.readOrdering.getCurrent();
+
+ writeGroups.add(opGroup);
+
+ return new Indexer()
+ {
+ public void begin()
+ {
+ // to simulate other activity on base table during indexing, issue
+ // barriers on the read and write orderings. This is analogous to
+ // what happens when other flushes are being processed during the
+ // indexing of a partition
+ OpOrder.Barrier readBarrier = baseCfs.readOrdering.newBarrier();
+ readBarrier.issue();
+ barriers.add(readBarrier);
+ OpOrder.Barrier writeBarrier = Keyspace.writeOrder.newBarrier();
+ writeBarrier.issue();
+ barriers.add(writeBarrier);
+ }
+
+ public void insertRow(Row row)
+ {
+ indexedRowCount.incrementAndGet();
+ }
+
+ public void finish()
+ {
+ // once we've indexed all rows in the target partition,
+ // grab the read OpOrder.Group for the base CFS so
+ // we can compare it with the starting group
+ if (indexedRowCount.get() >= ROWS_IN_PARTITION)
+ readOrderingAtFinish = baseCfs.readOrdering.getCurrent();
+ }
+
+ public void partitionDelete(DeletionTime deletionTime) { }
+
+ public void rangeTombstone(RangeTombstone tombstone) { }
+
+ public void updateRow(Row oldRowData, Row newRowData) { }
+
+ public void removeRow(Row row) { }
+
+ };
+ }
+ }
}