You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2016/12/13 10:37:49 UTC

[13/19] cassandra git commit: Reduce granularity of OpOrder.Group during index build

Reduce granularity of OpOrder.Group during index build

3.0+ version

Patch by Sam Tunnicliffe; reviewed by Milan Majercik
and Jeremiah Jordan for CASSANDRA-12796


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/36ce4e02
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/36ce4e02
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/36ce4e02

Branch: refs/heads/trunk
Commit: 36ce4e02b429b1297d71c5c8a963623c62d9e159
Parents: 6434e88
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Tue Dec 13 10:02:25 2016 +0000
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Tue Dec 13 10:18:09 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Keyspace.java  |  23 ----
 .../cassandra/index/SecondaryIndexBuilder.java  |   5 +-
 .../cassandra/index/SecondaryIndexManager.java  | 108 +++++++++++----
 .../apache/cassandra/index/CustomIndexTest.java | 130 ++++++++++++++++++-
 5 files changed, 216 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5bc30be..a65a147 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@
  * Prevent reloading of logback.xml from UDF sandbox (CASSANDRA-12535)
  * Reenable HeapPool (CASSANDRA-12900)
 Merged from 2.2:
+ * Reduce granularity of OpOrder.Group during index build (CASSANDRA-12796)
  * Test bind parameters and unset parameters in InsertUpdateIfConditionTest (CASSANDRA-12980)
  * Do not specify local address on outgoing connection when listen_on_broadcast_address is set (CASSANDRA-12673)
  * Use saved tokens when setting local tokens on StorageService.joinRing (CASSANDRA-12935)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 75aab8f..ec5102b 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -35,7 +35,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.view.ViewManager;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.index.Index;
@@ -569,28 +568,6 @@ public class Keyspace
         return replicationStrategy;
     }
 
