You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/11/23 19:58:37 UTC
cassandra git commit: Make index building pluggable
Repository: cassandra
Updated Branches:
refs/heads/trunk b1f73d4b0 -> 440366edd
Make index building pluggable
Patch and review by Pavel Yaskevich and Sam Tunnicliffe for
CASSANDRA-10681
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/440366ed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/440366ed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/440366ed
Branch: refs/heads/trunk
Commit: 440366edd0ef0e1c6c1af69230dabc996d967626
Parents: b1f73d4
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Thu Nov 19 15:56:42 2015 -0800
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Mon Nov 23 18:47:55 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/index/Index.java | 51 +++++++++++++
.../cassandra/index/SecondaryIndexBuilder.java | 53 +------------
.../cassandra/index/SecondaryIndexManager.java | 20 +++--
.../index/internal/CassandraIndex.java | 6 +-
.../internal/CollatedViewIndexBuilder.java | 78 ++++++++++++++++++++
.../index/internal/CustomCassandraIndex.java | 6 +-
7 files changed, 152 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a6646f9..13b2d05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.2
+ * Make index building pluggable (CASSANDRA-10681)
* Add sstable flush observer (CASSANDRA-10678)
* Improve NTS endpoints calculation (CASSANDRA-10200)
* Improve performance of the folderSize function (CASSANDRA-10677)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 7bca924..64d621f 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -1,6 +1,8 @@
package org.apache.cassandra.index;
+import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
@@ -15,9 +17,12 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.internal.CollatedViewIndexBuilder;
import org.apache.cassandra.index.transactions.IndexTransaction;
import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -108,10 +113,56 @@ public interface Index
{
/*
+ * Helpers for building indexes from SSTable data
+ */
+
+ /**
+ * Provider of {@code SecondaryIndexBuilder} instances. See {@code getBuildTaskSupport} and
+ * {@code SecondaryIndexManager.buildIndexesBlocking} for more detail.
+ */
+ interface IndexBuildingSupport
+ {
+ SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables);
+ }
+
+ /**
+ * Default implementation of {@code IndexBuildingSupport} which uses a {@code ReducingKeyIterator} to obtain a
+ * collated view of the data in the SSTables.
+ */
+ public static class CollatedViewIndexBuildingSupport implements IndexBuildingSupport
+ {
+ public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables)
+ {
+ return new CollatedViewIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables));
+ }
+ }
+
+ /**
+ * Singleton instance of {@code CollatedViewIndexBuildingSupport}, which may be used by any {@code Index}
+ * implementation.
+ */
+ public static final CollatedViewIndexBuildingSupport INDEX_BUILDER_SUPPORT = new CollatedViewIndexBuildingSupport();
+
+ /*
* Management functions
*/
/**
+ * Get an instance of a helper to provide tasks for building the index from a set of SSTable data.
+ * When processing a number of indexes to be rebuilt, {@code SecondaryIndexManager.buildIndexesBlocking} groups
+ * those with the same {@code IndexBuildingSupport} instance, allowing multiple indexes to be built with a
+ * single pass through the data. The singleton instance returned from the default method implementation builds
+ * indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data.
+ *
+ * @return an instance of the index build task helper. Index implementations which return <b>the same instance</b>
+ * will be built using a single task.
+ */
+ default IndexBuildingSupport getBuildTaskSupport()
+ {
+ return INDEX_BUILDER_SUPPORT;
+ }
+
+ /**
* Return a task to perform any initialization work when a new index instance is created.
* This may involve costly operations such as (re)building the index, and is performed asynchronously
* by SecondaryIndexManager
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index e66f0a3..9ec8a4e 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@ -17,61 +17,12 @@
*/
package org.apache.cassandra.index;
-import java.io.IOException;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.CompactionInterruptedException;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.utils.UUIDGen;
/**
* Manages building an entire index from column family data. Runs on to compaction manager.
*/
-public class SecondaryIndexBuilder extends CompactionInfo.Holder
+public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder
{
- private final ColumnFamilyStore cfs;
- private final Set<Index> indexers;
- private final ReducingKeyIterator iter;
- private final UUID compactionId;
-
- public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
- {
- this.cfs = cfs;
- this.indexers = indexers;
- this.iter = iter;
- this.compactionId = UUIDGen.getTimeUUID();
- }
-
- public CompactionInfo getCompactionInfo()
- {
- return new CompactionInfo(cfs.metadata,
- OperationType.INDEX_BUILD,
- iter.getBytesRead(),
- iter.getTotalBytes(),
- compactionId);
- }
-
- public void build()
- {
- try
- {
- while (iter.hasNext())
- {
- if (isStopRequested())
- throw new CompactionInterruptedException(getCompactionInfo());
- DecoratedKey key = iter.next();
- Keyspace.indexPartition(key, cfs, indexers);
- }
- }
- finally
- {
- iter.close();
- }
- }
+ public abstract void build();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index df8e38d..3eb72d3 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -51,7 +51,6 @@ import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.transactions.*;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.Indexes;
@@ -338,11 +337,20 @@ public class SecondaryIndexManager implements IndexRegistry
indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
- SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
- indexes,
- new ReducingKeyIterator(sstables));
- Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
- FBUtilities.waitOnFuture(future);
+ Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
+ for (Index index : indexes)
+ {
+ Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
+ stored.add(index);
+ }
+
+ List<Future<?>> futures = byType.entrySet()
+ .stream()
+ .map((e) -> e.getKey().getIndexBuildTask(baseCfs, e.getValue(), sstables))
+ .map(CompactionManager.instance::submitIndexBuild)
+ .collect(Collectors.toList());
+
+ FBUtilities.waitOnFutures(futures);
flushIndexesBlocking(indexes);
logger.info("Index build of {} complete",
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 674cd20..3128152 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -664,9 +664,9 @@ public abstract class CassandraIndex implements Index
metadata.name,
getSSTableNames(sstables));
- SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
- Collections.singleton(this),
- new ReducingKeyIterator(sstables));
+ SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
+ Collections.singleton(this),
+ new ReducingKeyIterator(sstables));
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
FBUtilities.waitOnFuture(future);
indexCfs.forceBlockingFlush();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
new file mode 100644
index 0000000..8ea7a68
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Manages building an entire index from column family data. Runs on the compaction manager.
+ */
+public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
+{
+ private final ColumnFamilyStore cfs;
+ private final Set<Index> indexers;
+ private final ReducingKeyIterator iter;
+ private final UUID compactionId;
+
+ public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
+ {
+ this.cfs = cfs;
+ this.indexers = indexers;
+ this.iter = iter;
+ this.compactionId = UUIDGen.getTimeUUID();
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ return new CompactionInfo(cfs.metadata,
+ OperationType.INDEX_BUILD,
+ iter.getBytesRead(),
+ iter.getTotalBytes(),
+ compactionId);
+ }
+
+ public void build()
+ {
+ try
+ {
+ while (iter.hasNext())
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+ DecoratedKey key = iter.next();
+ Keyspace.indexPartition(key, cfs, indexers);
+ }
+ }
+ finally
+ {
+ iter.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 0957f74..2b17849 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -622,9 +622,9 @@ public class CustomCassandraIndex implements Index
metadata.name,
getSSTableNames(sstables));
- SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
- Collections.singleton(this),
- new ReducingKeyIterator(sstables));
+ SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
+ Collections.singleton(this),
+ new ReducingKeyIterator(sstables));
Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
FBUtilities.waitOnFuture(future);
indexCfs.forceBlockingFlush();