You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/11/23 19:58:37 UTC

cassandra git commit: Make index building pluggable

Repository: cassandra
Updated Branches:
  refs/heads/trunk b1f73d4b0 -> 440366edd


Make index building pluggable

Patch and review by Pavel Yaskevich and Sam Tunnicliffe for
CASSANDRA-10681


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/440366ed
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/440366ed
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/440366ed

Branch: refs/heads/trunk
Commit: 440366edd0ef0e1c6c1af69230dabc996d967626
Parents: b1f73d4
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Thu Nov 19 15:56:42 2015 -0800
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Mon Nov 23 18:47:55 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/index/Index.java  | 51 +++++++++++++
 .../cassandra/index/SecondaryIndexBuilder.java  | 53 +------------
 .../cassandra/index/SecondaryIndexManager.java  | 20 +++--
 .../index/internal/CassandraIndex.java          |  6 +-
 .../internal/CollatedViewIndexBuilder.java      | 78 ++++++++++++++++++++
 .../index/internal/CustomCassandraIndex.java    |  6 +-
 7 files changed, 152 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a6646f9..13b2d05 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Make index building pluggable (CASSANDRA-10681)
  * Add sstable flush observer (CASSANDRA-10678)
  * Improve NTS endpoints calculation (CASSANDRA-10200)
  * Improve performance of the folderSize function (CASSANDRA-10677)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/Index.java b/src/java/org/apache/cassandra/index/Index.java
index 7bca924..64d621f 100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@ -1,6 +1,8 @@
 package org.apache.cassandra.index;
 
+import java.util.Collection;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.function.BiFunction;
 
@@ -15,9 +17,12 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.internal.CollatedViewIndexBuilder;
 import org.apache.cassandra.index.transactions.IndexTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -108,10 +113,56 @@ public interface Index
 {
 
     /*
+     * Helpers for building indexes from SSTable data
+     */
+
+    /**
+     * Provider of {@code SecondaryIndexBuilder} instances. See {@code getBuildTaskSupport} and
+     * {@code SecondaryIndexManager.buildIndexesBlocking} for more detail.
+     */
+    interface IndexBuildingSupport
+    {
+        SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables);
+    }
+
+    /**
+     * Default implementation of {@code IndexBuildingSupport} which uses a {@code ReducingKeyIterator} to obtain a
+     * collated view of the data in the SSTables.
+     */
+    public static class CollatedViewIndexBuildingSupport implements IndexBuildingSupport
+    {
+        public SecondaryIndexBuilder getIndexBuildTask(ColumnFamilyStore cfs, Set<Index> indexes, Collection<SSTableReader> sstables)
+        {
+            return new CollatedViewIndexBuilder(cfs, indexes, new ReducingKeyIterator(sstables));
+        }
+    }
+
+    /**
+     * Singleton instance of {@code CollatedViewIndexBuildingSupport}, which may be used by any {@code Index}
+     * implementation.
+     */
+    public static final CollatedViewIndexBuildingSupport INDEX_BUILDER_SUPPORT = new CollatedViewIndexBuildingSupport();
+
+    /*
      * Management functions
      */
 
     /**
+     * Get an instance of a helper to provide tasks for building the index from a set of SSTable data.
+     * When processing a number of indexes to be rebuilt, {@code SecondaryIndexManager.buildIndexesBlocking} groups
+     * those with the same {@code IndexBuildingSupport} instance, allowing multiple indexes to be built with a
+     * single pass through the data. The singleton instance returned from the default method implementation builds
+     * indexes using a {@code ReducingKeyIterator} to provide a collated view of the SSTable data.
+     *
+     * @return an instance of the index build task helper. Index implementations which return <b>the same instance</b>
+     * will be built using a single task.
+     */
+    default IndexBuildingSupport getBuildTaskSupport()
+    {
+        return INDEX_BUILDER_SUPPORT;
+    }
+
+    /**
      * Return a task to perform any initialization work when a new index instance is created.
      * This may involve costly operations such as (re)building the index, and is performed asynchronously
      * by SecondaryIndexManager

http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index e66f0a3..9ec8a4e 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@ -17,61 +17,12 @@
  */
 package org.apache.cassandra.index;
 
-import java.io.IOException;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.CompactionInterruptedException;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.utils.UUIDGen;
 
 /**
  * Manages building an entire index from column family data. Runs on to compaction manager.
  */
