You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:36 UTC

[05/15] cassandra git commit: New 2i API and implementations for built in indexes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
deleted file mode 100644
index d4ca707..0000000
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.FBUtilities;
-
-public abstract class SecondaryIndexSearcher
-{
-    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexSearcher.class);
-
-    protected final SecondaryIndexManager indexManager;
-    protected final Set<ColumnDefinition> columns;
-    protected final ColumnFamilyStore baseCfs;
-
-    public SecondaryIndexSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
-    {
-        this.indexManager = indexManager;
-        this.columns = columns;
-        this.baseCfs = indexManager.baseCfs;
-    }
-
-    public SecondaryIndex highestSelectivityIndex(RowFilter filter)
-    {
-        RowFilter.Expression expr = highestSelectivityPredicate(filter, false);
-        return expr == null ? null : indexManager.getIndexForColumn(expr.column());
-    }
-
-    public RowFilter.Expression primaryClause(ReadCommand command)
-    {
-        return highestSelectivityPredicate(command.rowFilter(), false);
-    }
-
-    @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
-                                  // of this method.
-    public UnfilteredPartitionIterator search(ReadCommand command, ReadOrderGroup orderGroup)
-    {
-        RowFilter.Expression primary = highestSelectivityPredicate(command.rowFilter(), true);
-        assert primary != null;
-
-        AbstractSimplePerColumnSecondaryIndex index = (AbstractSimplePerColumnSecondaryIndex)indexManager.getIndexForColumn(primary.column());
-        assert index != null && index.getIndexCfs() != null;
-
-        if (logger.isDebugEnabled())
-            logger.debug("Most-selective indexed predicate is {}", primary);
-
-        DecoratedKey indexKey = index.getIndexKeyFor(primary.getIndexValue());
-
-        UnfilteredRowIterator indexIter = queryIndex(index, indexKey, command, orderGroup);
-        try
-        {
-            return queryDataFromIndex(index, indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup);
-        }
-        catch (RuntimeException | Error e)
-        {
-            indexIter.close();
-            throw e;
-        }
-    }
-
-    private UnfilteredRowIterator queryIndex(AbstractSimplePerColumnSecondaryIndex index, DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
-    {
-        ClusteringIndexFilter filter = makeIndexFilter(index, command);
-        CFMetaData indexMetadata = index.getIndexCfs().metadata;
-        return SinglePartitionReadCommand.create(indexMetadata, command.nowInSec(), indexKey, ColumnFilter.all(indexMetadata), filter)
-                                         .queryMemtableAndDisk(index.getIndexCfs(), orderGroup.indexReadOpOrderGroup());
-    }
-
-    private ClusteringIndexFilter makeIndexFilter(AbstractSimplePerColumnSecondaryIndex index, ReadCommand command)
-    {
-        if (command instanceof SinglePartitionReadCommand)
-        {
-            // Note: as yet there's no route to get here - a 2i query *always* uses a
-            // PartitionRangeReadCommand. This is here in preparation for coming changes
-            // in SelectStatement.
-            SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command;
-            ByteBuffer pk = sprc.partitionKey().getKey();
-            ClusteringIndexFilter filter = sprc.clusteringIndexFilter();
-
-            if (filter instanceof ClusteringIndexNamesFilter)
-            {
-                NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
-                BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator());
-                for (Clustering c : requested)
-                    clusterings.add(index.makeIndexClustering(pk, c, (Cell)null));
-                return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed());
-            }
-            else
-            {
-                Slices requested = ((ClusteringIndexSliceFilter)filter).requestedSlices();
-                Slices.Builder builder = new Slices.Builder(index.getIndexComparator());
-                for (Slice slice : requested)
-                    builder.add(index.makeIndexBound(pk, slice.start()), index.makeIndexBound(pk, slice.end()));
-                return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed());
-            }
-        }
-        else
-        {
-
-            DataRange dataRange = ((PartitionRangeReadCommand)command).dataRange();
-            AbstractBounds<PartitionPosition> range = dataRange.keyRange();
-
-            Slice slice = Slice.ALL;
-
-            /*
-             * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
-             * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
-             * key having a given token. A potential fix would be to actually store the token along the key in the indexed row.
-             */
-            if (range.left instanceof DecoratedKey)
-            {
-                // the right hand side of the range may not be a DecoratedKey (for instance if we're paging),
-                // but if it is, we can optimise slightly by restricting the slice
-                if (range.right instanceof DecoratedKey)
-                {
-
-                    DecoratedKey startKey = (DecoratedKey) range.left;
-                    DecoratedKey endKey = (DecoratedKey) range.right;
-
-                    Slice.Bound start = Slice.Bound.BOTTOM;
-                    Slice.Bound end = Slice.Bound.TOP;
-
-                    /*
-                     * For index queries over a range, we can't do a whole lot better than querying everything for the key range, though for
-                     * slice queries where we can slightly restrict the beginning and end.
-                     */
-                    if (!dataRange.isNamesQuery())
-                    {
-                        ClusteringIndexSliceFilter startSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(startKey));
-                        ClusteringIndexSliceFilter endSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(endKey));
-
-                        // We can't effectively support reversed queries when we have a range, so we don't support it
-                        // (or through post-query reordering) and shouldn't get there.
-                        assert !startSliceFilter.isReversed() && !endSliceFilter.isReversed();
-
-                        Slices startSlices = startSliceFilter.requestedSlices();
-                        Slices endSlices = endSliceFilter.requestedSlices();
-
-                        if (startSlices.size() > 0)
-                            start = startSlices.get(0).start();
-
-                        if (endSlices.size() > 0)
-                            end = endSlices.get(endSlices.size() - 1).end();
-                    }
-
-                    slice = Slice.make(index.makeIndexBound(startKey.getKey(), start),
-                                       index.makeIndexBound(endKey.getKey(), end));
-                }
-                else
-                {
-                   // otherwise, just start the index slice from the key we do have
-                   slice = Slice.make(index.makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM),
-                                      Slice.Bound.TOP);
-                }
-            }
-            return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
-        }
-    }
-
-    protected abstract UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex index,
-                                                                      DecoratedKey indexKey,
-                                                                      RowIterator indexHits,
-                                                                      ReadCommand command,
-                                                                      ReadOrderGroup orderGroup);
-
-    protected RowFilter.Expression highestSelectivityPredicate(RowFilter filter, boolean includeInTrace)
-    {
-        RowFilter.Expression best = null;
-        int bestMeanCount = Integer.MAX_VALUE;
-        Map<SecondaryIndex, Integer> candidates = new HashMap<>();
-
-        for (RowFilter.Expression expression : filter)
-        {
-            // skip columns belonging to a different index type
-            if (!columns.contains(expression.column()))
-                continue;
-
-            SecondaryIndex index = indexManager.getIndexForColumn(expression.column());
-            if (index == null || index.getIndexCfs() == null || !index.supportsOperator(expression.operator()))
-                continue;
-
-            int columns = index.getIndexCfs().getMeanColumns();
-            candidates.put(index, columns);
-            if (columns < bestMeanCount)
-            {
-                best = expression;
-                bestMeanCount = columns;
-            }
-        }
-
-        if (includeInTrace)
-        {
-            if (best == null)
-                Tracing.trace("No applicable indexes found");
-            else if (Tracing.isTracing())
-                // pay for an additional threadlocal get() rather than build the strings unnecessarily
-                Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.", FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column()).getIndexName());
-        }
-        return best;
-    }
-
-    /**
-     * Post-process the result of an index query. This is done by the coordinator node after it has reconciled
-     * the replica responses.
-     *
-     * @param command The {@code ReadCommand} use for the query.
-     * @param result The index query results to be post-processed
-     * @return The post-processed results
-     */
-    public PartitionIterator postReconciliationProcessing(RowFilter filter, PartitionIterator result)
-    {
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
deleted file mode 100644
index abc5344..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.CollectionType;
-import org.apache.cassandra.exceptions.ConfigurationException;
-
-/**
- * Base class for internal secondary indexes (this could be merged with AbstractSimplePerColumnSecondaryIndex).
- */
-public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
-{
-    public static CompositesIndex create(IndexMetadata indexDef, CFMetaData baseMetadata)
-    {
-        ColumnDefinition cfDef = indexDef.indexedColumn(baseMetadata);
-        if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
-        {
-            switch (((CollectionType)cfDef.type).kind)
-            {
-                case LIST:
-                    return new CompositesIndexOnCollectionValue();
-                case SET:
-                    return new CompositesIndexOnCollectionKey();
-                case MAP:
-                    if (indexDef.options.containsKey(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
-                        return new CompositesIndexOnCollectionKey();
-                    else if (indexDef.options.containsKey(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
-                        return new CompositesIndexOnCollectionKeyAndValue();
-                    else
-                        return new CompositesIndexOnCollectionValue();
-            }
-        }
-
-        switch (cfDef.kind)
-        {
-            case CLUSTERING:
-                return new CompositesIndexOnClusteringKey();
-            case REGULAR:
-                return new CompositesIndexOnRegular();
-            case PARTITION_KEY:
-                return new CompositesIndexOnPartitionKey();
-            //case COMPACT_VALUE:
-            //    return new CompositesIndexOnCompactValue();
-        }
-        throw new AssertionError();
-    }
-
-    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, IndexMetadata indexDef)
-    {
-        ColumnDefinition cfDef = indexDef.indexedColumn(baseMetadata);
-        if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
-        {
-            CollectionType type = (CollectionType)cfDef.type;
-            if (type.kind == CollectionType.Kind.LIST
-                || (type.kind == CollectionType.Kind.MAP && indexDef.options.containsKey(SecondaryIndex.INDEX_VALUES_OPTION_NAME)))
-            {
-                CompositesIndexOnCollectionValue.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
-            }
-            else
-            {
-                addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
-            }
-        }
-        else if (cfDef.isClusteringColumn())
-        {
-            CompositesIndexOnClusteringKey.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
-        }
-        else
-        {
-            addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
-        }
-    }
-
-    protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
-    {
-        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
-        for (ColumnDefinition def : baseMetadata.clusteringColumns())
-            indexMetadata.addClusteringColumn(def.name, def.type);
-    }
-
-    public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry);
-
-    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
-
-    public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
-    {
-        Row row = BTreeRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec));
-        PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row);
-        indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
-
-        if (logger.isDebugEnabled())
-            logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, upd);
-    }
-
-    public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
-    {
-        return new CompositesSearcher(baseCfs.indexManager, columns);
-    }
-
-    public void validateOptions(CFMetaData baseCfm, IndexMetadata indexMetadata) throws ConfigurationException
-    {
-        ColumnDefinition columnDef = indexMetadata.indexedColumn(baseCfm);
-        Map<String, String> options = new HashMap<String, String>(indexMetadata.options);
-
-        // We used to have an option called "prefix_size" so skip it silently for backward compatibility sake.
-        options.remove("prefix_size");
-
-        if (columnDef.type.isCollection())
-        {
-            options.remove(SecondaryIndex.INDEX_VALUES_OPTION_NAME);
-            options.remove(SecondaryIndex.INDEX_KEYS_OPTION_NAME);
-            options.remove(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME);
-        }
-
-        if (!options.isEmpty())
-            throw new ConfigurationException("Unknown options provided for COMPOSITES index: " + options.keySet());
-    }
-
-    public static class IndexedEntry
-    {
-        public final DecoratedKey indexValue;
-        public final Clustering indexClustering;
-        public final long timestamp;
-
-        public final ByteBuffer indexedKey;
-        public final Clustering indexedEntryClustering;
-
-        public IndexedEntry(DecoratedKey indexValue, Clustering indexClustering, long timestamp, ByteBuffer indexedKey, Clustering indexedEntryClustering)
-        {
-            this.indexValue = indexValue;
-            this.indexClustering = indexClustering;
-            this.timestamp = timestamp;
-            this.indexedKey = indexedKey;
-            this.indexedEntryClustering = indexedEntryClustering;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
deleted file mode 100644
index 7624c1f..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-/**
- * Common superclass for indexes that capture collection keys, including
- * indexes on such keys themselves.
- *
- * A cell indexed by this index will have the general form:
- *   ck_0 ... ck_n c_name [col_elt] : v
- * where ck_i are the cluster keys, c_name the CQL3 column name, col_elt the
- * collection element that we want to index (which may or may not be there depending
- * on whether c_name is the collection we're indexing), and v the cell value.
- *
- * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have
- * col_elt). The index entry can be viewed in the following way:
- *   - the row key is determined by subclasses of this type.
- *   - the cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell.
- */
-public abstract class CompositesIndexIncludingCollectionKey extends CompositesIndex
-{
-    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
-    {
-        CBuilder builder = CBuilder.create(getIndexComparator());
-        builder.add(rowKey);
-        for (int i = 0; i < prefix.size(); i++)
-            builder.add(prefix.get(i));
-        return builder;
-    }
-
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
-    {
-        int count = 1 + baseCfs.metadata.clusteringColumns().size();
-        Clustering clustering = indexEntry.clustering();
-        CBuilder builder = CBuilder.create(baseCfs.getComparator());
-        for (int i = 0; i < count - 1; i++)
-            builder.add(clustering.get(i + 1));
-        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
deleted file mode 100644
index cd4aff9..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Index on a CLUSTERING column definition.
- *
- * A cell indexed by this index will have the general form:
- *   ck_0 ... ck_n c_name : v
- * where ck_i are the cluster keys, c_name the last component of the cell
- * composite name (or second to last if collections are in use, but this
- * has no impact) and v the cell value.
- *
- * Such a cell is always indexed by this index (or rather, it is indexed if
- * n >= columnDef.componentIndex, which will always be the case in practice)
- * and it will generate (makeIndexColumnName()) an index entry whose:
- *   - row key will be ck_i (getIndexedValue()) where i == columnDef.componentIndex.
- *   - cell name will
- *       rk ck_0 ... ck_{i-1} ck_{i+1} ck_n
- *     where rk is the row key of the initial cell and i == columnDef.componentIndex.
- */
-public class CompositesIndexOnClusteringKey extends CompositesIndex
-{
-    public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
-    {
-        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
-
-        List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
-        for (int i = 0; i < columnDef.position(); i++)
-        {
-            ColumnDefinition def = cks.get(i);
-            indexMetadata.addClusteringColumn(def.name, def.type);
-        }
-        for (int i = columnDef.position() + 1; i < cks.size(); i++)
-        {
-            ColumnDefinition def = cks.get(i);
-            indexMetadata.addClusteringColumn(def.name, def.type);
-        }
-    }
-
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
-    {
-        return clustering.get(columnDef.position());
-    }
-
-    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
-    {
-        CBuilder builder = CBuilder.create(getIndexComparator());
-        builder.add(rowKey);
-        for (int i = 0; i < Math.min(columnDef.position(), prefix.size()); i++)
-            builder.add(prefix.get(i));
-        for (int i = columnDef.position() + 1; i < prefix.size(); i++)
-            builder.add(prefix.get(i));
-        return builder;
-    }
-
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
-    {
-        int ckCount = baseCfs.metadata.clusteringColumns().size();
-
-        Clustering clustering = indexEntry.clustering();
-        CBuilder builder = CBuilder.create(baseCfs.getComparator());
-        for (int i = 0; i < columnDef.position(); i++)
-            builder.add(clustering.get(i + 1));
-
-        builder.add(indexedValue.getKey());
-
-        for (int i = columnDef.position() + 1; i < ckCount; i++)
-            builder.add(clustering.get(i));
-
-        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
-    }
-
-    @Override
-    protected boolean indexPrimaryKeyColumn()
-    {
-        return true;
-    }
-
-    @Override
-    public boolean indexes(ColumnDefinition c)
-    {
-        // Actual indexing for this index type is done through maybeIndex
-        return false;
-    }
-
-    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
-    {
-        return !data.hasLiveData(nowInSec);
-    }
-
-    @Override
-    public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
-    {
-        if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null)
-            insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
-    }
-
-    @Override
-    public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
-    {
-        if (clustering.get(columnDef.position()) != null && !deletion.isLive())
-            delete(partitionKey, clustering, null, null, deletion, opGroup);
-    }
-
-    @Override
-    public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
-    {
-        // We only know that one column of the CQL row has been updated/deleted, but we don't know if the
-        // full row has been deleted so we should not do anything. If it ends up that the whole row has
-        // been deleted, it will be eventually cleaned up on read because the entry will be detected stale.
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
deleted file mode 100644
index 50e81c4..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * Index on the collection element of the cell name of a collection.
- *
- * The row keys for this index are given by the collection element for
- * indexed columns.
- */
-public class CompositesIndexOnCollectionKey extends CompositesIndexIncludingCollectionKey
-{
-    @Override
-    protected AbstractType<?> getIndexKeyComparator()
-    {
-        return ((CollectionType)columnDef.type).nameComparator();
-    }
-
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
-    {
-        return path.get(0);
-    }
-
-    @Override
-    public boolean supportsOperator(Operator operator)
-    {
-        return operator == Operator.CONTAINS_KEY ||
-               operator == Operator.CONTAINS && columnDef.type instanceof SetType;
-    }
-
-    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
-    {
-        Cell cell = data.getCell(columnDef, CellPath.create(indexValue));
-        return cell == null || !cell.isLive(nowInSec);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
deleted file mode 100644
index 766f803..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * Index on the element and value of cells participating in a collection.
- *
- * The row keys for this index are a composite of the collection element
- * and value of indexed columns.
- */
-public class CompositesIndexOnCollectionKeyAndValue extends CompositesIndexIncludingCollectionKey
-{
-    @Override
-    protected AbstractType<?> getIndexKeyComparator()
-    {
-        CollectionType colType = (CollectionType)columnDef.type;
-        return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());
-    }
-
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
-    {
-        return CompositeType.build(path.get(0), cellValue);
-    }
-
-    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
-    {
-        ByteBuffer[] components = ((CompositeType)getIndexKeyComparator()).split(indexValue);
-        ByteBuffer mapKey = components[0];
-        ByteBuffer mapValue = components[1];
-
-        Cell cell = data.getCell(columnDef, CellPath.create(mapKey));
-        if (cell == null || !cell.isLive(nowInSec))
-            return true;
-
-        AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator();
-        return valueComparator.compare(mapValue, cell.value()) != 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
deleted file mode 100644
index 30391cf..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-
-/**
- * Index the value of a collection cell.
- *
- * This is a lot like an index on REGULAR, except that we also need to make
- * the collection key part of the index entry so that:
- *   1) we don't have to scan the whole collection at query time to know the
- *   entry is stale and if it still satisfies the query.
- *   2) if a collection has multiple time the same value, we need one entry
- *   for each so that if we delete one of the value only we only delete the
- *   entry corresponding to that value.
- */
-public class CompositesIndexOnCollectionValue extends CompositesIndex
-{
-    public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
-    {
-        addGenericClusteringColumns(indexMetadata, baseMetadata, columnDef);
-
-        // collection key
-        indexMetadata.addClusteringColumn("cell_path", ((CollectionType)columnDef.type).nameComparator());
-    }
-
-    @Override
-    protected AbstractType<?> getIndexKeyComparator()
-    {
-        return ((CollectionType)columnDef.type).valueComparator();
-    }
-
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
-    {
-        return cellValue;
-    }
-
-    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
-    {
-        CBuilder builder = CBuilder.create(getIndexComparator());
-        builder.add(rowKey);
-        for (int i = 0; i < prefix.size(); i++)
-            builder.add(prefix.get(i));
-
-        // When indexing, cell will be present, but when searching, it won't  (CASSANDRA-7525)
-        if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null)
-            builder.add(path.get(0));
-
-        return builder;
-    }
-
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
-    {
-        Clustering clustering = indexEntry.clustering();
-        CBuilder builder = CBuilder.create(baseCfs.getComparator());
-        for (int i = 0; i < baseCfs.getComparator().size(); i++)
-            builder.add(clustering.get(i + 1));
-        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
-    }
-
-    @Override
-    public boolean supportsOperator(Operator operator)
-    {
-        return operator == Operator.CONTAINS && !(columnDef.type instanceof SetType);
-    }
-
-    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
-    {
-        ComplexColumnData complexData = data.getComplexColumnData(columnDef);
-        for (Cell cell : complexData)
-        {
-            if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0)
-                return false;
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
deleted file mode 100644
index a93f8e1..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Index on a PARTITION_KEY column definition.
- *
- * This suppose a composite row key:
- *   rk = rk_0 ... rk_n
- *
- * The corresponding index entry will be:
- *   - index row key will be rk_i (where i == columnDef.componentIndex)
- *   - cell name will be: rk ck
- *     where rk is the fully partition key and ck the clustering keys of the
- *     original cell names (thus excluding the last column name as we want to refer to
- *     the whole CQL3 row, not just the cell itself)
- *
- * Note that contrarily to other type of index, we repeat the indexed value in
- * the index cell name (we use the whole partition key). The reason is that we
- * want to order the index cell name by partitioner first, and skipping a part
- * of the row key would change the order.
- */
-public class CompositesIndexOnPartitionKey extends CompositesIndex
-{
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
-    {
-        CompositeType keyComparator = (CompositeType)baseCfs.metadata.getKeyValidator();
-        ByteBuffer[] components = keyComparator.split(rowKey);
-        return components[columnDef.position()];
-    }
-
-    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
-    {
-        CBuilder builder = CBuilder.create(getIndexComparator());
-        builder.add(rowKey);
-        for (int i = 0; i < prefix.size(); i++)
-            builder.add(prefix.get(i));
-        return builder;
-    }
-
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
-    {
-        int ckCount = baseCfs.metadata.clusteringColumns().size();
-        Clustering clustering = indexEntry.clustering();
-        CBuilder builder = CBuilder.create(baseCfs.getComparator());
-        for (int i = 0; i < ckCount; i++)
-            builder.add(clustering.get(i + 1));
-
-        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
-    }
-
-    @Override
-    protected boolean indexPrimaryKeyColumn()
-    {
-        return true;
-    }
-
-    @Override
-    public boolean indexes(ColumnDefinition c)
-    {
-        // Actual indexing for this index type is done through maybeIndex
-        return false;
-    }
-
-    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
-    {
-        return !data.hasLiveData(nowInSec);
-    }
-
-    @Override
-    public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
-    {
-        insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup);
-    }
-
-    @Override
-    public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
-    {
-        // We only know that one column of the CQL row has been updated/deleted, but we don't know if the
-        // full row has been deleted so we should not do anything. If it ends up that the whole row has
-        // been deleted, it will be eventually cleaned up on read because the entry will be detected stale.
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
deleted file mode 100644
index a88502a..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-
-/**
- * Index on a REGULAR column definition on a composite type.
- *
- * A cell indexed by this index will have the general form:
- *   ck_0 ... ck_n c_name : v
- * where ck_i are the cluster keys, c_name the last component of the cell
- * composite name (or second to last if collections are in use, but this
- * has no impact) and v the cell value.
- *
- * Such a cell is indexed if c_name == columnDef.name, and it will generate
- * (makeIndexColumnName()) an index entry whose:
- *   - row key will be the value v (getIndexedValue()).
- *   - cell name will
- *       rk ck_0 ... ck_n
- *     where rk is the row key of the initial cell. I.e. the index entry store
- *     all the information require to locate back the indexed cell.
- */
-public class CompositesIndexOnRegular extends CompositesIndex
-{
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
-    {
-        return cellValue;
-    }
-
-    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
-    {
-        CBuilder builder = CBuilder.create(getIndexComparator());
-        builder.add(rowKey);
-        for (int i = 0; i < prefix.size(); i++)
-            builder.add(prefix.get(i));
-        return builder;
-    }
-
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
-    {
-        Clustering clustering = indexEntry.clustering();
-        ClusteringComparator baseComparator = baseCfs.getComparator();
-        CBuilder builder = CBuilder.create(baseComparator);
-        for (int i = 0; i < baseComparator.size(); i++)
-            builder.add(clustering.get(i + 1));
-        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
-    }
-
-    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
-    {
-        Cell cell = data.getCell(columnDef);
-        return cell == null
-            || !cell.isLive(nowInSec)
-            || columnDef.type.compare(indexValue, cell.value()) != 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
deleted file mode 100644
index b76bf7e..0000000
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.composites;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-
-public class CompositesSearcher extends SecondaryIndexSearcher
-{
-    private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);
-
-    public CompositesSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
-    {
-        super(indexManager, columns);
-    }
-
-    private boolean isMatchingEntry(DecoratedKey partitionKey, CompositesIndex.IndexedEntry entry, ReadCommand command)
-    {
-        return command.selects(partitionKey, entry.indexedEntryClustering);
-    }
-
-    protected UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex secondaryIdx,
-                                                             final DecoratedKey indexKey,
-                                                             final RowIterator indexHits,
-                                                             final ReadCommand command,
-                                                             final ReadOrderGroup orderGroup)
-    {
-        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
-
-        assert secondaryIdx instanceof CompositesIndex;
-        final CompositesIndex index = (CompositesIndex)secondaryIdx;
-
-        return new UnfilteredPartitionIterator()
-        {
-            private CompositesIndex.IndexedEntry nextEntry;
-
-            private UnfilteredRowIterator next;
-
-            public boolean isForThrift()
-            {
-                return command.isForThrift();
-            }
-
-            public CFMetaData metadata()
-            {
-                return command.metadata();
-            }
-
-            public boolean hasNext()
-            {
-                return prepareNext();
-            }
-
-            public UnfilteredRowIterator next()
-            {
-                if (next == null)
-                    prepareNext();
-
-                UnfilteredRowIterator toReturn = next;
-                next = null;
-                return toReturn;
-            }
-
-            private boolean prepareNext()
-            {
-                if (next != null)
-                    return true;
-
-                if (nextEntry == null)
-                {
-                    if (!indexHits.hasNext())
-                        return false;
-
-                    nextEntry = index.decodeEntry(indexKey, indexHits.next());
-                }
-
-                // Gather all index hits belonging to the same partition and query the data for those hits.
-                // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
-                // 1 read per index hit. However, this basically mean materializing all hits for a partition
-                // in memory so we should consider adding some paging mechanism. However, index hits should
-                // be relatively small so it's much better than the previous code that was materializing all
-                // *data* for a given partition.
-                BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(baseCfs.getComparator());
-                List<CompositesIndex.IndexedEntry> entries = new ArrayList<>();
-                DecoratedKey partitionKey = baseCfs.decorateKey(nextEntry.indexedKey);
-
-                while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
-                {
-                    // We're queried a slice of the index, but some hits may not match some of the clustering column constraints
-                    if (isMatchingEntry(partitionKey, nextEntry, command))
-                    {
-                        clusterings.add(nextEntry.indexedEntryClustering);
-                        entries.add(nextEntry);
-                    }
-
-                    nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
-                }
-
-                // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
-                if (clusterings.isEmpty())
-                    return prepareNext();
-
-                // Query the gathered index hits. We still need to filter stale hits from the resulting query.
-                ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(metadata(),
-                                                                                     command.nowInSec(),
-                                                                                     command.columnFilter(),
-                                                                                     command.rowFilter(),
-                                                                                     DataLimits.NONE,
-                                                                                     partitionKey,
-                                                                                     filter);
-                @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
-                                              // by the next caller of next, or through closing this iterator is this come before.
-                UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
-                                                                    index,
-                                                                    indexKey.getKey(),
-                                                                    entries,
-                                                                    orderGroup.writeOpOrderGroup(),
-                                                                    command.nowInSec());
-                if (dataIter.isEmpty())
-                {
-                    dataIter.close();
-                    return prepareNext();
-                }
-
-                next = dataIter;
-                return true;
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void close()
-            {
-                indexHits.close();
-                if (next != null)
-                    next.close();
-            }
-        };
-    }
-
-    private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
-                                                     final CompositesIndex index,
-                                                     final ByteBuffer indexValue,
-                                                     final List<CompositesIndex.IndexedEntry> entries,
-                                                     final OpOrder.Group writeOp,
-                                                     final int nowInSec)
-    {
-        return new AlteringUnfilteredRowIterator(dataIter)
-        {
-            private int entriesIdx;
-
-            @Override
-            protected Row computeNext(Row row)
-            {
-                CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec);
-                if (!index.isStale(row, indexValue, nowInSec))
-                    return row;
-
-                // The entry is stale: delete the entry and ignore otherwise
-                index.delete(entry, writeOp, nowInSec);
-                return null;
-            }
-
-            private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec)
-            {
-                assert entriesIdx < entries.size();
-                while (entriesIdx < entries.size())
-                {
-                    CompositesIndex.IndexedEntry entry = entries.get(entriesIdx++);
-                    // The entries are in clustering order. So that the requested entry should be the
-                    // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
-                    // that have no corresponding row in the base table typically because of a range
-                    // tombstone or partition level deletion. Delete such stale entries.
-                    int cmp = metadata().comparator.compare(entry.indexedEntryClustering, clustering);
-                    assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen
-                    if (cmp == 0)
-                        return entry;
-                    else
-                        index.delete(entry, writeOp, nowInSec);
-                }
-                // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
-                throw new AssertionError();
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
deleted file mode 100644
index 384b29d..0000000
--- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.keys;
-
-import java.nio.ByteBuffer;
-import java.util.Set;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.schema.IndexMetadata;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-/**
- * Implements a secondary index for a column family using a second column family.
- * The design uses inverted index http://en.wikipedia.org/wiki/Inverted_index.
- * The row key is the indexed value. For example, if we're indexing a column named
- * city, the index value of city is the row key.
- * The column names are the keys of the records. To see a detailed example, please
- * refer to wikipedia.
- */
-public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex
-{
-    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata)
-    {
-        indexMetadata.addClusteringColumn("partition_key", baseMetadata.partitioner.partitionOrdering());
-    }
-
-    @Override
-    public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
-    {
-        super.indexRow(key, row, opGroup, nowInSec);
-
-        // This is used when building indexes, in particular when the index is first created. On thrift, this
-        // potentially means the column definition just got created, and so we need to check if's not a "dynamic"
-        // row that actually correspond to the index definition.
-        assert baseCfs.metadata.isCompactTable();
-        if (!row.isStatic())
-        {
-            Clustering clustering = row.clustering();
-            if (clustering.get(0).equals(columnDef.name.bytes))
-            {
-                Cell cell = row.getCell(baseCfs.metadata.compactValueColumn());
-                if (cell != null && cell.isLive(nowInSec))
-                    insert(key.getKey(), clustering, cell, opGroup);
-            }
-        }
-    }
-
-    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path)
-    {
-        return cellValue;
-    }
-
-    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
-    {
-        CBuilder builder = CBuilder.create(getIndexComparator());
-        builder.add(rowKey);
-        return builder;
-    }
-
-    public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
-    {
-        return new KeysSearcher(baseCfs.indexManager, columns);
-    }
-
-    public void validateOptions(CFMetaData baseCfm, IndexMetadata def) throws ConfigurationException
-    {
-        // no options used
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
deleted file mode 100644
index 4b70dcf..0000000
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.index.keys;
-
-import java.nio.ByteBuffer;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.index.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-public class KeysSearcher extends SecondaryIndexSearcher
-{
-    private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class);
-
-    public KeysSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
-    {
-        super(indexManager, columns);
-    }
-
-    protected UnfilteredPartitionIterator queryDataFromIndex(final AbstractSimplePerColumnSecondaryIndex index,
-                                                             final DecoratedKey indexKey,
-                                                             final RowIterator indexHits,
-                                                             final ReadCommand command,
-                                                             final ReadOrderGroup orderGroup)
-    {
-        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
-
-        return new UnfilteredPartitionIterator()
-        {
-            private UnfilteredRowIterator next;
-
-            public boolean isForThrift()
-            {
-                return command.isForThrift();
-            }
-
-            public CFMetaData metadata()
-            {
-                return command.metadata();
-            }
-
-            public boolean hasNext()
-            {
-                return prepareNext();
-            }
-
-            public UnfilteredRowIterator next()
-            {
-                if (next == null)
-                    prepareNext();
-
-                UnfilteredRowIterator toReturn = next;
-                next = null;
-                return toReturn;
-            }
-
-            private boolean prepareNext()
-            {
-                while (next == null && indexHits.hasNext())
-                {
-                    Row hit = indexHits.next();
-                    DecoratedKey key = baseCfs.decorateKey(hit.clustering().get(0));
-
-                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
-                                                                                           baseCfs.metadata,
-                                                                                           command.nowInSec(),
-                                                                                           command.columnFilter(),
-                                                                                           command.rowFilter(),
-                                                                                           DataLimits.NONE,
-                                                                                           key,
-                                                                                           command.clusteringIndexFilter(key));
-                    @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
-                                                  // Otherwise, we close right away if empty, and if it's assigned to next it will be called either
-                                                  // by the next caller of next, or through closing this iterator is this come before.
-                    UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()),
-                                                                   index,
-                                                                   hit,
-                                                                   indexKey.getKey(),
-                                                                   orderGroup.writeOpOrderGroup(),
-                                                                   isForThrift(),
-                                                                   command.nowInSec());
-
-                    if (dataIter != null)
-                    {
-                        if (dataIter.isEmpty())
-                            dataIter.close();
-                        else
-                            next = dataIter;
-                    }
-                }
-                return next != null;
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void close()
-            {
-                indexHits.close();
-                if (next != null)
-                    next.close();
-            }
-        };
-    }
-
-    private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator,
-                                                AbstractSimplePerColumnSecondaryIndex index,
-                                                Row indexHit,
-                                                ByteBuffer indexedValue,
-                                                OpOrder.Group writeOp,
-                                                boolean isForThrift,
-                                                int nowInSec)
-    {
-        if (isForThrift)
-        {
-            // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering
-            // is the indexed name. Ans so we need to materialize the partition.
-            ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
-            iterator.close();
-            Row data = result.getRow(new Clustering(index.indexedColumn().name.bytes));
-            Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn());
-            return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec)
-                 ? null
-                 : result.unfilteredIterator();
-        }
-        else
-        {
-            assert iterator.metadata().isCompactTable();
-            Row data = iterator.staticRow();
-            Cell cell = data == null ? null : data.getCell(index.indexedColumn());
-            if (deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec))
-            {
-                iterator.close();
-                return null;
-            }
-            else
-            {
-                return iterator;
-            }
-        }
-    }
-
-    private boolean deleteIfStale(DecoratedKey partitionKey,
-                                  Cell cell,
-                                  AbstractSimplePerColumnSecondaryIndex index,
-                                  Row indexHit,
-                                  ByteBuffer indexedValue,
-                                  OpOrder.Group writeOp,
-                                  int nowInSec)
-    {
-        if (cell == null || !cell.isLive(nowInSec) || index.indexedColumn().type.compare(indexedValue, cell.value()) != 0)
-        {
-            // Index is stale, remove the index entry and ignore
-            index.delete(partitionKey.getKey(),
-                         new Clustering(index.indexedColumn().name.bytes),
-                         indexedValue,
-                         null,
-                         new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
-                         writeOp);
-            return true;
-        }
-        return false;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 0b73292..d279a6b 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db.partitions;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -27,17 +26,18 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Locks;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.HeapAllocator;
-import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 /**
  * A thread-safe and atomic Partition implementation.
@@ -86,7 +86,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
 
     public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator)
     {
-        // TODO: is this a potential bug? partition columns may be a subset if we alter columns while it's in memtable
+        // involved in potential bug? partition columns may be a subset if we alter columns while it's in memtable
         super(metadata, partitionKey, metadata.partitionColumns());
         this.allocator = allocator;
         this.ref = EMPTY;
@@ -108,11 +108,10 @@ public class AtomicBTreePartition extends AbstractBTreePartition
      * @return an array containing first the difference in size seen after merging the updates, and second the minimum
      * time detla between updates.
      */
-    public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, Updater indexer)
+    public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, UpdateTransaction indexer)
     {
         RowUpdater updater = new RowUpdater(this, allocator, writeOp, indexer);
         DeletionInfo inputDeletionInfoCopy = null;
-
         boolean monitorOwned = false;
         try
         {
@@ -121,12 +120,21 @@ public class AtomicBTreePartition extends AbstractBTreePartition
                 Locks.monitorEnterUnsafe(this);
                 monitorOwned = true;
             }
+
+            indexer.start();
+
             while (true)
             {
                 Holder current = ref;
                 updater.ref = current;
                 updater.reset();
 
+                if (!update.deletionInfo().isLive())
+                    indexer.onPartitionDeletion(update.deletionInfo().getPartitionDeletion());
+
+                if (update.deletionInfo().hasRanges())
+                    update.deletionInfo().rangeIterator(false).forEachRemaining(indexer::onRangeTombstone);
+
                 DeletionInfo deletionInfo;
                 if (update.deletionInfo().mayModify(current.deletionInfo))
                 {
@@ -150,7 +158,6 @@ public class AtomicBTreePartition extends AbstractBTreePartition
 
                 if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo, staticRow, newStats)))
                 {
-                    indexer.updateRowLevelIndexes();
                     updater.finish();
                     return new long[]{ updater.dataSize, updater.colUpdateTimeDelta };
                 }
@@ -171,6 +178,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
         }
         finally
         {
+            indexer.commit();
             if (monitorOwned)
                 Locks.monitorExitUnsafe(this);
         }
@@ -230,7 +238,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
         final AtomicBTreePartition updating;
         final MemtableAllocator allocator;
         final OpOrder.Group writeOp;
-        final Updater indexer;
+        final UpdateTransaction indexer;
         final int nowInSec;
         Holder ref;
         Row.Builder regularBuilder;
@@ -240,7 +248,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
         final MemtableAllocator.DataReclaimer reclaimer;
         List<Row> inserted; // TODO: replace with walk of aborted BTree
 
-        private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
+        private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, UpdateTransaction indexer)
         {
             this.updating = updating;
             this.allocator = allocator;
@@ -265,7 +273,7 @@ public class AtomicBTreePartition extends AbstractBTreePartition
         public Row apply(Row insert)
         {
             Row data = Rows.copy(insert, builder(insert.clustering())).build();
-            insertIntoIndexes(data);
+            indexer.onInserted(insert);
 
             this.dataSize += data.dataSize();
             this.heapSize += data.unsharedHeapSizeExcludingData();
@@ -280,10 +288,12 @@ public class AtomicBTreePartition extends AbstractBTreePartition
             Columns mergedColumns = existing.columns().mergeTo(update.columns());
 
             Row.Builder builder = builder(existing.clustering());
-            colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(existing, update, mergedColumns, builder, nowInSec, indexer));
+            colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(existing, update, mergedColumns, builder, nowInSec));
 
             Row reconciled = builder.build();
 
+            indexer.onUpdated(existing, reconciled);
+
             dataSize += reconciled.dataSize() - existing.dataSize();
             heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData();
             if (inserted == null)
@@ -294,41 +304,6 @@ public class AtomicBTreePartition extends AbstractBTreePartition
             return reconciled;
         }
 
-        private void insertIntoIndexes(Row toInsert)
-        {
-            if (indexer == SecondaryIndexManager.nullUpdater)
-                return;
-
-            maybeIndexPrimaryKeyColumns(toInsert);
-            Clustering clustering = toInsert.clustering();
-            for (Cell cell : toInsert.cells())
-                indexer.insert(clustering, cell);
-        }
-
-        private void maybeIndexPrimaryKeyColumns(Row row)
-        {
-            // We want to update a primary key index with the most up to date info contains in that inserted row (if only for
-            // backward compatibility). Note that if there is an index but not a partition key or clustering column one, we've
-            // wasting this work. We might be able to avoid that if row indexing was pushed in the index updater.
-            long timestamp = row.primaryKeyLivenessInfo().timestamp();
-            int ttl = row.primaryKeyLivenessInfo().ttl();
-
-            for (Cell cell : row.cells())
-            {
-                long cellTimestamp = cell.timestamp();
-                if (cell.isLive(nowInSec))
-                {
-                    if (cellTimestamp > timestamp)
-                    {
-                        timestamp = cellTimestamp;
-                        ttl = cell.ttl();
-                    }
-                }
-            }
-
-            indexer.maybeIndex(row.clustering(), timestamp, ttl, row.deletion());
-        }
-
         protected void reset()
         {
             this.dataSize = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 2f0a4ec..3d2c94b 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -19,30 +19,23 @@ package org.apache.cassandra.db.partitions;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.io.util.DataInputBuffer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;