You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:44 UTC

[13/15] cassandra git commit: New 2i API and implementations for built in indexes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index d63a832..1d5d477 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -22,20 +22,16 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -43,7 +39,6 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.ClientWarn;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
@@ -261,14 +256,16 @@ public abstract class ReadCommand implements ReadQuery
              : ReadResponse.createDataResponse(iterator, selection);
     }
 
-    protected SecondaryIndexSearcher getIndexSearcher(ColumnFamilyStore cfs)
+    protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace)
     {
-        return cfs.indexManager.getBestIndexSearcherFor(this);
+        return cfs.indexManager.getBestIndexFor(this, includeInTrace);
     }
 
     /**
      * Executes this command on the local host.
      *
+     * @param orderGroup the operation group spanning this command
+     *
      * @return an iterator over the result of executing this command locally.
      */
     @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
@@ -278,10 +275,12 @@ public abstract class ReadCommand implements ReadQuery
         long startTimeNanos = System.nanoTime();
 
         ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
-        SecondaryIndexSearcher searcher = getIndexSearcher(cfs);
+        Index index = getIndex(cfs, true);
+        Index.Searcher searcher = index == null ? null : index.searcherFor(this);
+
         UnfilteredPartitionIterator resultIterator = searcher == null
                                          ? queryStorage(cfs, orderGroup)
-                                         : searcher.search(this, orderGroup);
+                                         : searcher.search(orderGroup);
 
         try
         {
@@ -291,7 +290,7 @@ public abstract class ReadCommand implements ReadQuery
             // no point in checking it again.
             RowFilter updatedFilter = searcher == null
                                     ? rowFilter()
-                                    : rowFilter().without(searcher.primaryClause(this));
+                                    : index.getPostIndexQueryFilter(rowFilter());
 
             // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
             // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
index 0a5bee8..44befa2 100644
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -17,7 +17,7 @@
  */
 package org.apache.cassandra.db;
 
-import org.apache.cassandra.db.index.*;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class ReadOrderGroup implements AutoCloseable
@@ -98,14 +98,8 @@ public class ReadOrderGroup implements AutoCloseable
 
     private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
     {
-        SecondaryIndexSearcher searcher = command.getIndexSearcher(baseCfs);
-        if (searcher == null)
-            return null;
-
-        SecondaryIndex index = searcher.highestSelectivityIndex(command.rowFilter());
-        return index == null || !(index instanceof AbstractSimplePerColumnSecondaryIndex)
-             ? null
-             : ((AbstractSimplePerColumnSecondaryIndex)index).getIndexCfs();
+        Index index = baseCfs.indexManager.getBestIndexFor(command);
+        return index == null ? null : index.getBackingTable().orElse(null);
     }
 
     public void close()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 38cfed6..fb9eb48 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -17,18 +17,22 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
-
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
-import com.google.common.collect.*;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.SetMultimap;
 import com.google.common.io.ByteStreams;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,25 +41,18 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.functions.*;
-import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.NIODataInputStream;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.*;
-import org.apache.cassandra.schema.Tables;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.paxos.Commit;
 import org.apache.cassandra.service.paxos.PaxosState;
@@ -65,7 +62,6 @@ import org.apache.cassandra.utils.*;
 
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonMap;
-
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal;
 
@@ -516,7 +512,14 @@ public final class SystemKeyspace
         if (ksname.equals("system") && cfname.equals(COMPACTION_HISTORY))
             return;
         String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, compacted_at, bytes_in, bytes_out, rows_merged) VALUES (?, ?, ?, ?, ?, ?, ?)";
-        executeInternal(String.format(req, COMPACTION_HISTORY), UUIDGen.getTimeUUID(), ksname, cfname, ByteBufferUtil.bytes(compactedAt), bytesIn, bytesOut, rowsMerged);
+        executeInternal(String.format(req, COMPACTION_HISTORY),
+                        UUIDGen.getTimeUUID(),
+                        ksname,
+                        cfname,
+                        ByteBufferUtil.bytes(compactedAt),
+                        bytesIn,
+                        bytesOut,
+                        rowsMerged);
     }
 
     public static TabularData getCompactionHistory() throws OpenDataException
@@ -1030,6 +1033,16 @@ public final class SystemKeyspace
         forceBlockingFlush(BUILT_INDEXES);
     }
 
+    public static List<String> getBuiltIndexes(String keyspaceName, Set<String> indexNames)
+    {
+        List<String> names = new ArrayList<>(indexNames);
+        String req = "SELECT index_name from %s.\"%s\" WHERE table_name=? AND index_name IN ?";
+        UntypedResultSet results = executeInternal(String.format(req, NAME, BUILT_INDEXES), keyspaceName, names);
+        return StreamSupport.stream(results.spliterator(), false)
+                            .map(r -> r.getString("index_name"))
+                            .collect(Collectors.toList());
+    }
+
     /**
      * Read the host ID from the system keyspace, creating (and storing) one if
      * none exists.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index cab96fb..4aaa17a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -17,15 +17,19 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.UUID;
 import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.PurgingPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.transactions.CompactionTransaction;
 import org.apache.cassandra.io.sstable.ISSTableScanner;
 import org.apache.cassandra.metrics.CompactionMetrics;
 
@@ -47,6 +51,7 @@ import org.apache.cassandra.metrics.CompactionMetrics;
  */
 public class CompactionIterator extends CompactionInfo.Holder implements UnfilteredPartitionIterator
 {
+    private static final Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
     private static final long UNFILTERED_TO_UPDATE_PROGRESS = 100;
 
     private final OperationType type;
@@ -148,29 +153,33 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
                 if (type != OperationType.COMPACTION || !controller.cfs.indexManager.hasIndexes())
                     return null;
 
-                // If we have a 2ndary index, we must update it with deleted/shadowed cells.
-                // TODO: this should probably be done asynchronously and batched.
-                final SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.gcUpdaterFor(partitionKey, nowInSec);
-                final RowDiffListener diffListener = new RowDiffListener()
+                Columns statics = Columns.NONE;
+                Columns regulars = Columns.NONE;
+                for (UnfilteredRowIterator iter : versions)
                 {
-                    public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
-                    {
-                    }
-
-                    public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
-                    {
-                    }
-
-                    public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                    if (iter != null)
                     {
+                        statics = statics.mergeTo(iter.columns().statics);
+                        regulars = regulars.mergeTo(iter.columns().regulars);
                     }
+                }
+                final PartitionColumns partitionColumns = new PartitionColumns(statics, regulars);
 
-                    public void onCell(int i, Clustering clustering, Cell merged, Cell original)
-                    {
-                        if (original != null && (merged == null || !merged.isLive(nowInSec)))
-                            indexer.remove(clustering, original);
-                    }
-                };
+                // If we have a 2ndary index, we must update it with deleted/shadowed cells.
+                // we can reuse a single CleanupTransaction for the duration of a partition.
+                // Currently, it doesn't do any batching of row updates, so every merge event
+                // for a single partition results in a fresh cycle of:
+                // * Get new Indexer instances
+                // * Indexer::start
+                // * Indexer::onRowMerge (for every row being merged by the compaction)
+                // * Indexer::commit
+                // A new OpOrder.Group is opened in an ARM block wrapping the commits
+                // TODO: this should probably be done asynchronously and batched.
+                final CompactionTransaction indexTransaction =
+                    controller.cfs.indexManager.newCompactionTransaction(partitionKey,
+                                                                         partitionColumns,
+                                                                         versions.size(),
+                                                                         nowInSec);
 
                 return new UnfilteredRowIterators.MergeListener()
                 {
@@ -180,7 +189,9 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte
 
                     public void onMergedRows(Row merged, Columns columns, Row[] versions)
                     {
-                        Rows.diff(merged, columns, versions, diffListener);
+                        indexTransaction.start();
+                        indexTransaction.onRowMerge(columns, merged, versions);
+                        indexTransaction.commit();
                     }
 
                     public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, RangeTombstoneMarker[] versions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 66f9ed5..0890341 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -20,22 +20,8 @@ package org.apache.cassandra.db.compaction;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
@@ -56,33 +42,27 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndexBuilder;
-import org.apache.cassandra.db.view.MaterializedViewBuilder;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.view.MaterializedViewBuilder;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.ISSTableScanner;
-import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.JVMStabilityInspector;
-import org.apache.cassandra.utils.MerkleTrees;
-import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.WrappedRunnable;
-import org.apache.cassandra.utils.UUIDGen;
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 import static java.util.Collections.singleton;
@@ -847,7 +827,7 @@ public class CompactionManager implements CompactionManagerMBean
             }
 
             // flush to ensure we don't lose the tombstones on a restart, since they are not commitlog'd