-public class SecondaryIndexBuilder extends CompactionInfo.Holder
+public abstract class SecondaryIndexBuilder extends CompactionInfo.Holder
 {
-    private final ColumnFamilyStore cfs;
-    private final Set<Index> indexers;
-    private final ReducingKeyIterator iter;
-    private final UUID compactionId;
-
-    public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
-    {
-        this.cfs = cfs;
-        this.indexers = indexers;
-        this.iter = iter;
-        this.compactionId = UUIDGen.getTimeUUID();
-    }
-
-    public CompactionInfo getCompactionInfo()
-    {
-        return new CompactionInfo(cfs.metadata,
-                                  OperationType.INDEX_BUILD,
-                                  iter.getBytesRead(),
-                                  iter.getTotalBytes(),
-                                  compactionId);
-    }
-
-    public void build()
-    {
-        try
-        {
-            while (iter.hasNext())
-            {
-                if (isStopRequested())
-                    throw new CompactionInterruptedException(getCompactionInfo());
-                DecoratedKey key = iter.next();
-                Keyspace.indexPartition(key, cfs, indexers);
-            }
-        }
-        finally
-        {
-            iter.close();
-        }
-    }
+    public abstract void build();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index df8e38d..3eb72d3 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -51,7 +51,6 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.transactions.*;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.schema.Indexes;
@@ -338,11 +337,20 @@ public class SecondaryIndexManager implements IndexRegistry
                     indexes.stream().map(i -> i.getIndexMetadata().name).collect(Collectors.joining(",")),
                     sstables.stream().map(SSTableReader::toString).collect(Collectors.joining(",")));
 
-        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
-                                                                  indexes,
-                                                                  new ReducingKeyIterator(sstables));
-        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
-        FBUtilities.waitOnFuture(future);
+        Map<Index.IndexBuildingSupport, Set<Index>> byType = new HashMap<>();
+        for (Index index : indexes)
+        {
+            Set<Index> stored = byType.computeIfAbsent(index.getBuildTaskSupport(), i -> new HashSet<>());
+            stored.add(index);
+        }
+
+        List<Future<?>> futures = byType.entrySet()
+                                        .stream()
+                                        .map((e) -> e.getKey().getIndexBuildTask(baseCfs, e.getValue(), sstables))
+                                        .map(CompactionManager.instance::submitIndexBuild)
+                                        .collect(Collectors.toList());
+
+        FBUtilities.waitOnFutures(futures);
 
         flushIndexesBlocking(indexes);
         logger.info("Index build of {} complete",

http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 674cd20..3128152 100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -664,9 +664,9 @@ public abstract class CassandraIndex implements Index
                         metadata.name,
                         getSSTableNames(sstables));
 
-            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
-                                                                      Collections.singleton(this),
-                                                                      new ReducingKeyIterator(sstables));
+            SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
+                                                                         Collections.singleton(this),
+                                                                         new ReducingKeyIterator(sstables));
             Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
             FBUtilities.waitOnFuture(future);
             indexCfs.forceBlockingFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
new file mode 100644
index 0000000..8ea7a68
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CollatedViewIndexBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal;
+
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionInfo;
+import org.apache.cassandra.db.compaction.CompactionInterruptedException;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * Manages building an entire index from column family data. Runs on the compaction manager.
+ */
+public class CollatedViewIndexBuilder extends SecondaryIndexBuilder
+{
+    private final ColumnFamilyStore cfs;
+    private final Set<Index> indexers;
+    private final ReducingKeyIterator iter;
+    private final UUID compactionId;
+
+    public CollatedViewIndexBuilder(ColumnFamilyStore cfs, Set<Index> indexers, ReducingKeyIterator iter)
+    {
+        this.cfs = cfs;
+        this.indexers = indexers;
+        this.iter = iter;
+        this.compactionId = UUIDGen.getTimeUUID();
+    }
+
+    public CompactionInfo getCompactionInfo()
+    {
+        return new CompactionInfo(cfs.metadata,
+                OperationType.INDEX_BUILD,
+                iter.getBytesRead(),
+                iter.getTotalBytes(),
+                compactionId);
+    }
+
+    public void build()
+    {
+        try
+        {
+            while (iter.hasNext())
+            {
+                if (isStopRequested())
+                    throw new CompactionInterruptedException(getCompactionInfo());
+                DecoratedKey key = iter.next();
+                Keyspace.indexPartition(key, cfs, indexers);
+            }
+        }
+        finally
+        {
+            iter.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/440366ed/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
index 0957f74..2b17849 100644
--- a/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
+++ b/test/unit/org/apache/cassandra/index/internal/CustomCassandraIndex.java
@@ -622,9 +622,9 @@ public class CustomCassandraIndex implements Index
                         metadata.name,
                         getSSTableNames(sstables));
 
-            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
-                                                                      Collections.singleton(this),
-                                                                      new ReducingKeyIterator(sstables));
+            SecondaryIndexBuilder builder = new CollatedViewIndexBuilder(baseCfs,
+                                                                         Collections.singleton(this),
+                                                                         new ReducingKeyIterator(sstables));
             Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
             FBUtilities.waitOnFuture(future);
             indexCfs.forceBlockingFlush();