-    /**
-     * @param key row to index
-     * @param cfs ColumnFamily to index partition in
-     * @param indexes the indexes to submit the row to
-     */
-    public static void indexPartition(DecoratedKey key, ColumnFamilyStore cfs, Set<Index> indexes)
-    {
-        if (logger.isTraceEnabled())
-            logger.trace("Indexing partition {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
-
-        SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata,
-                                                                                      FBUtilities.nowInSeconds(),
-                                                                                      key);
-
-        try (OpOrder.Group writeGroup = cfs.keyspace.writeOrder.start();
-             OpOrder.Group readGroup = cfs.readOrdering.start();
-             UnfilteredRowIterator partition = cmd.queryMemtableAndDisk(cfs, readGroup))
-        {
-            cfs.indexManager.indexPartition(partition, writeGroup, indexes, cmd.nowInSec());
-        }
-    }
-
     public List<Future<?>> flush()
     {
         List<Future<?>> futures = new ArrayList<>(columnFamilyStores.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index e66f0a3..c627b2d 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@ -17,13 +17,11 @@
  */
 package org.apache.cassandra.index;
 
-import java.io.IOException;
 import java.util.Set;
 import java.util.UUID;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.db.compaction.OperationType;
@@ -61,12 +59,13 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
     {
         try
         {
+            int pageSize = cfs.indexManager.calculateIndexingPageSize();
             while (iter.hasNext())
             {
                 if (isStopRequested())
                     throw new CompactionInterruptedException(getCompactionInfo());
                 DecoratedKey key = iter.next();
-                Keyspace.indexPartition(key, cfs, indexers);
+                cfs.indexManager.indexPartition(key, indexers, pageSize);
             }
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index 6dfdeee..003b624 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -56,7 +57,9 @@ import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
+import org.apache.cassandra.service.pager.SinglePartitionPager;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
@@ -98,6 +101,9 @@ public class SecondaryIndexManager implements IndexRegistry
 {
     private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
 
+    // default page size (in rows) when rebuilding the index for a whole partition
+    public static final int DEFAULT_PAGE_SIZE = 10000;
+
     private Map<String, Index> indexes = Maps.newConcurrentMap();
 
     /**
@@ -517,39 +523,97 @@ public class SecondaryIndexManager implements IndexRegistry
     /**
      * When building an index against existing data in sstables, add the given partition to the index
      */
-    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<Index> indexes, int nowInSec)
+    public void indexPartition(DecoratedKey key, Set<Index> indexes, int pageSize)
     {
+        if (logger.isTraceEnabled())
+            logger.trace("Indexing partition {}", baseCfs.metadata.getKeyValidator().getString(key.getKey()));
+
         if (!indexes.isEmpty())
         {
-            DecoratedKey key = partition.partitionKey();
-            Set<Index.Indexer> indexers = indexes.stream()
-                                                 .map(index -> index.indexerFor(key,
-                                                                                partition.columns(),
-                                                                                nowInSec,
-                                                                                opGroup,
-                                                                                IndexTransaction.Type.UPDATE))
-                                                 .filter(Objects::nonNull)
-                                                 .collect(Collectors.toSet());
-
-            indexers.forEach(Index.Indexer::begin);
-
-            try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+            SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(baseCfs.metadata,
+                                                                                          FBUtilities.nowInSeconds(),
+                                                                                          key);
+            int nowInSec = cmd.nowInSec();
+            boolean readStatic = false;
+
+            SinglePartitionPager pager = new SinglePartitionPager(cmd, null, Server.CURRENT_VERSION);
+            while (!pager.isExhausted())
             {
-                if (!filtered.staticRow().isEmpty())
-                    indexers.forEach(indexer -> indexer.insertRow(filtered.staticRow()));
-
-                while (filtered.hasNext())
+                try (ReadOrderGroup readGroup = cmd.startOrderGroup();
+                     OpOrder.Group writeGroup = Keyspace.writeOrder.start();
+                     RowIterator partition =
+                        PartitionIterators.getOnlyElement(pager.fetchPageInternal(pageSize,readGroup),
+                                                          cmd))
                 {
-                    Row row = filtered.next();
-                    indexers.forEach(indexer -> indexer.insertRow(row));
+                    Set<Index.Indexer> indexers = indexes.stream()
+                                                         .map(index -> index.indexerFor(key,
+                                                                                        partition.columns(),
+                                                                                        nowInSec,
+                                                                                        writeGroup,
+                                                                                        IndexTransaction.Type.UPDATE))
+                                                         .filter(Objects::nonNull)
+                                                         .collect(Collectors.toSet());
+
+                    indexers.forEach(Index.Indexer::begin);
+
+                    // only process the static row once per partition
+                    if (!readStatic && !partition.staticRow().isEmpty())
+                    {
+                        indexers.forEach(indexer -> indexer.insertRow(partition.staticRow()));
+                        readStatic = true;
+                    }
+
+                    while (partition.hasNext())
+                    {
+                        Row row = partition.next();
+                        indexers.forEach(indexer -> indexer.insertRow(row));
+                    }
+
+                    indexers.forEach(Index.Indexer::finish);
                 }
             }
-
-            indexers.forEach(Index.Indexer::finish);
         }
     }
 
     /**
+     * Return the page size (in rows, at most DEFAULT_PAGE_SIZE) used when indexing an entire partition
+     */
+    public int calculateIndexingPageSize()
+    {
+        if (Boolean.getBoolean("cassandra.force_default_indexing_page_size"))
+            return DEFAULT_PAGE_SIZE;
+
+        double targetPageSizeInBytes = 32 * 1024 * 1024; // aim for roughly 32MB of row data per page
+        double meanPartitionSize = baseCfs.getMeanPartitionSize();
+        if (meanPartitionSize <= 0)
+            return DEFAULT_PAGE_SIZE;
+
+        int meanCellsPerPartition = baseCfs.getMeanColumns();
+        if (meanCellsPerPartition <= 0)
+            return DEFAULT_PAGE_SIZE;
+
+        int columnsPerRow = baseCfs.metadata.partitionColumns().regulars.size();
+        if (columnsPerRow <= 0) // no regular columns: avoid the integer division by zero below
+            return DEFAULT_PAGE_SIZE;
+
+        int meanRowsPerPartition = meanCellsPerPartition / columnsPerRow;
+        double meanRowSize = meanPartitionSize / meanRowsPerPartition; // Infinity if meanRowsPerPartition == 0, which yields a page size of 1
+
+        int pageSize = (int) Math.max(1, Math.min(DEFAULT_PAGE_SIZE, targetPageSizeInBytes / meanRowSize));
+
+        logger.trace("Calculated page size {} for indexing {}.{} ({}/{}/{}/{})",
+                     pageSize,
+                     baseCfs.metadata.ksName,
+                     baseCfs.metadata.cfName,
+                     meanPartitionSize,
+                     meanCellsPerPartition,
+                     meanRowsPerPartition,
+                     meanRowSize);
+
+        return pageSize;
+    }
+
+    /**
      * Delete all data from all indexes for this partition.
      * For when cleanup rips a partition out entirely.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/36ce4e02/test/unit/org/apache/cassandra/index/CustomIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
index 6930d13..33e7182 100644
--- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java
@@ -39,22 +39,25 @@ import org.apache.cassandra.cql3.restrictions.IndexRestrictions;
 import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.cql3.statements.ModificationStatement;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadOrderGroup;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
 import org.apache.cassandra.transport.Server;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.Util.throwAssert;
 import static org.apache.cassandra.cql3.statements.IndexTarget.CUSTOM_INDEX_OPTION_NAME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -647,6 +650,42 @@ public class CustomIndexTest extends CQLTester
         assertEquals(1, getCurrentColumnFamilyStore().getDirectories().getDirectoryForNewSSTables().listFiles().length);
     }
 
+    @Test
+    public void indexBuildingPagesLargePartitions() throws Throwable
+    {
+        createTable("CREATE TABLE %s(k int, c int, v int, PRIMARY KEY(k,c))");
+        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
+        SecondaryIndexManager indexManager = cfs.indexManager;
+        int totalRows = SimulateConcurrentFlushingIndex.ROWS_IN_PARTITION;
+        // Insert a single wide partition to be indexed
+        for (int i = 0; i < totalRows; i++)
+            execute("INSERT INTO %s (k, c, v) VALUES (0, ?, ?)", i, i);
+        cfs.forceBlockingFlush();
+
+        // Create the index, which won't automatically start building
+        String indexName = "build_single_partition_idx";
+        createIndex(String.format("CREATE CUSTOM INDEX %s ON %%s(v) USING '%s'",
+                                  indexName, SimulateConcurrentFlushingIndex.class.getName()));
+        SimulateConcurrentFlushingIndex index = (SimulateConcurrentFlushingIndex) indexManager.getIndexByName(indexName);
+
+        // Index the partition with an Indexer which artificially simulates additional concurrent
+        // flush activity by periodically issuing barriers on the read & write op groupings
+        DecoratedKey targetKey = getCurrentColumnFamilyStore().decorateKey(ByteBufferUtil.bytes(0));
+        indexManager.indexPartition(targetKey, Collections.singleton(index), totalRows / 10); // page size is a tenth of the partition, so several pages are needed
+
+        // When indexing is done check that:
+        // * The base table's read ordering at finish was > the one at the start (i.e. that
+        //   we didn't hold a single read OpOrder.Group for the whole operation).
+        // * That multiple write OpOrder.Groups were used to perform the writes to the index
+        // * That all operations are complete, that none of the relevant OpOrder.Groups are
+        //   marked as blocking progress and that all the barriers' ops are considered done.
+        assertTrue(index.readOrderingAtFinish.compareTo(index.readOrderingAtStart) > 0);
+        assertTrue(index.writeGroups.size() > 1);
+        assertFalse(index.readOrderingAtFinish.isBlocking());
+        index.writeGroups.forEach(group -> assertFalse(group.isBlocking()));
+        index.barriers.forEach(OpOrder.Barrier::allPriorOpsAreFinished); // NOTE(review): the boolean result is discarded, so nothing is asserted here
+    }
+
     // Used for index creation above
     public static class BrokenCustom2I extends StubIndex
     {
@@ -868,4 +907,89 @@ public class CustomIndexTest extends CQLTester
             return new HashMap<>();
         }
     }
+
+    public static final class SimulateConcurrentFlushingIndex extends StubIndex
+    {
+        ColumnFamilyStore baseCfs;
+        AtomicInteger indexedRowCount = new AtomicInteger(0); // incremented once per insertRow call
+
+        OpOrder.Group readOrderingAtStart = null; // captured on the first indexerFor call
+        OpOrder.Group readOrderingAtFinish = null; // captured in Indexer.finish, see the condition there
+        Set<OpOrder.Group> writeGroups = new HashSet<>(); // every distinct write group passed to indexerFor
+        List<OpOrder.Barrier> barriers = new ArrayList<>(); // all read & write barriers issued in Indexer.begin
+
+        static final int ROWS_IN_PARTITION = 1000;
+
+        public SimulateConcurrentFlushingIndex(ColumnFamilyStore baseCfs, IndexMetadata metadata)
+        {
+            super(baseCfs, metadata);
+            this.baseCfs = baseCfs;
+        }
+
+        // When indexing an entire partition, 2 potential problems can be caused by
+        // holding a single read & a single write OpOrder.Group for the whole operation:
+        // * By holding a write group too long, flushes are blocked
+        // * Holding a read group for too long prevents the memory from flushed memtables
+        //   from being reclaimed.
+        // See CASSANDRA-12796 for details.
+        // To test that the index builder pages through a large partition, using
+        // finer grained OpOrder.Groups we write a "large" partition to disk, then
+        // kick off an index build on it, using this indexer.
+        // To simulate concurrent flush activity, we periodically issue barriers on
+        // the current read and write groups.
+        // When we're done indexing the partition, the test checks the states of the
+        // various OpOrder.Groups, which it can obtain from this index.
+
+        public Indexer indexerFor(final DecoratedKey key,
+                                  PartitionColumns columns,
+                                  int nowInSec,
+                                  OpOrder.Group opGroup,
+                                  IndexTransaction.Type transactionType)
+        {
+            if (readOrderingAtStart == null)
+                readOrderingAtStart = baseCfs.readOrdering.getCurrent();
+
+            writeGroups.add(opGroup);
+
+            return new Indexer()
+            {
+                public void begin()
+                {
+                    // to simulate other activity on base table during indexing, issue
+                    // barriers on the read and write orderings. This is analogous to
+                    // what happens when other flushes are being processed during the
+                    // indexing of a partition
+                    OpOrder.Barrier readBarrier = baseCfs.readOrdering.newBarrier();
+                    readBarrier.issue();
+                    barriers.add(readBarrier);
+                    OpOrder.Barrier writeBarrier = Keyspace.writeOrder.newBarrier();
+                    writeBarrier.issue();
+                    barriers.add(writeBarrier);
+                }
+
+                public void insertRow(Row row)
+                {
+                    indexedRowCount.incrementAndGet();
+                }
+
+                public void finish()
+                {
+                    // Record the base CFS's current read OpOrder.Group so the test can
+                    // compare it with the starting group. NOTE(review): this only happens while
+                    // fewer than ROWS_IN_PARTITION rows have been counted - confirm that is intended
+                    if (indexedRowCount.get() < ROWS_IN_PARTITION)
+                        readOrderingAtFinish = baseCfs.readOrdering.getCurrent();
+                }
+
+                public void partitionDelete(DeletionTime deletionTime) { }
+
+                public void rangeTombstone(RangeTombstone tombstone) { }
+
+                public void updateRow(Row oldRowData, Row newRowData) { }
+
+                public void removeRow(Row row) { }
+
+            };
+        }
+    }
 }