-            cfs.indexManager.flushIndexesBlocking();
+            cfs.indexManager.flushAllIndexesBlocking();
 
             finished = writer.finish();
         }
@@ -939,11 +919,7 @@ public class CompactionManager implements CompactionManagerMBean
 
                 cfs.invalidateCachedPartition(partition.partitionKey());
 
-                // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712
-                try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
-                {
-                    cfs.indexManager.deleteFromIndexes(partition, opGroup, nowInSec);
-                }
+                cfs.indexManager.deletePartition(partition, nowInSec);
                 return null;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index a5212aa..bbec004 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -27,10 +27,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -38,7 +37,9 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.cql3.statements.RequestValidations.*;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkBindValueSet;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull;
 
 /**
  * A filter on which rows a given query should include or exclude.
@@ -91,6 +92,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         expressions.add(new ThriftExpression(metadata, name, op, value));
     }
 
+    public List<Expression> getExpressions()
+    {
+        return expressions;
+    }
+
     /**
      * Filters the provided iterator so that only the row satisfying the expression of this filter
      * are included in the resulting iterator.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
deleted file mode 100644
index a631b9a..0000000
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.Future;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Implements a secondary index for a column family using a second column family
- * in which the row keys are indexed values, and column names are base row keys.
- */
-public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSecondaryIndex
-{
-    protected ColumnFamilyStore indexCfs;
-
-    // SecondaryIndex "forces" a set of ColumnDefinition. However this class (and thus it's subclass)
-    // only support one def per index. So inline it in a field for 1) convenience and 2) avoid creating
-    // an iterator each time we need to access it.
-    // TODO: we should fix SecondaryIndex API
-    protected ColumnDefinition columnDef;
-
-    public void init()
-    {
-        assert baseCfs != null && columnDefs != null && columnDefs.size() == 1;
-
-        columnDef = columnDefs.iterator().next();
-
-        CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, indexMetadata, getIndexKeyComparator());
-        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
-                                                             indexedCfMetadata.cfName,
-                                                             indexedCfMetadata,
-                                                             baseCfs.getTracker().loadsstables);
-    }
-
-    protected AbstractType<?> getIndexKeyComparator()
-    {
-        return columnDef.type;
-    }
-
-    public ColumnDefinition indexedColumn()
-    {
-        return columnDef;
-    }
-
-    @Override
-    String indexTypeForGrouping()
-    {
-        return "_internal_";
-    }
-
-    protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, Cell cell)
-    {
-        return makeIndexClustering(rowKey, clustering, cell == null ? null : cell.path());
-    }
-
-    protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, CellPath path)
-    {
-        return buildIndexClusteringPrefix(rowKey, clustering, path).build();
-    }
-
-    protected Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
-    {
-        return buildIndexClusteringPrefix(rowKey, bound, null).buildBound(bound.isStart(), bound.isInclusive());
-    }
-
-    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path);
-
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, Cell cell)
-    {
-        return cell == null
-             ? getIndexedValue(rowKey, clustering, null, null)
-             : getIndexedValue(rowKey, clustering, cell.value(), cell.path());
-    }
-
-    protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath cellPath);
-
-    public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
-    {
-        deleteForCleanup(rowKey, clustering, cell, opGroup, nowInSec);
-    }
-
-    public void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
-    {
-        delete(rowKey, clustering, cell.value(), cell.path(), new DeletionTime(cell.timestamp(), nowInSec), opGroup);
-    }
-
-    public void delete(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path, DeletionTime deletion, OpOrder.Group opGroup)
-    {
-        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
-
-        Row row = BTreeRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion);
-        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
-
-        indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
-        if (logger.isDebugEnabled())
-            logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, upd);
-    }
-
-    public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup)
-    {
-        insert(rowKey, clustering, cell, LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), opGroup);
-    }
-
-    public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, LivenessInfo info, OpOrder.Group opGroup)
-    {
-        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
-
-        Row row = BTreeRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info);
-        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
-
-        if (logger.isDebugEnabled())
-            logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), upd);
-
-        indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
-    }
-
-    public void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec)
-    {
-        // insert the new value before removing the old one, so we never have a period
-        // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
-        insert(rowKey, clustering, cell, opGroup);
-        if (SecondaryIndexManager.shouldCleanupOldValue(oldCell, cell))
-            delete(rowKey, clustering, oldCell, opGroup, nowInSec);
-    }
-
-    public boolean indexes(ColumnDefinition column)
-    {
-        return column.name.equals(columnDef.name);
-    }
-
-    public void removeIndex(ByteBuffer columnName)
-    {
-        // interrupt in-progress compactions
-        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
-        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
-        CompactionManager.instance.waitForCessation(cfss);
-
-        indexCfs.keyspace.writeOrder.awaitNewBarrier();
-        indexCfs.forceBlockingFlush();
-
-        indexCfs.readOrdering.awaitNewBarrier();
-        indexCfs.invalidate();
-    }
-
-    public void forceBlockingFlush()
-    {
-        Future<?> wait;
-        // we synchronise on the baseCfs to make sure we are ordered correctly with other flushes to the base CFS
-        synchronized (baseCfs.getTracker())
-        {
-            wait = indexCfs.forceFlush();
-        }
-        FBUtilities.waitOnFuture(wait);
-    }
-
-    public void invalidate()
-    {
-        indexCfs.invalidate();
-    }
-
-    public void truncateBlocking(long truncatedAt)
-    {
-        indexCfs.discardSSTables(truncatedAt);
-    }
-
-    public ColumnFamilyStore getIndexCfs()
-    {
-       return indexCfs;
-    }
-
-    protected ClusteringComparator getIndexComparator()
-    {
-        assert indexCfs != null;
-        return indexCfs.metadata.comparator;
-    }
-
-    public String getIndexName()
-    {
-        return indexCfs.name;
-    }
-
-    public void reload()
-    {
-        indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
-        indexCfs.reload();
-    }
-
-    public long estimateResultRows()
-    {
-        return getIndexCfs().getMeanColumns();
-    }
-
-    public void validate(DecoratedKey partitionKey) throws InvalidRequestException
-    {
-        if (columnDef.kind == ColumnDefinition.Kind.PARTITION_KEY)
-            validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null, null));
-    }
-
-    public void validate(Clustering clustering) throws InvalidRequestException
-    {
-        if (columnDef.kind == ColumnDefinition.Kind.CLUSTERING)
-            validateIndexedValue(getIndexedValue(null, clustering, null, null));
-    }
-
-    public void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException
-    {
-        if (!columnDef.isPrimaryKeyColumn())
-            validateIndexedValue(getIndexedValue(null, null, cellValue, path));
-    }
-
-    private void validateIndexedValue(ByteBuffer value)
-    {
-        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
-            throw new InvalidRequestException(String.format("Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
-                                                            value.remaining(), getIndexName(), baseKeyspace(), baseTable(), columnDef.name, FBUtilities.MAX_UNSIGNED_SHORT));
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("%s(%s)", baseTable(), columnDef.name);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
deleted file mode 100644
index 897aa9c..0000000
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Base class for Secondary indexes that implement a unique index per column
- *
- */
-public abstract class PerColumnSecondaryIndex extends SecondaryIndex
-{
-    /**
-     * Called when a column has been tombstoned or replaced.
-     *
-     * @param rowKey the underlying row key which is indexed
-     * @param col all the column info
-     */
-    public abstract void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
-
-    /**
-     * Called when a column has been removed due to a cleanup operation.
-     */
-    public abstract void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
-
-    /**
-     * For indexes on the primary key, index the given PK.
-     */
-    public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
-    {
-    }
-
-    /**
-     * For indexes on the primary key, delete the given PK.
-     */
-    public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
-    {
-    }
-
-    /**
-     * insert a column to the index
-     *
-     * @param rowKey the underlying row key which is indexed
-     * @param col all the column info
-     */
-    public abstract void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup);
-
-    /**
-     * update a column from the index
-     *
-     * @param rowKey the underlying row key which is indexed
-     * @param oldCol the previous column info
-     * @param col all the column info
-     */
-    public abstract void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec);
-
-    protected boolean indexPrimaryKeyColumn()
-    {
-        return false;
-    }
-
-    public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
-    {
-        Clustering clustering = row.clustering();
-        if (indexPrimaryKeyColumn())
-        {
-            // Same as in AtomicBTreePartition.maybeIndexPrimaryKeyColumn
-            long timestamp = row.primaryKeyLivenessInfo().timestamp();
-            int ttl = row.primaryKeyLivenessInfo().ttl();
-
-            for (Cell cell : row.cells())
-            {
-                if (cell.isLive(nowInSec) && cell.timestamp() > timestamp)
-                {
-                    timestamp = cell.timestamp();
-                    ttl = cell.ttl();
-                }
-            }
-            maybeIndex(key.getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
-        }
-
-        for (Cell cell : row.cells())
-        {
-            if (!indexes(cell.column()))
-                continue;
-
-            if (cell.isLive(nowInSec))
-                insert(key.getKey(), clustering, cell, opGroup);
-        }
-    }
-
-    public String getNameForSystemKeyspace(ByteBuffer column)
-    {
-        return getIndexName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
deleted file mode 100644
index 502b213..0000000
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
-
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-/**
- *  Base class for Secondary indexes that implement a unique index per row
- */
-public abstract class PerRowSecondaryIndex extends SecondaryIndex
-{
-    /**
-     * Index the given partition.
-     */
-    public abstract void index(ByteBuffer key, UnfilteredRowIterator atoms);
-
-    /**
-     * cleans up deleted columns from cassandra cleanup compaction
-     *
-     * @param key
-     */
-    public abstract void delete(ByteBuffer key, OpOrder.Group opGroup);
-
-    public String getNameForSystemKeyspace(ByteBuffer columnName)
-    {
-        try
-        {
-            return getIndexName()+ByteBufferUtil.string(columnName);
-        }
-        catch (CharacterCodingException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
deleted file mode 100644
index 4302112..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-
-import com.google.common.base.Objects;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.index.composites.CompositesIndex;
-import org.apache.cassandra.db.index.keys.KeysIndex;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.Refs;
-
-/**
- * Abstract base class for different types of secondary indexes.
- *
- * Do not extend this directly, please pick from PerColumnSecondaryIndex or PerRowSecondaryIndex
- */
-public abstract class SecondaryIndex
-{
-    protected static final Logger logger = LoggerFactory.getLogger(SecondaryIndex.class);
-
-    public static final String CUSTOM_INDEX_OPTION_NAME = "class_name";
-
-    /**
-     * The name of the option used to specify that the index is on the collection keys.
-     */
-    public static final String INDEX_KEYS_OPTION_NAME = "index_keys";
-
-    /**
-     * The name of the option used to specify that the index is on the collection values.
-     */
-    public static final String INDEX_VALUES_OPTION_NAME = "index_values";
-
-    /**
-     * The name of the option used to specify that the index is on the collection (map) entries.
-     */
-    public static final String INDEX_ENTRIES_OPTION_NAME = "index_keys_and_values";
-
-    /**
-     * Base CF that has many indexes
-     */
-    protected ColumnFamilyStore baseCfs;
-
-
-    /**
-     * The column definitions which this index is responsible for
-     */
-    protected final Set<ColumnDefinition> columnDefs = Collections.newSetFromMap(new ConcurrentHashMap<ColumnDefinition,Boolean>());
-
-    protected IndexMetadata indexMetadata;
-
-    /**
-     * Perform any initialization work
-     */
-    public abstract void init();
-
-    /**
-     * Reload an existing index following a change to its configuration,
-     * or that of the indexed column(s). Differs from init() in that we expect
-     * expect new resources (such as CFS for a KEYS index) to be created by
-     * init() but not here
-     */
-    public abstract void reload();
-
-    /**
-     * Validates the index_options passed in the IndexMetadata
-     * @throws ConfigurationException
-     */
-    public abstract void validateOptions(CFMetaData baseCfm, IndexMetadata def) throws ConfigurationException;
-
-    /**
-     * @return The name of the index
-     */
-    abstract public String getIndexName();
-
-    /**
-     * All internal 2ndary indexes will return "_internal_" for this. Custom
-     * 2ndary indexes will return their class name. This only matter for
-     * SecondaryIndexManager.groupByIndexType.
-     */
-    String indexTypeForGrouping()
-    {
-        // Our internal indexes overwrite this
-        return getClass().getCanonicalName();
-    }
-
-    /**
-     * Return the unique name for this index and column
-     * to be stored in the SystemKeyspace that tracks if each column is built
-     *
-     * @param columnName the name of the column
-     * @return the unique name
-     */
-    abstract public String getNameForSystemKeyspace(ByteBuffer columnName);
-
-    /**
-     * Checks if the index for specified column is fully built
-     *
-     * @param columnName the column
-     * @return true if the index is fully built
-     */
-    public boolean isIndexBuilt(ByteBuffer columnName)
-    {
-        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnName));
-    }
-
-    public void setIndexBuilt()
-    {
-        for (ColumnDefinition columnDef : columnDefs)
-            SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
-    }
-
-    public void setIndexRemoved()
-    {
-        for (ColumnDefinition columnDef : columnDefs)
-            SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getNameForSystemKeyspace(columnDef.name.bytes));
-    }
-
-    /**
-     * Called at query time
-     * Creates a implementation specific searcher instance for this index type
-     * @param columns the list of columns which belong to this index type
-     * @return the secondary index search impl
-     */
-    protected abstract SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns);
-
-    /**
-     * Forces this indexes' in memory data to disk
-     */
-    public abstract void forceBlockingFlush();
-
-    /**
-     * Allow access to the underlying column family store if there is one
-     * @return the underlying column family store or null
-     */
-    public abstract ColumnFamilyStore getIndexCfs();
-
-
-    /**
-     * Delete all files and references to this index
-     * @param columnName the indexed column to remove
-     */
-    public abstract void removeIndex(ByteBuffer columnName);
-
-    /**
-     * Remove the index and unregisters this index's mbean if one exists
-     */
-    public abstract void invalidate();
-
-    /**
-     * Truncate all the data from the current index
-     *
-     * @param truncatedAt The truncation timestamp, all data before that timestamp should be rejected.
-     */
-    public abstract void truncateBlocking(long truncatedAt);
-
-    /**
-     * Builds the index using the data in the underlying CFS
-     * Blocks till it's complete
-     */
-    protected void buildIndexBlocking()
-    {
-        logger.info(String.format("Submitting index build of %s for data in %s",
-                getIndexName(), StringUtils.join(baseCfs.getSSTables(SSTableSet.CANONICAL), ", ")));
-
-        try (Refs<SSTableReader> sstables = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL)).refs)
-        {
-            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
-                                                                      Collections.singleton(getIndexName()),
-                                                                      new ReducingKeyIterator(sstables));
-            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
-            FBUtilities.waitOnFuture(future);
-            forceBlockingFlush();
-            setIndexBuilt();
-        }
-        logger.info("Index build of {} complete", getIndexName());
-    }
-
-
-    /**
-     * Builds the index using the data in the underlying CF, non blocking
-     *
-     *
-     * @return A future object which the caller can block on (optional)
-     */
-    public Future<?> buildIndexAsync()
-    {
-        // if we're just linking in the index to indexedColumns on an already-built index post-restart, we're done
-        boolean allAreBuilt = true;
-        for (ColumnDefinition cdef : columnDefs)
-        {
-            if (!SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getNameForSystemKeyspace(cdef.name.bytes)))
-            {
-                allAreBuilt = false;
-                break;
-            }
-        }
-
-        if (allAreBuilt)
-            return null;
-
-        // build it asynchronously; addIndex gets called by CFS open and schema update, neither of which
-        // we want to block for a long period.  (actual build is serialized on CompactionManager.)
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                baseCfs.forceBlockingFlush();
-                buildIndexBlocking();
-            }
-        };
-        FutureTask<?> f = new FutureTask<Object>(runnable, null);
-
-        new Thread(f, "Creating index: " + getIndexName()).start();
-        return f;
-    }
-
-    public ColumnFamilyStore getBaseCfs()
-    {
-        return baseCfs;
-    }
-
-    private void setBaseCfs(ColumnFamilyStore baseCfs)
-    {
-        this.baseCfs = baseCfs;
-    }
-
-    public Set<ColumnDefinition> getColumnDefs()
-    {
-        return columnDefs;
-    }
-
-    void setIndexMetadata(IndexMetadata indexDef)
-    {
-        this.indexMetadata = indexDef;
-        for (ColumnIdentifier col : indexDef.columns)
-            this.columnDefs.add(baseCfs.metadata.getColumnDefinition(col));
-    }
-
-    void addColumnDef(ColumnDefinition columnDef)
-    {
-       columnDefs.add(columnDef);
-    }
-
-    void removeColumnDef(ByteBuffer name)
-    {
-        Iterator<ColumnDefinition> it = columnDefs.iterator();
-        while (it.hasNext())
-        {
-            if (it.next().name.bytes.equals(name))
-                it.remove();
-        }
-    }
-
-    /** Returns true if the index supports lookups for the given operator, false otherwise. */
-    public boolean supportsOperator(Operator operator)
-    {
-        return operator == Operator.EQ;
-    }
-
-    /**
-     * Returns the decoratedKey for a column value. Assumes an index CFS is present.
-     * @param value column value
-     * @return decorated key
-     */
-    public DecoratedKey getIndexKeyFor(ByteBuffer value)
-    {
-        return getIndexCfs().decorateKey(value);
-    }
-
-    /**
-     * Returns true if the provided column is indexed by this secondary index.
-     *
-     * The default implementation checks whether the name is one the columnDef name,
-     * but this should be overriden but subclass if needed.
-     */
-    public abstract boolean indexes(ColumnDefinition column);
-
-    /**
-     * This is the primary way to create a secondary index instance for a CF column.
-     * It will validate the index_options before initializing.
-     *
-     * @param baseCfs the source of data for the Index
-     * @param indexDef the meta information about this index (index_type, index_options, name, etc...)
-     *
-     * @return The secondary index instance for this column
-     * @throws ConfigurationException
-     */
-    public static SecondaryIndex createInstance(ColumnFamilyStore baseCfs,
-                                                IndexMetadata indexDef) throws ConfigurationException
-    {
-        SecondaryIndex index = uninitializedInstance(baseCfs.metadata, indexDef);
-        index.validateOptions(baseCfs.metadata, indexDef);
-        index.setBaseCfs(baseCfs);
-        index.setIndexMetadata(indexDef);
-
-        return index;
-    }
-
-    public static void validate(CFMetaData baseMetadata,
-                                IndexMetadata indexDef) throws ConfigurationException
-    {
-        SecondaryIndex index = uninitializedInstance(baseMetadata, indexDef);
-        index.validateOptions(baseMetadata, indexDef);
-    }
-
-    private static SecondaryIndex uninitializedInstance(CFMetaData baseMetadata,
-                                                        IndexMetadata indexDef) throws ConfigurationException
-    {
-        if (indexDef.isKeys())
-        {
-            return new KeysIndex();
-        }
-        else if (indexDef.isComposites())
-        {
-            return CompositesIndex.create(indexDef, baseMetadata);
-        }
-        else if (indexDef.isCustom())
-        {
-            assert indexDef.options != null;
-            String class_name = indexDef.options.get(CUSTOM_INDEX_OPTION_NAME);
-            assert class_name != null;
-            try
-            {
-                return (SecondaryIndex) Class.forName(class_name).newInstance();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        throw new AssertionError("Unknown index type: " + indexDef.name);
-    }
-
-    public abstract void validate(DecoratedKey partitionKey) throws InvalidRequestException;
-    public abstract void validate(Clustering clustering) throws InvalidRequestException;
-    public abstract void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException;
-
-    public abstract long estimateResultRows();
-
-    protected String baseKeyspace()
-    {
-        return baseCfs.metadata.ksName;
-    }
-
-    protected String baseTable()
-    {
-        return baseCfs.metadata.cfName;
-    }
-
-    /**
-     * Create the index metadata for the index on a given column of a given table.
-     */
-    public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, IndexMetadata def)
-    {
-        return newIndexMetadata(baseMetadata, def, def.indexedColumn(baseMetadata).type);
-    }
-
-    /**
-     * Create the index metadata for the index on a given column of a given table.
-     */
-    static CFMetaData newIndexMetadata(CFMetaData baseMetadata, IndexMetadata def, AbstractType<?> comparator)
-    {
-        assert !def.isCustom();
-
-        CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
-                                                       .withId(baseMetadata.cfId)
-                                                       .withPartitioner(new LocalPartitioner(comparator))
-                                                       .addPartitionKey(def.indexedColumn(baseMetadata).name, comparator);
-
-        if (def.isComposites())
-        {
-            CompositesIndex.addIndexClusteringColumns(builder, baseMetadata, def);
-        }
-        else
-        {
-            assert def.isKeys();
-            KeysIndex.addIndexClusteringColumns(builder, baseMetadata);
-        }
-
-        return builder.build().reloadIndexMetadataProperties(baseMetadata);
-    }
-
-    @Override
-    public String toString()
-    {
-        return Objects.toStringHelper(this).add("columnDefs", columnDefs).toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
deleted file mode 100644
index a117f6d..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.compaction.CompactionInfo;
-import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.db.compaction.CompactionInterruptedException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
-
-/**
- * Manages building an entire index from column family data. Runs on to compaction manager.
- */
-public class SecondaryIndexBuilder extends CompactionInfo.Holder
-{
-    private final ColumnFamilyStore cfs;
-    private final Set<String> idxNames;
-    private final ReducingKeyIterator iter;
-    private final UUID compactionId;
-
-    public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<String> idxNames, ReducingKeyIterator iter)
-    {
-        this.cfs = cfs;
-        this.idxNames = idxNames;
-        this.iter = iter;
-        this.compactionId = UUIDGen.getTimeUUID();
-    }
-
-    public CompactionInfo getCompactionInfo()
-    {
-        return new CompactionInfo(cfs.metadata,
-                                  OperationType.INDEX_BUILD,
-                                  iter.getBytesRead(),
-                                  iter.getTotalBytes(),
-                                  compactionId);
-    }
-
-    public void build()
-    {
-        try
-        {
-            while (iter.hasNext())
-            {
-                if (isStopRequested())
-                    throw new CompactionInterruptedException(getCompactionInfo());
-                DecoratedKey key = iter.next();
-                Keyspace.indexPartition(key, cfs, idxNames);
-            }
-        }
-        finally
-        {
-            try
-            {
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
deleted file mode 100644
index 996c730..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ /dev/null
@@ -1,792 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.Future;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.io.sstable.ReducingKeyIterator;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Manages all the indexes associated with a given CFS
- * Different types of indexes can be created across the same CF
- */
-public class SecondaryIndexManager
-{
-    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class);
-
-    public static final Updater nullUpdater = new Updater()
-    {
-        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion) {}
-        public void insert(Clustering clustering, Cell cell) {}
-        public void update(Clustering clustering, Cell oldCell, Cell cell) {}
-        public void remove(Clustering clustering, Cell current) {}
-
-        public void updateRowLevelIndexes() {}
-    };
-
-    /**
-     * Organizes the indexes by column name
-     */
-    private final ConcurrentNavigableMap<ByteBuffer, SecondaryIndex> indexesByColumn;
-
-
-    /**
-     * Keeps a single instance of a SecondaryIndex for many columns when the index type
-     * has isRowLevelIndex() == true
-     *
-     * This allows updates to happen to an entire row at once
-     */
-    private final ConcurrentMap<Class<? extends SecondaryIndex>, SecondaryIndex> rowLevelIndexMap;
-
-
-    /**
-     * Keeps all secondary index instances, either per-column or per-row
-     */
-    private final Set<SecondaryIndex> allIndexes;
-
-
-    /**
-     * The underlying column family containing the source data for these indexes
-     */
-    public final ColumnFamilyStore baseCfs;
-
-    public SecondaryIndexManager(ColumnFamilyStore baseCfs)
-    {
-        indexesByColumn = new ConcurrentSkipListMap<>();
-        rowLevelIndexMap = new ConcurrentHashMap<>();
-        allIndexes = Collections.newSetFromMap(new ConcurrentHashMap<SecondaryIndex, Boolean>());
-
-        this.baseCfs = baseCfs;
-    }
-
-    /**
-     * Drops and adds new indexes associated with the underlying CF
-     */
-    public void reload()
-    {
-        // figure out what needs to be added and dropped.
-        // future: if/when we have modifiable settings for secondary indexes,
-        // they'll need to be handled here.
-        Collection<ByteBuffer> indexedColumnNames = indexesByColumn.keySet();
-        for (ByteBuffer indexedColumn : indexedColumnNames)
-        {
-            ColumnDefinition def = baseCfs.metadata.getColumnDefinition(indexedColumn);
-            if (def == null || !baseCfs.metadata.getIndexes().get(def).isPresent())
-                removeIndexedColumn(indexedColumn);
-        }
-
-        // TODO: allow all ColumnDefinition type
-        for (IndexMetadata indexDef : baseCfs.metadata.getIndexes())
-        {
-            if (!indexedColumnNames.contains(indexDef.indexedColumn(baseCfs.metadata).name.bytes))
-                addIndexedColumn(indexDef);
-        }
-
-        for (SecondaryIndex index : allIndexes)
-            index.reload();
-    }
-
-    public Set<String> allIndexesNames()
-    {
-        Set<String> names = new HashSet<>(allIndexes.size());
-        for (SecondaryIndex index : allIndexes)
-            names.add(index.getIndexName());
-        return names;
-    }
-
-    public Set<PerColumnSecondaryIndex> perColumnIndexes()
-    {
-        Set<PerColumnSecondaryIndex> s = new HashSet<>();
-        for (SecondaryIndex index : allIndexes)
-            if (index instanceof PerColumnSecondaryIndex)
-                s.add((PerColumnSecondaryIndex)index);
-        return s;
-    }
-
-    public Set<PerRowSecondaryIndex> perRowIndexes()
-    {
-        Set<PerRowSecondaryIndex> s = new HashSet<>();
-        for (SecondaryIndex index : allIndexes)
-            if (index instanceof PerRowSecondaryIndex)
-                s.add((PerRowSecondaryIndex)index);
-        return s;
-    }
-
-    /**
-     * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
-     * Does nothing if columns is empty.
-     *
-     * Caller must acquire and release references to the sstables used here.
-     *
-     * @param sstables the data to build from
-     * @param idxNames the list of columns to index, ordered by comparator
-     */
-    public void maybeBuildSecondaryIndexes(Collection<SSTableReader> sstables, Set<String> idxNames)
-    {
-        idxNames = filterByColumn(idxNames);
-        if (idxNames.isEmpty())
-            return;        
-
-        logger.info(String.format("Submitting index build of %s for data in %s",
-                                  idxNames, StringUtils.join(sstables, ", ")));
-
-        SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs, idxNames, new ReducingKeyIterator(sstables));
-        Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
-        FBUtilities.waitOnFuture(future);
-
-        flushIndexesBlocking();
-
-        logger.info("Index build of {} complete", idxNames);
-    }
-
-    public boolean indexes(ColumnDefinition column)
-    {
-        for (SecondaryIndex index : allIndexes)
-            if (index.indexes(column))
-                return true;
-        return false;
-    }
-
-    private Set<SecondaryIndex> indexFor(ColumnDefinition column)
-    {
-        Set<SecondaryIndex> matching = null;
-        for (SecondaryIndex index : allIndexes)
-        {
-            if (index.indexes(column))
-            {
-                if (matching == null)
-                    matching = new HashSet<>();
-                matching.add(index);
-            }
-        }
-        return matching == null ? Collections.<SecondaryIndex>emptySet() : matching;
-    }
-
-    /**
-     * Removes a existing index
-     * @param column the indexed column to remove
-     */
-    public void removeIndexedColumn(ByteBuffer column)
-    {
-        SecondaryIndex index = indexesByColumn.remove(column);
-
-        if (index == null)
-            return;
-
-        // Remove this column from from row level index map as well as all indexes set
-        if (index instanceof PerRowSecondaryIndex)
-        {
-            index.removeColumnDef(column);
-
-            // If no columns left remove from row level lookup as well as all indexes set
-            if (index.getColumnDefs().isEmpty())
-            {
-                allIndexes.remove(index);
-                rowLevelIndexMap.remove(index.getClass());
-            }
-        }
-        else
-        {
-            allIndexes.remove(index);
-        }
-
-        index.removeIndex(column);
-        SystemKeyspace.setIndexRemoved(baseCfs.metadata.ksName, index.getNameForSystemKeyspace(column));
-    }
-
-    /**
-     * Adds and builds a index for a column
-     * @param indexDef the index metadata
-     * @return a future which the caller can optionally block on signaling the index is built
-     */
-    public synchronized Future<?> addIndexedColumn(IndexMetadata indexDef)
-    {
-        ColumnDefinition cdef = indexDef.indexedColumn(baseCfs.metadata);
-        if (indexesByColumn.containsKey(cdef.name.bytes))
-            return null;
-
-        SecondaryIndex index = SecondaryIndex.createInstance(baseCfs, indexDef);
-
-        // Keep a single instance of the index per-cf for row level indexes
-        // since we want all columns to be under the index
-        if (index instanceof PerRowSecondaryIndex)
-        {
-            SecondaryIndex currentIndex = rowLevelIndexMap.get(index.getClass());
-
-            if (currentIndex == null)
-            {
-                rowLevelIndexMap.put(index.getClass(), index);
-                index.init();
-            }
-            else
-            {
-                index = currentIndex;
-                index.setIndexMetadata(indexDef);
-                logger.info("Creating new index : {}",indexDef.name);
-            }
-        }
-        else
-        {
-            // TODO: We sould do better than throw a RuntimeException
-            if (indexDef.isCustom() && index instanceof AbstractSimplePerColumnSecondaryIndex)
-                throw new RuntimeException("Cannot use a subclass of AbstractSimplePerColumnSecondaryIndex as a CUSTOM index, as they assume they are CFS backed");
-            index.init();
-        }
-
-        // link in indexedColumns. this means that writes will add new data to
-        // the index immediately,
-        // so we don't have to lock everything while we do the build. it's up to
-        // the operator to wait
-        // until the index is actually built before using in queries.
-        indexesByColumn.put(cdef.name.bytes, index);
-
-        // Add to all indexes set:
-        allIndexes.add(index);
-
-        // if we're just linking in the index to indexedColumns on an
-        // already-built index post-restart, we're done
-        if (index.isIndexBuilt(cdef.name.bytes))
-            return null;
-
-        return index.buildIndexAsync();
-    }
-
-    /**
-     *
-     * @param column the name of indexes column
-     * @return the index
-     */
-    public SecondaryIndex getIndexForColumn(ColumnDefinition column)
-    {
-        return indexesByColumn.get(column.name.bytes);
-    }
-
-    /**
-     * Remove the index
-     */
-    public void invalidate()
-    {
-        for (SecondaryIndex index : allIndexes)
-            index.invalidate();
-    }
-
-    /**
-     * Flush all indexes to disk
-     */
-    public void flushIndexesBlocking()
-    {
-        // despatch flushes for all CFS backed indexes
-        List<Future<?>> wait = new ArrayList<>();
-        synchronized (baseCfs.getTracker())
-        {
-            for (SecondaryIndex index : allIndexes)
-                if (index.getIndexCfs() != null)
-                    wait.add(index.getIndexCfs().forceFlush());
-        }
-
-        // blockingFlush any non-CFS-backed indexes
-        for (SecondaryIndex index : allIndexes)
-            if (index.getIndexCfs() == null)
-                index.forceBlockingFlush();
-
-        // wait for the CFS-backed index flushes to complete
-        FBUtilities.waitOnFutures(wait);
-    }
-
-    /**
-     * @return all built indexes (ready to use)
-     */
-    public List<String> getBuiltIndexes()
-    {
-        List<String> indexList = new ArrayList<>();
-
-        for (Map.Entry<ByteBuffer, SecondaryIndex> entry : indexesByColumn.entrySet())
-        {
-            SecondaryIndex index = entry.getValue();
-
-            if (index.isIndexBuilt(entry.getKey()))
-                indexList.add(entry.getValue().getIndexName());
-        }
-
-        return indexList;
-    }
-
-    /**
-     * @return all CFS from indexes which use a backing CFS internally (KEYS)
-     */
-    public Set<ColumnFamilyStore> getIndexesBackedByCfs()
-    {
-        Set<ColumnFamilyStore> cfsList = new HashSet<>();
-
-        for (SecondaryIndex index: allIndexes)
-        {
-            ColumnFamilyStore cfs = index.getIndexCfs();
-            if (cfs != null)
-                cfsList.add(cfs);
-        }
-
-        return cfsList;
-    }
-
-    /**
-     * @return all indexes which do *not* use a backing CFS internally
-     */
-    public Set<SecondaryIndex> getIndexesNotBackedByCfs()
-    {
-        // we use identity map because per row indexes use same instance across many columns
-        Set<SecondaryIndex> indexes = Collections.newSetFromMap(new IdentityHashMap<SecondaryIndex, Boolean>());
-        for (SecondaryIndex index: allIndexes)
-            if (index.getIndexCfs() == null)
-                indexes.add(index);
-        return indexes;
-    }
-
-    /**
-     * @return all of the secondary indexes without distinction to the (non-)backed by secondary ColumnFamilyStore.
-     */
-    public Set<SecondaryIndex> getIndexes()
-    {
-        return allIndexes;
-    }
-
-    /**
-     * @return if there are ANY indexes for this table..
-     */
-    public boolean hasIndexes()
-    {
-        return !indexesByColumn.isEmpty();
-    }
-
-    /**
-     * When building an index against existing data, add the given partition to the index
-     */
-    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<SecondaryIndex> allIndexes, int nowInSec)
-    {
-        Set<PerRowSecondaryIndex> perRowIndexes = perRowIndexes();
-        Set<PerColumnSecondaryIndex> perColumnIndexes = perColumnIndexes();
-
-        if (!perRowIndexes.isEmpty())
-        {
-            // TODO: This is passing the same partition iterator to all perRow index, which means this only
-            // work if there is only one of them. We should change the API so it doesn't work directly on the
-            // partition, but rather on individual rows, so we can do a single iteration on the partition in this
-            // method and pass the rows to index to all indexes.
-
-            // Update entire partition only once per row level index
-            Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = new HashSet<>();
-            for (PerRowSecondaryIndex index : perRowIndexes)
-            {
-                if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex)index).index(partition.partitionKey().getKey(), partition);
-            }
-        }
-
-        if (!perColumnIndexes.isEmpty())
-        {
-            DecoratedKey key = partition.partitionKey();
-
-            if (!partition.staticRow().isEmpty())
-            {
-                for (PerColumnSecondaryIndex index : perColumnIndexes)
-                    index.indexRow(key, partition.staticRow(), opGroup, nowInSec);
-            }
-
-            try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
-            {
-                while (filtered.hasNext())
-                {
-                    Row row = filtered.next();
-                    for (PerColumnSecondaryIndex index : perColumnIndexes)
-                        index.indexRow(key, row, opGroup, nowInSec);
-                }
-            }
-        }
-    }
-
-    /**
-     * Delete all data from all indexes for this partition.  For when cleanup rips a partition out entirely.
-     */
-    public void deleteFromIndexes(UnfilteredRowIterator partition, OpOrder.Group opGroup, int nowInSec)
-    {
-        ByteBuffer key = partition.partitionKey().getKey();
-
-        for (PerRowSecondaryIndex index : perRowIndexes())
-            index.delete(key, opGroup);
-
-        Set<PerColumnSecondaryIndex> indexes = perColumnIndexes();
-
-        while (partition.hasNext())
-        {
-            Unfiltered unfiltered = partition.next();
-            if (unfiltered.kind() != Unfiltered.Kind.ROW)
-                continue;
-
-            Row row = (Row) unfiltered;
-            Clustering clustering = row.clustering();
-            if (!row.deletion().isLive())
-                for (PerColumnSecondaryIndex index : indexes)
-                    index.maybeDelete(key, clustering, row.deletion(), opGroup);
-
-            for (Cell cell : row.cells())
-            {
-                for (PerColumnSecondaryIndex index : indexes)
-                {
-                    if (!index.indexes(cell.column()))
-                        continue;
-
-                    ((PerColumnSecondaryIndex) index).deleteForCleanup(key, clustering, cell, opGroup, nowInSec);
-                }
-            }
-        }
-    }
-
-    /**
-     * This helper acts as a closure around the indexManager and updated data
-     * to ensure that down in Memtable's ColumnFamily implementation, the index
-     * can get updated.
-     */
-    public Updater updaterFor(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
-    {
-        return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty())
-                ? nullUpdater
-                : new StandardUpdater(update, opGroup, nowInSec);
-    }
-
-    /**
-     * Updated closure with only the modified row key.
-     */
-    public Updater gcUpdaterFor(DecoratedKey key, int nowInSec)
-    {
-        return new GCUpdater(key, nowInSec);
-    }
-
-    /**
-     * Get a list of IndexSearchers from the union of expression index types
-     * @param command the query
-     * @return the searchers needed to query the index
-     */
-    public List<SecondaryIndexSearcher> getIndexSearchersFor(ReadCommand command)
-    {
-        Map<String, Set<ColumnDefinition>> groupByIndexType = new HashMap<>();
-
-        //Group columns by type
-        for (RowFilter.Expression e : command.rowFilter())
-        {
-            SecondaryIndex index = getIndexForColumn(e.column());
-
-            if (index == null || !index.supportsOperator(e.operator()))
-                continue;
-
-            Set<ColumnDefinition> columns = groupByIndexType.get(index.indexTypeForGrouping());
-
-            if (columns == null)
-            {
-                columns = new HashSet<>();
-                groupByIndexType.put(index.indexTypeForGrouping(), columns);
-            }
-
-            columns.add(e.column());
-        }
-
-        List<SecondaryIndexSearcher> indexSearchers = new ArrayList<>(groupByIndexType.size());
-
-        //create searcher per type
-        for (Set<ColumnDefinition> column : groupByIndexType.values())
-            indexSearchers.add(getIndexForColumn(column.iterator().next()).createSecondaryIndexSearcher(column));
-
-        return indexSearchers;
-    }
-
-    public SecondaryIndexSearcher getBestIndexSearcherFor(ReadCommand command)
-    {
-        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersFor(command);
-
-        if (indexSearchers.isEmpty())
-            return null;
-
-        SecondaryIndexSearcher mostSelective = null;
-        long bestEstimate = Long.MAX_VALUE;
-        for (SecondaryIndexSearcher searcher : indexSearchers)
-        {
-            SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter());
-            long estimate = highestSelectivityIndex.estimateResultRows();
-            if (estimate <= bestEstimate)
-            {
-                bestEstimate = estimate;
-                mostSelective = searcher;
-            }
-        }
-        return mostSelective;
-    }
-
-    /**
-     * Validates an union of expression index types. It will throw an {@link InvalidRequestException} if
-     * any of the expressions in the provided clause is not valid for its index implementation.
-     * @param filter the filter to check
-     * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
-     */
-    public void validateFilter(RowFilter filter) throws InvalidRequestException
-    {
-        for (RowFilter.Expression expression : filter)
-        {
-            SecondaryIndex index = getIndexForColumn(expression.column());
-            if (index != null && index.supportsOperator(expression.operator()))
-                expression.validateForIndexing();
-        }
-    }
-
-    public Set<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
-    {
-        Set<SecondaryIndex> result = new HashSet<>();
-        for (SecondaryIndex index : allIndexes)
-            if (idxNames.contains(index.getIndexName()))
-                result.add(index);
-        return result;
-    }
-
-    public SecondaryIndex getIndexByName(String idxName)
-    {
-        for (SecondaryIndex index : allIndexes)
-            if (idxName.equals(index.getIndexName()))
-                return index;
-
-        return null;
-    }
-
-    public void setIndexBuilt(Set<String> idxNames)
-    {
-        for (SecondaryIndex index : getIndexesByNames(idxNames))
-            index.setIndexBuilt();
-    }
-
-    public void setIndexRemoved(Set<String> idxNames)
-    {
-        for (SecondaryIndex index : getIndexesByNames(idxNames))
-            index.setIndexRemoved();
-    }
-
-    public void validate(DecoratedKey partitionKey) throws InvalidRequestException
-    {
-        for (SecondaryIndex index : perColumnIndexes())
-            index.validate(partitionKey);
-    }
-
-    public void validate(Clustering clustering) throws InvalidRequestException
-    {
-        for (SecondaryIndex index : perColumnIndexes())
-            index.validate(clustering);
-    }
-
-    public void validate(ColumnDefinition column, ByteBuffer value, CellPath path) throws InvalidRequestException
-    {
-        for (SecondaryIndex index : indexFor(column))
-            index.validate(value, path);
-    }
-
-    static boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
-    {
-        // If either the value or timestamp is different, then we
-        // should delete from the index. If not, then we can infer that
-        // at least one of the cells is an ExpiringColumn and that the
-        // difference is in the expiry time. In this case, we don't want to
-        // delete the old value from the index as the tombstone we insert
-        // will just hide the inserted value.
-        // Completely identical cells (including expiring columns with
-        // identical ttl & localExpirationTime) will not get this far due
-        // to the oldCell.equals(newCell) in StandardUpdater.update
-        return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp();
-    }
-
-    private Set<String> filterByColumn(Set<String> idxNames)
-    {
-        Set<SecondaryIndex> indexes = getIndexesByNames(idxNames);
-        Set<String> filtered = new HashSet<>(idxNames.size());
-        for (SecondaryIndex candidate : indexes)
-        {
-            for (ColumnDefinition column : baseCfs.metadata.allColumns())
-            {
-                if (candidate.indexes(column))
-                {
-                    filtered.add(candidate.getIndexName());
-                    break;
-                }
-            }
-        }
-        return filtered;
-    }
-
-    public static interface Updater
-    {
-        /** Called when a row with the provided clustering and row infos is inserted */
-        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion);
-
-        /** called when constructing the index against pre-existing data */
-        public void insert(Clustering clustering, Cell cell);
-
-        /** called when updating the index from a memtable */
-        public void update(Clustering clustering, Cell oldCell, Cell cell);
-
-        /** called when lazy-updating the index during compaction (CASSANDRA-2897) */
-        public void remove(Clustering clustering, Cell current);
-
-        /** called after memtable updates are complete (CASSANDRA-5397) */
-        public void updateRowLevelIndexes();
-    }
-
-    private final class GCUpdater implements Updater
-    {
-        private final DecoratedKey key;
-        private final int nowInSec;
-
-        public GCUpdater(DecoratedKey key, int nowInSec)
-        {
-            this.key = key;
-            this.nowInSec = nowInSec;
-        }
-
-        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void insert(Clustering clustering, Cell cell)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void update(Clustering clustering, Cell oldCell, Cell newCell)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void remove(Clustering clustering, Cell cell)
-        {
-            for (SecondaryIndex index : indexFor(cell.column()))
-            {
-                if (index instanceof PerColumnSecondaryIndex)
-                {
-                    try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
-                    {
-                        ((PerColumnSecondaryIndex) index).delete(key.getKey(), clustering, cell, opGroup, nowInSec);
-                    }
-                }
-            }
-        }
-
-        public void updateRowLevelIndexes()
-        {
-            for (SecondaryIndex index : rowLevelIndexMap.values())
-                ((PerRowSecondaryIndex) index).index(key.getKey(), null);
-        }
-    }
-
-    private final class StandardUpdater implements Updater
-    {
-        private final PartitionUpdate update;
-        private final OpOrder.Group opGroup;
-        private final int nowInSec;
-
-        public StandardUpdater(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
-        {
-            this.update = update;
-            this.opGroup = opGroup;
-            this.nowInSec = nowInSec;
-        }
-
-        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
-        {
-            for (PerColumnSecondaryIndex index : perColumnIndexes())
-            {
-                if (timestamp != LivenessInfo.NO_TIMESTAMP)
-                    index.maybeIndex(update.partitionKey().getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
-                if (!deletion.isLive())
-                    index.maybeDelete(update.partitionKey().getKey(), clustering, deletion, opGroup);
-            }
-        }
-
-        public void insert(Clustering clustering, Cell cell)
-        {
-            if (!cell.isLive(nowInSec))
-                return;
-
-            for (SecondaryIndex index : indexFor(cell.column()))
-                if (index instanceof PerColumnSecondaryIndex)
-                    ((PerColumnSecondaryIndex) index).insert(update.partitionKey().getKey(), clustering, cell, opGroup);
-        }
-
-        public void update(Clustering clustering, Cell oldCell, Cell cell)
-        {
-            if (oldCell.equals(cell))
-                return;
-
-            for (SecondaryIndex index : indexFor(cell.column()))
-            {
-                if (index instanceof PerColumnSecondaryIndex)
-                {
-                    if (cell.isLive(nowInSec))
-                    {
-                        ((PerColumnSecondaryIndex) index).update(update.partitionKey().getKey(), clustering, oldCell, cell, opGroup, nowInSec);
-                    }
-                    else
-                    {
-                        // Usually we want to delete the old value from the index, except when
-                        // name/value/timestamp are all equal, but the columns themselves
-                        // are not (as is the case when overwriting expiring columns with
-                        // identical values and ttl) Then, we don't want to delete as the
-                        // tombstone will hide the new value we just inserted; see CASSANDRA-7268
-                        if (shouldCleanupOldValue(oldCell, cell))
-                            ((PerColumnSecondaryIndex) index).delete(update.partitionKey().getKey(), clustering, oldCell, opGroup, nowInSec);
-                    }
-                }
-            }
-        }
-
-        public void remove(Clustering clustering, Cell cell)
-        {
-            throw new UnsupportedOperationException();
-        }
-
-        public void updateRowLevelIndexes()
-        {
-            for (SecondaryIndex index : rowLevelIndexMap.values())
-                ((PerRowSecondaryIndex) index).index(update.partitionKey().getKey(), update.unfilteredIterator());
-        }
-
-    }
-}