You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:50 UTC

[26/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index ba48350..0b6577e 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -23,11 +23,11 @@ import java.util.concurrent.Future;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -51,8 +51,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
         columnDef = columnDefs.iterator().next();
 
-        CellNameType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
-        CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
+        CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef);
         indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
                                                              indexedCfMetadata.cfName,
                                                              new LocalPartitioner(getIndexKeyComparator()),
@@ -65,73 +64,98 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
         return columnDef.type;
     }
 
+    public ColumnDefinition indexedColumn()
+    {
+        return columnDef;
+    }
+
     @Override
     String indexTypeForGrouping()
     {
         return "_internal_";
     }
 
-    protected abstract CellName makeIndexColumnName(ByteBuffer rowKey, Cell cell);
+    protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, Cell cell)
+    {
+        return makeIndexClustering(rowKey, clustering, cell == null ? null : cell.path());
+    }
 
-    protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell);
+    protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, CellPath path)
+    {
+        return buildIndexClusteringPrefix(rowKey, clustering, path).build();
+    }
 
-    protected abstract AbstractType getExpressionComparator();
+    protected Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
+    {
+        return buildIndexClusteringPrefix(rowKey, bound, null).buildBound(bound.isStart(), bound.isInclusive());
+    }
 
-    public String expressionString(IndexExpression expr)
+    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path);
+
+    protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, Cell cell)
     {
-        return String.format("'%s.%s %s %s'",
-                             baseCfs.name,
-                             getExpressionComparator().getString(expr.column),
-                             expr.operator,
-                             baseCfs.metadata.getColumnDefinition(expr.column).type.getString(expr.value));
+        return cell == null
+             ? getIndexedValue(rowKey, clustering, null, null)
+             : getIndexedValue(rowKey, clustering, cell.value(), cell.path());
     }
 
-    public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+    protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath cellPath);
+
+    public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
     {
-        deleteForCleanup(rowKey, cell, opGroup);
+        deleteForCleanup(rowKey, clustering, cell, opGroup, nowInSec);
     }
 
-    public void deleteForCleanup(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+    public void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
     {
-        if (!cell.isLive())
-            return;
+        delete(rowKey, clustering, cell.value(), cell.path(), new SimpleDeletionTime(cell.livenessInfo().timestamp(), nowInSec), opGroup);
+    }
 
-        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, cell));
-        int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
-        ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata, false, 1);
-        cfi.addTombstone(makeIndexColumnName(rowKey, cell), localDeletionTime, cell.timestamp());
-        indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater, opGroup, null);
+    public void delete(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path, DeletionTime deletion, OpOrder.Group opGroup)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
+        PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
+        Row.Writer writer = upd.writer();
+        Rows.writeClustering(makeIndexClustering(rowKey, clustering, path), writer);
+        writer.writeRowDeletion(deletion);
+        writer.endOfRow();
+        indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
         if (logger.isDebugEnabled())
-            logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
+            logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, upd);
     }
 
-    public void insert(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+    public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup)
     {
-        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, cell));
-        ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata, false, 1);
-        CellName name = makeIndexColumnName(rowKey, cell);
-        if (cell instanceof ExpiringCell)
-        {
-            ExpiringCell ec = (ExpiringCell) cell;
-            cfi.addColumn(new BufferExpiringCell(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ec.timestamp(), ec.getTimeToLive(), ec.getLocalDeletionTime()));
-        }
-        else
-        {
-            cfi.addColumn(new BufferCell(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, cell.timestamp()));
-        }
+        insert(rowKey, clustering, cell, cell.livenessInfo(), opGroup);
+    }
+
+    public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, LivenessInfo info, OpOrder.Group opGroup)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
+
+        PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
+        Row.Writer writer = upd.writer();
+        Rows.writeClustering(makeIndexClustering(rowKey, clustering, cell), writer);
+        writer.writePartitionKeyLivenessInfo(info);
+        writer.endOfRow();
         if (logger.isDebugEnabled())
-            logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), cfi);
+            logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), upd);
 
-        indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater, opGroup, null);
+        indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
     }
 
-    public void update(ByteBuffer rowKey, Cell oldCol, Cell col, OpOrder.Group opGroup)
+    public void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec)
     {
         // insert the new value before removing the old one, so we never have a period
-        // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540                    
-        insert(rowKey, col, opGroup);
-        if (SecondaryIndexManager.shouldCleanupOldValue(oldCol, col))
-            delete(rowKey, oldCol, opGroup);
+        // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
+        insert(rowKey, clustering, cell, opGroup);
+        if (SecondaryIndexManager.shouldCleanupOldValue(oldCell, cell))
+            delete(rowKey, clustering, oldCell, opGroup, nowInSec);
+    }
+
+    public boolean indexes(ColumnDefinition column)
+    {
+        return column.name.equals(columnDef.name);
     }
 
     public void removeIndex(ByteBuffer columnName)
@@ -165,6 +189,12 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
        return indexCfs;
     }
 
+    protected ClusteringComparator getIndexComparator()
+    {
+        assert indexCfs != null;
+        return indexCfs.metadata.comparator;
+    }
+
     public String getIndexName()
     {
         return indexCfs.name;
@@ -172,18 +202,43 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
 
     public void reload()
     {
-        indexCfs.metadata.reloadSecondaryIndexMetadata(baseCfs.metadata);
+        indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
         indexCfs.reload();
     }
-    
+
     public long estimateResultRows()
     {
         return getIndexCfs().getMeanColumns();
     }
 
-    public boolean validate(ByteBuffer rowKey, Cell cell)
+    public void validate(DecoratedKey partitionKey) throws InvalidRequestException
+    {
+        if (columnDef.kind == ColumnDefinition.Kind.PARTITION_KEY)
+            validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null, null));
+    }
+
+    public void validate(Clustering clustering) throws InvalidRequestException
+    {
+        if (columnDef.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
+            validateIndexedValue(getIndexedValue(null, clustering, null, null));
+    }
+
+    public void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException
+    {
+        if (!columnDef.isPrimaryKeyColumn())
+            validateIndexedValue(getIndexedValue(null, null, cellValue, path));
+    }
+
+    private void validateIndexedValue(ByteBuffer value)
+    {
+        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
+            throw new InvalidRequestException(String.format("Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+                                                            value.remaining(), getIndexName(), baseKeyspace(), baseTable(), columnDef.name, FBUtilities.MAX_UNSIGNED_SHORT));
+    }
+
+    @Override
+    public String toString()
     {
-        return getIndexedValue(rowKey, cell).remaining() < FBUtilities.MAX_UNSIGNED_SHORT
-            && makeIndexColumnName(rowKey, cell).toByteBuffer().remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
+        return String.format("%s(%s)", baseTable(), columnDef.name);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index ba902ec..ab8e688 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
 
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Base class for Secondary indexes that implement a unique index per column
@@ -35,12 +35,26 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void delete(ByteBuffer rowKey, Cell col, OpOrder.Group opGroup);
+    public abstract void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
 
     /**
      * Called when a column has been removed due to a cleanup operation.
      */
-    public abstract void deleteForCleanup(ByteBuffer rowKey, Cell col, OpOrder.Group opGroup);
+    public abstract void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
+
+    /**
+     * For indexes on the primary key, index the given PK.
+     */
+    public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
+    {
+    }
+
+    /**
+     * For indexes on the primary key, delete the given PK.
+     */
+    public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
+    {
+    }
 
     /**
      * insert a column to the index
@@ -48,7 +62,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
      * @param rowKey the underlying row key which is indexed
      * @param col all the column info
      */
-    public abstract void insert(ByteBuffer rowKey, Cell col, OpOrder.Group opGroup);
+    public abstract void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup);
 
     /**
      * update a column from the index
@@ -57,20 +71,44 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
      * @param oldCol the previous column info
      * @param col all the column info
      */
-    public abstract void update(ByteBuffer rowKey, Cell oldCol, Cell col, OpOrder.Group opGroup);
+    public abstract void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec);
 
-    public String getNameForSystemKeyspace(ByteBuffer column)
+    protected boolean indexPrimaryKeyColumn()
     {
-        return getIndexName();
+        return false;
     }
 
-    public boolean validate(ByteBuffer rowKey, Cell cell)
+    public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
     {
-        return validate(cell);
+        Clustering clustering = row.clustering();
+        if (indexPrimaryKeyColumn())
+        {
+            // Same as in AtomicBTreePartition.maybeIndexPrimaryKeyColumn
+            long timestamp = row.primaryKeyLivenessInfo().timestamp();
+            int ttl = row.primaryKeyLivenessInfo().ttl();
+
+            for (Cell cell : row)
+            {
+                if (cell.isLive(nowInSec) && cell.livenessInfo().timestamp() > timestamp)
+                {
+                    timestamp = cell.livenessInfo().timestamp();
+                    ttl = cell.livenessInfo().ttl();
+                }
+            }
+            maybeIndex(key.getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
+        }
+        for (Cell cell : row)
+        {
+            if (!indexes(cell.column()))
+                continue;
+
+            if (cell.isLive(nowInSec))
+                insert(key.getKey(), clustering, cell, opGroup);
+        }
     }
 
-    public boolean validate(Cell cell)
+    public String getNameForSystemKeyspace(ByteBuffer column)
     {
-        return cell.value().remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
+        return getIndexName();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index f6f0e8d..502b213 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.db.index;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -33,19 +31,16 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 public abstract class PerRowSecondaryIndex extends SecondaryIndex
 {
     /**
-     * Index the given row.
-     *
-     * @param rowKey the row key
-     * @param cf the cf data to be indexed
+     * Index the given partition.
      */
-    public abstract void index(ByteBuffer rowKey, ColumnFamily cf);
+    public abstract void index(ByteBuffer key, UnfilteredRowIterator atoms);
 
     /**
      * cleans up deleted columns from cassandra cleanup compaction
      *
      * @param key
      */
-    public abstract void delete(DecoratedKey key, OpOrder.Group opGroup);
+    public abstract void delete(ByteBuffer key, OpOrder.Group opGroup);
 
     public String getNameForSystemKeyspace(ByteBuffer columnName)
     {
@@ -58,15 +53,4 @@ public abstract class PerRowSecondaryIndex extends SecondaryIndex
             throw new RuntimeException(e);
         }
     }
-
-
-    public boolean validate(ByteBuffer rowKey, Cell cell)
-    {
-        return validate(cell);
-    }
-
-    public boolean validate(Cell cell)
-    {
-        return true;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 11626d6..7552fd5 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -32,15 +32,11 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.index.composites.CompositesIndex;
 import org.apache.cassandra.db.index.keys.KeysIndex;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -48,6 +44,7 @@ import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.LocalByPartionerType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
@@ -168,7 +165,7 @@ public abstract class SecondaryIndex
      * @param columns the list of columns which belong to this index type
      * @return the secondary index search impl
      */
-    protected abstract SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns);
+    protected abstract SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns);
 
     /**
      * Forces this indexes' in memory data to disk
@@ -308,22 +305,12 @@ public abstract class SecondaryIndex
     }
 
     /**
-     * Returns true if the provided cell name is indexed by this secondary index.
+     * Returns true if the provided column is indexed by this secondary index.
      *
      * The default implementation checks whether the name is one the columnDef name,
      * but this should be overriden but subclass if needed.
      */
-    public abstract boolean indexes(CellName name);
-
-    /**
-     * Returns true if the provided column definition is indexed by this secondary index.
-     *
-     * The default implementation checks whether it is contained in this index column definitions set.
-     */
-    public boolean indexes(ColumnDefinition cdef)
-    {
-        return columnDefs.contains(cdef);
-    }
+    public abstract boolean indexes(ColumnDefinition column);
 
     /**
      * This is the primary way to create a secondary index instance for a CF column.
@@ -371,28 +358,45 @@ public abstract class SecondaryIndex
         return index;
     }
 
-    public abstract boolean validate(ByteBuffer rowKey, Cell cell);
+    public abstract void validate(DecoratedKey partitionKey) throws InvalidRequestException;
+    public abstract void validate(Clustering clustering) throws InvalidRequestException;
+    public abstract void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException;
 
     public abstract long estimateResultRows();
 
+    protected String baseKeyspace()
+    {
+        return baseCfs.metadata.ksName;
+    }
+
+    protected String baseTable()
+    {
+        return baseCfs.metadata.cfName;
+    }
+
     /**
-     * Returns the index comparator for index backed by CFS, or null.
-     *
-     * Note: it would be cleaner to have this be a member method. However we need this when opening indexes
-     * sstables, but by then the CFS won't be fully initiated, so the SecondaryIndex object won't be accessible.
+     * Create the index metadata for the index on a given column of a given table.
      */
-    public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
+    public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def)
     {
-        switch (cdef.getIndexType())
+        if (def.getIndexType() == IndexType.CUSTOM)
+            return null;
+
+        CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
+                                                       .withId(baseMetadata.cfId)
+                                                       .addPartitionKey(def.name, def.type);
+
+        if (def.getIndexType() == IndexType.COMPOSITES)
         {
-            case KEYS:
-                return new SimpleDenseCellNameType(keyComparator);
-            case COMPOSITES:
-                return CompositesIndex.getIndexComparator(baseMetadata, cdef);
-            case CUSTOM:
-                return null;
+            CompositesIndex.addIndexClusteringColumns(builder, baseMetadata, def);
         }
-        throw new AssertionError();
+        else
+        {
+            assert def.getIndexType() == IndexType.KEYS;
+            KeysIndex.addIndexClusteringColumns(builder, baseMetadata, def);
+        }
+
+        return builder.build().reloadIndexMetadataProperties(baseMetadata);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index 916c286..a117f6d 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 /**
@@ -45,7 +46,7 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
         this.cfs = cfs;
         this.idxNames = idxNames;
         this.iter = iter;
-        compactionId = UUIDGen.getTimeUUID();
+        this.compactionId = UUIDGen.getTimeUUID();
     }
 
     public CompactionInfo getCompactionInfo()
@@ -59,21 +60,26 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
 
     public void build()
     {
-        while (iter.hasNext())
-        {
-            if (isStopRequested())
-                throw new CompactionInterruptedException(getCompactionInfo());
-            DecoratedKey key = iter.next();
-            Keyspace.indexRow(key, cfs, idxNames);
-        }
-
         try
         {
-            iter.close();
+            while (iter.hasNext())
+            {
+                if (isStopRequested())
+                    throw new CompactionInterruptedException(getCompactionInfo());
+                DecoratedKey key = iter.next();
+                Keyspace.indexPartition(key, cfs, idxNames);
+            }
         }
-        catch (IOException e)
+        finally
         {
-            throw new RuntimeException(e);
+            try
+            {
+                iter.close();
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 4c1bf45..1bd5452 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -40,20 +39,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.IndexType;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.ExtendedFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
@@ -67,11 +60,10 @@ public class SecondaryIndexManager
 
     public static final Updater nullUpdater = new Updater()
     {
-        public void insert(Cell cell) { }
-
-        public void update(Cell oldCell, Cell cell) { }
-
-        public void remove(Cell current) { }
+        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion) {}
+        public void insert(Clustering clustering, Cell cell) {}
+        public void update(Clustering clustering, Cell oldCell, Cell cell) {}
+        public void remove(Clustering clustering, Cell current) {}
 
         public void updateRowLevelIndexes() {}
     };
@@ -144,6 +136,24 @@ public class SecondaryIndexManager
         return names;
     }
 
+    public Set<PerColumnSecondaryIndex> perColumnIndexes()
+    {
+        Set<PerColumnSecondaryIndex> s = new HashSet<>();
+        for (SecondaryIndex index : allIndexes)
+            if (index instanceof PerColumnSecondaryIndex)
+                s.add((PerColumnSecondaryIndex)index);
+        return s;
+    }
+
+    public Set<PerRowSecondaryIndex> perRowIndexes()
+    {
+        Set<PerRowSecondaryIndex> s = new HashSet<>();
+        for (SecondaryIndex index : allIndexes)
+            if (index instanceof PerRowSecondaryIndex)
+                s.add((PerRowSecondaryIndex)index);
+        return s;
+    }
+
     /**
      * Does a full, blocking rebuild of the indexes specified by columns from the sstables.
      * Does nothing if columns is empty.
@@ -171,26 +181,20 @@ public class SecondaryIndexManager
         logger.info("Index build of {} complete", idxNames);
     }
 
-    public boolean indexes(CellName name, Set<SecondaryIndex> indexes)
+    public boolean indexes(ColumnDefinition column)
     {
-        boolean matching = false;
-        for (SecondaryIndex index : indexes)
-        {
-            if (index.indexes(name))
-            {
-                matching = true;
-                break;
-            }
-        }
-        return matching;
+        for (SecondaryIndex index : allIndexes)
+            if (index.indexes(column))
+                return true;
+        return false;
     }
 
-    public Set<SecondaryIndex> indexFor(CellName name, Set<SecondaryIndex> indexes)
+    private Set<SecondaryIndex> indexFor(ColumnDefinition column)
     {
         Set<SecondaryIndex> matching = null;
-        for (SecondaryIndex index : indexes)
+        for (SecondaryIndex index : allIndexes)
         {
-            if (index.indexes(name))
+            if (index.indexes(column))
             {
                 if (matching == null)
                     matching = new HashSet<>();
@@ -200,36 +204,6 @@ public class SecondaryIndexManager
         return matching == null ? Collections.<SecondaryIndex>emptySet() : matching;
     }
 
-    public boolean indexes(Cell cell)
-    {
-        return indexes(cell.name());
-    }
-
-    public boolean indexes(CellName name)
-    {
-        return indexes(name, allIndexes);
-    }
-
-    public Set<SecondaryIndex> indexFor(CellName name)
-    {
-        return indexFor(name, allIndexes);
-    }
-
-    /**
-     * @return true if at least one of the indexes can handle the clause.
-     */
-    public boolean hasIndexFor(List<IndexExpression> clause)
-    {
-        if (clause == null || clause.isEmpty())
-            return false;
-
-        for (SecondaryIndexSearcher searcher : getIndexSearchersForQuery(clause))
-            if (searcher.canHandleIndexClause(clause))
-                return true;
-
-        return false;
-    }
-
     /**
      * Removes a existing index
      * @param column the indexed column to remove
@@ -325,9 +299,9 @@ public class SecondaryIndexManager
      * @param column the name of indexes column
      * @return the index
      */
-    public SecondaryIndex getIndexForColumn(ByteBuffer column)
+    public SecondaryIndex getIndexForColumn(ColumnDefinition column)
     {
-        return indexesByColumn.get(column);
+        return indexesByColumn.get(column.name.bytes);
     }
 
     /**
@@ -427,105 +401,125 @@ public class SecondaryIndexManager
     }
 
     /**
-     * When building an index against existing data, add the given row to the index
-     *
-     * @param key the row key
-     * @param cf the current rows data
+     * When building an index against existing data, add the given partition to the index
      */
-    public void indexRow(ByteBuffer key, ColumnFamily cf, OpOrder.Group opGroup)
+    public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<SecondaryIndex> allIndexes, int nowInSec)
     {
-        // Update entire row only once per row level index
-        Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = null;
+        Set<PerRowSecondaryIndex> perRowIndexes = perRowIndexes();
+        Set<PerColumnSecondaryIndex> perColumnIndexes = perColumnIndexes();
 
-        for (SecondaryIndex index : allIndexes)
+        if (!perRowIndexes.isEmpty())
         {
-            if (index instanceof PerRowSecondaryIndex)
+            // TODO: This is passing the same partition iterator to all perRow index, which means this only
+            // work if there is only one of them. We should change the API so it doesn't work directly on the
+            // partition, but rather on individual rows, so we can do a single iteration on the partition in this
+            // method and pass the rows to index to all indexes.
+
+            // Update entire partition only once per row level index
+            Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = new HashSet<>();
+            for (PerRowSecondaryIndex index : perRowIndexes)
             {
-                if (appliedRowLevelIndexes == null)
-                    appliedRowLevelIndexes = new HashSet<>();
-
                 if (appliedRowLevelIndexes.add(index.getClass()))
-                    ((PerRowSecondaryIndex)index).index(key, cf);
+                    ((PerRowSecondaryIndex)index).index(partition.partitionKey().getKey(), partition);
             }
-            else
+        }
+
+        if (!perColumnIndexes.isEmpty())
+        {
+            DecoratedKey key = partition.partitionKey();
+
+            if (!partition.staticRow().isEmpty())
             {
-                for (Cell cell : cf)
-                    if (cell.isLive() && index.indexes(cell.name()))
-                        ((PerColumnSecondaryIndex) index).insert(key, cell, opGroup);
+                for (PerColumnSecondaryIndex index : perColumnIndexes)
+                    index.indexRow(key, partition.staticRow(), opGroup, nowInSec);
+            }
+
+            try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+            {
+                while (filtered.hasNext())
+                {
+                    Row row = filtered.next();
+                    for (PerColumnSecondaryIndex index : perColumnIndexes)
+                        index.indexRow(key, row, opGroup, nowInSec);
+                }
             }
         }
     }
 
     /**
-     * Delete all columns from all indexes for this row.  For when cleanup rips a row out entirely.
-     *
-     * @param key the row key
-     * @param indexedColumnsInRow all column names in row
+     * Delete all data from all indexes for this partition.  For when cleanup rips a partition out entirely.
      */
-    public void deleteFromIndexes(DecoratedKey key, List<Cell> indexedColumnsInRow, OpOrder.Group opGroup)
+    public void deleteFromIndexes(UnfilteredRowIterator partition, OpOrder.Group opGroup, int nowInSec)
     {
-        // Update entire row only once per row level index
-        Set<Class<? extends SecondaryIndex>> cleanedRowLevelIndexes = null;
+        ByteBuffer key = partition.partitionKey().getKey();
 
-        for (Cell cell : indexedColumnsInRow)
+        for (PerRowSecondaryIndex index : perRowIndexes())
+            index.delete(key, opGroup);
+
+        Set<PerColumnSecondaryIndex> indexes = perColumnIndexes();
+
+        while (partition.hasNext())
         {
-            for (SecondaryIndex index : indexFor(cell.name()))
+            Unfiltered unfiltered = partition.next();
+            if (unfiltered.kind() != Unfiltered.Kind.ROW)
+                continue;
+
+            Row row = (Row) unfiltered;
+            Clustering clustering = row.clustering();
+            if (!row.deletion().isLive())
+                for (PerColumnSecondaryIndex index : indexes)
+                    index.maybeDelete(key, clustering, row.deletion(), opGroup);
+            for (Cell cell : row)
             {
-                if (index instanceof PerRowSecondaryIndex)
+                for (PerColumnSecondaryIndex index : indexes)
                 {
-                    if (cleanedRowLevelIndexes == null)
-                        cleanedRowLevelIndexes = new HashSet<>();
-                    if (cleanedRowLevelIndexes.add(index.getClass()))
-                        ((PerRowSecondaryIndex) index).delete(key, opGroup);
-                }
-                else
-                {
-                    ((PerColumnSecondaryIndex) index).deleteForCleanup(key.getKey(), cell, opGroup);
+                    if (!index.indexes(cell.column()))
+                        continue;
+
+                    ((PerColumnSecondaryIndex) index).deleteForCleanup(key, clustering, cell, opGroup, nowInSec);
                 }
             }
         }
     }
 
     /**
-     * This helper acts as a closure around the indexManager
-     * and updated cf data to ensure that down in
-     * Memtable's ColumnFamily implementation, the index
-     * can get updated. Note: only a CF backed by AtomicSortedColumns implements
-     * this behaviour fully, other types simply ignore the index updater.
+     * This helper acts as a closure around the indexManager and updated data
+     * to ensure that down in Memtable's ColumnFamily implementation, the index
+     * can get updated.
      */
-    public Updater updaterFor(DecoratedKey key, ColumnFamily cf, OpOrder.Group opGroup)
+    public Updater updaterFor(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
     {
         return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty())
                 ? nullUpdater
-                : new StandardUpdater(key, cf, opGroup);
+                : new StandardUpdater(update, opGroup, nowInSec);
     }
 
     /**
      * Updated closure with only the modified row key.
      */
-    public Updater gcUpdaterFor(DecoratedKey key)
+    public Updater gcUpdaterFor(DecoratedKey key, int nowInSec)
     {
-        return new GCUpdater(key);
+        return new GCUpdater(key, nowInSec);
     }
 
     /**
      * Get a list of IndexSearchers from the union of expression index types
-     * @param clause the query clause
+     * @param command the query
      * @return the searchers needed to query the index
      */
-    public List<SecondaryIndexSearcher> getIndexSearchersForQuery(List<IndexExpression> clause)
+    public List<SecondaryIndexSearcher> getIndexSearchersFor(ReadCommand command)
     {
-        Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<>();
+        Map<String, Set<ColumnDefinition>> groupByIndexType = new HashMap<>();
 
         //Group columns by type
-        for (IndexExpression ix : clause)
+        for (RowFilter.Expression e : command.rowFilter())
         {
-            SecondaryIndex index = getIndexForColumn(ix.column);
+            SecondaryIndex index = getIndexForColumn(e.column());
 
-            if (index == null || !index.supportsOperator(ix.operator))
+            if (index == null || !index.supportsOperator(e.operator()))
                 continue;
 
-            Set<ByteBuffer> columns = groupByIndexType.get(index.indexTypeForGrouping());
+            Set<ColumnDefinition> columns = groupByIndexType.get(index.indexTypeForGrouping());
 
             if (columns == null)
             {
@@ -533,107 +527,54 @@ public class SecondaryIndexManager
                 groupByIndexType.put(index.indexTypeForGrouping(), columns);
             }
 
-            columns.add(ix.column);
+            columns.add(e.column());
         }
 
         List<SecondaryIndexSearcher> indexSearchers = new ArrayList<>(groupByIndexType.size());
 
         //create searcher per type
-        for (Set<ByteBuffer> column : groupByIndexType.values())
+        for (Set<ColumnDefinition> column : groupByIndexType.values())
             indexSearchers.add(getIndexForColumn(column.iterator().next()).createSecondaryIndexSearcher(column));
 
         return indexSearchers;
     }
 
-    /**
-     * Validates an union of expression index types. It will throw a {@link RuntimeException} if
-     * any of the expressions in the provided clause is not valid for its index implementation.
-     * @param clause the query clause
-     * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
-     */
-    public void validateIndexSearchersForQuery(List<IndexExpression> clause) throws InvalidRequestException
+    public SecondaryIndexSearcher getBestIndexSearcherFor(ReadCommand command)
     {
-        // Group by index type
-        Map<String, Set<IndexExpression>> expressionsByIndexType = new HashMap<>();
-        Map<String, Set<ByteBuffer>> columnsByIndexType = new HashMap<>();
-        for (IndexExpression indexExpression : clause)
-        {
-            SecondaryIndex index = getIndexForColumn(indexExpression.column);
-
-            if (index == null)
-                continue;
-
-            String canonicalIndexName = index.getClass().getCanonicalName();
-            Set<IndexExpression> expressions = expressionsByIndexType.get(canonicalIndexName);
-            Set<ByteBuffer> columns = columnsByIndexType.get(canonicalIndexName);
-            if (expressions == null)
-            {
-                expressions = new HashSet<>();
-                columns = new HashSet<>();
-                expressionsByIndexType.put(canonicalIndexName, expressions);
-                columnsByIndexType.put(canonicalIndexName, columns);
-            }
+        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersFor(command);
 
-            expressions.add(indexExpression);
-            columns.add(indexExpression.column);
-        }
-
-        // Validate
-        boolean haveSupportedIndexLookup = false;
-        for (Map.Entry<String, Set<IndexExpression>> expressions : expressionsByIndexType.entrySet())
-        {
-            Set<ByteBuffer> columns = columnsByIndexType.get(expressions.getKey());
-            SecondaryIndex secondaryIndex = getIndexForColumn(columns.iterator().next());
-            SecondaryIndexSearcher searcher = secondaryIndex.createSecondaryIndexSearcher(columns);
-            for (IndexExpression expression : expressions.getValue())
-            {
-                searcher.validate(expression);
-                haveSupportedIndexLookup |= secondaryIndex.supportsOperator(expression.operator);
-            }
-        }
+        if (indexSearchers.isEmpty())
+            return null;
 
-        if (!haveSupportedIndexLookup)
+        SecondaryIndexSearcher mostSelective = null;
+        long bestEstimate = Long.MAX_VALUE;
+        for (SecondaryIndexSearcher searcher : indexSearchers)
         {
-            // build the error message
-            int i = 0;
-            StringBuilder sb = new StringBuilder("No secondary indexes on the restricted columns support the provided operators: ");
-            for (Map.Entry<String, Set<IndexExpression>> expressions : expressionsByIndexType.entrySet())
+            SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter());
+            long estimate = highestSelectivityIndex.estimateResultRows();
+            if (estimate <= bestEstimate)
             {
-                for (IndexExpression expression : expressions.getValue())
-                {
-                    if (i++ > 0)
-                        sb.append(", ");
-                    sb.append("'");
-                    String columnName;
-                    try
-                    {
-                        columnName = ByteBufferUtil.string(expression.column);
-                    }
-                    catch (CharacterCodingException ex)
-                    {
-                        columnName = "<unprintable>";
-                    }
-                    sb.append(columnName).append(" ").append(expression.operator).append(" <value>").append("'");
-                }
+                bestEstimate = estimate;
+                mostSelective = searcher;
             }
-
-            throw new InvalidRequestException(sb.toString());
         }
+        return mostSelective;
     }
 
     /**
-     * Performs a search across a number of column indexes
-     *
-     * @param filter the column range to restrict to
-     * @return found indexed rows
+     * Validates an union of expression index types. It will throw an {@link InvalidRequestException} if
+     * any of the expressions in the provided clause is not valid for its index implementation.
+     * @param filter the filter to check
+     * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
      */
-    public List<Row> search(ExtendedFilter filter)
+    public void validateFilter(RowFilter filter) throws InvalidRequestException
     {
-        SecondaryIndexSearcher mostSelective = getHighestSelectivityIndexSearcher(filter.getClause());
-        if (mostSelective == null)
-            return Collections.emptyList();
-        else
-            return mostSelective.search(filter);
+        for (RowFilter.Expression expression : filter)
+        {
+            SecondaryIndex index = getIndexForColumn(expression.column());
+            if (index != null && index.supportsOperator(expression.operator()))
+                expression.validateForIndexing();
+        }
     }
 
     public Set<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -666,19 +607,27 @@ public class SecondaryIndexManager
             index.setIndexRemoved();
     }
 
-    public SecondaryIndex validate(ByteBuffer rowKey, Cell cell)
+    public void validate(DecoratedKey partitionKey) throws InvalidRequestException
     {
-        for (SecondaryIndex index : indexFor(cell.name()))
-        {
-            if (!index.validate(rowKey, cell))
-                return index;
-        }
-        return null;
+        for (SecondaryIndex index : perColumnIndexes())
+            index.validate(partitionKey);
+    }
+
+    public void validate(Clustering clustering) throws InvalidRequestException
+    {
+        for (SecondaryIndex index : perColumnIndexes())
+            index.validate(clustering);
+    }
+
+    public void validate(ColumnDefinition column, ByteBuffer value, CellPath path) throws InvalidRequestException
+    {
+        for (SecondaryIndex index : indexFor(column))
+            index.validate(value, path);
     }
 
     static boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
     {
-        // If any one of name/value/timestamp are different, then we
+        // If either the value or timestamp is different, then we
         // should delete from the index. If not, then we can infer that
         // at least one of the cells is an ExpiringColumn and that the
         // difference is in the expiry time. In this case, we don't want to
@@ -686,10 +635,9 @@ public class SecondaryIndexManager
         // will just hide the inserted value.
         // Completely identical cells (including expiring columns with
         // identical ttl & localExpirationTime) will not get this far due
-        // to the oldCell.equals(newColumn) in StandardUpdater.update
-        return !oldCell.name().equals(newCell.name())
-            || !oldCell.value().equals(newCell.value())
-            || oldCell.timestamp() != newCell.timestamp();
+        // to the oldCell.equals(newCell) in StandardUpdater.update
+        return !oldCell.value().equals(newCell.value())
+            || oldCell.livenessInfo().timestamp() != newCell.livenessInfo().timestamp();
     }
 
     private Set<String> filterByColumn(Set<String> idxNames)
@@ -712,14 +660,17 @@ public class SecondaryIndexManager
 
     public static interface Updater
     {
+        /** Called when a row with the provided clustering and row infos is inserted */
+        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion);
+
         /** called when constructing the index against pre-existing data */
-        public void insert(Cell cell);
+        public void insert(Clustering clustering, Cell cell);
 
         /** called when updating the index from a memtable */
-        public void update(Cell oldCell, Cell cell);
+        public void update(Clustering clustering, Cell oldCell, Cell cell);
 
         /** called when lazy-updating the index during compaction (CASSANDRA-2897) */
-        public void remove(Cell current);
+        public void remove(Clustering clustering, Cell current);
 
         /** called after memtable updates are complete (CASSANDRA-5397) */
         public void updateRowLevelIndexes();
@@ -728,34 +679,38 @@ public class SecondaryIndexManager
     private final class GCUpdater implements Updater
     {
         private final DecoratedKey key;
+        private final int nowInSec;
 
-        public GCUpdater(DecoratedKey key)
+        public GCUpdater(DecoratedKey key, int nowInSec)
         {
             this.key = key;
+            this.nowInSec = nowInSec;
         }
 
-        public void insert(Cell cell)
+        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
         {
             throw new UnsupportedOperationException();
         }
 
-        public void update(Cell oldCell, Cell newCell)
+        public void insert(Clustering clustering, Cell cell)
         {
             throw new UnsupportedOperationException();
         }
 
-        public void remove(Cell cell)
+        public void update(Clustering clustering, Cell oldCell, Cell newCell)
         {
-            if (!cell.isLive())
-                return;
+            throw new UnsupportedOperationException();
+        }
 
-            for (SecondaryIndex index : indexFor(cell.name()))
+        public void remove(Clustering clustering, Cell cell)
+        {
+            for (SecondaryIndex index : indexFor(cell.column()))
             {
                 if (index instanceof PerColumnSecondaryIndex)
                 {
                     try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
                     {
-                        ((PerColumnSecondaryIndex) index).delete(key.getKey(), cell, opGroup);
+                        ((PerColumnSecondaryIndex) index).delete(key.getKey(), clustering, cell, opGroup, nowInSec);
                     }
                 }
             }
@@ -770,39 +725,50 @@ public class SecondaryIndexManager
 
     private final class StandardUpdater implements Updater
     {
-        private final DecoratedKey key;
-        private final ColumnFamily cf;
+        private final PartitionUpdate update;
         private final OpOrder.Group opGroup;
+        private final int nowInSec;
 
-        public StandardUpdater(DecoratedKey key, ColumnFamily cf, OpOrder.Group opGroup)
+        public StandardUpdater(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
         {
-            this.key = key;
-            this.cf = cf;
+            this.update = update;
             this.opGroup = opGroup;
+            this.nowInSec = nowInSec;
+        }
+
+        public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
+        {
+            for (PerColumnSecondaryIndex index : perColumnIndexes())
+            {
+                if (timestamp != LivenessInfo.NO_TIMESTAMP)
+                    index.maybeIndex(update.partitionKey().getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
+                if (!deletion.isLive())
+                    index.maybeDelete(update.partitionKey().getKey(), clustering, deletion, opGroup);
+            }
         }
 
-        public void insert(Cell cell)
+        public void insert(Clustering clustering, Cell cell)
         {
-            if (!cell.isLive())
+            if (!cell.isLive(nowInSec))
                 return;
 
-            for (SecondaryIndex index : indexFor(cell.name()))
+            for (SecondaryIndex index : indexFor(cell.column()))
                 if (index instanceof PerColumnSecondaryIndex)
-                    ((PerColumnSecondaryIndex) index).insert(key.getKey(), cell, opGroup);
+                    ((PerColumnSecondaryIndex) index).insert(update.partitionKey().getKey(), clustering, cell, opGroup);
         }
 
-        public void update(Cell oldCell, Cell cell)
+        public void update(Clustering clustering, Cell oldCell, Cell cell)
         {
             if (oldCell.equals(cell))
                 return;
 
-            for (SecondaryIndex index : indexFor(cell.name()))
+            for (SecondaryIndex index : indexFor(cell.column()))
             {
                 if (index instanceof PerColumnSecondaryIndex)
                 {
-                    if (cell.isLive())
+                    if (cell.isLive(nowInSec))
                     {
-                        ((PerColumnSecondaryIndex) index).update(key.getKey(), oldCell, cell, opGroup);
+                        ((PerColumnSecondaryIndex) index).update(update.partitionKey().getKey(), clustering, oldCell, cell, opGroup, nowInSec);
                     }
                     else
                     {
@@ -812,53 +778,22 @@ public class SecondaryIndexManager
                         // identical values and ttl) Then, we don't want to delete as the
                         // tombstone will hide the new value we just inserted; see CASSANDRA-7268
                         if (shouldCleanupOldValue(oldCell, cell))
-                            ((PerColumnSecondaryIndex) index).delete(key.getKey(), oldCell, opGroup);
+                            ((PerColumnSecondaryIndex) index).delete(update.partitionKey().getKey(), clustering, oldCell, opGroup, nowInSec);
                     }
                 }
             }
         }
 
-        public void remove(Cell cell)
+        public void remove(Clustering clustering, Cell cell)
         {
-            if (!cell.isLive())
-                return;
-
-            for (SecondaryIndex index : indexFor(cell.name()))
-                if (index instanceof PerColumnSecondaryIndex)
-                   ((PerColumnSecondaryIndex) index).delete(key.getKey(), cell, opGroup);
+            throw new UnsupportedOperationException();
         }
 
         public void updateRowLevelIndexes()
         {
             for (SecondaryIndex index : rowLevelIndexMap.values())
-                ((PerRowSecondaryIndex) index).index(key.getKey(), cf);
-        }
-
-    }
-
-    public SecondaryIndexSearcher getHighestSelectivityIndexSearcher(List<IndexExpression> clause)
-    {
-        if (clause == null)
-            return null;
-
-        List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
-
-        if (indexSearchers.isEmpty())
-            return null;
-
-        SecondaryIndexSearcher mostSelective = null;
-        long bestEstimate = Long.MAX_VALUE;
-        for (SecondaryIndexSearcher searcher : indexSearchers)
-        {
-            SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(clause);
-            long estimate = highestSelectivityIndex.estimateResultRows();
-            if (estimate <= bestEstimate)
-            {
-                bestEstimate = estimate;
-                mostSelective = searcher;
-            }
+                ((PerRowSecondaryIndex) index).index(update.partitionKey().getKey(), update.unfilteredIterator());
         }
 
-        return mostSelective;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 5812e9d..4f63ae8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -20,75 +20,192 @@ package org.apache.cassandra.db.index;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
 public abstract class SecondaryIndexSearcher
 {
+    private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexSearcher.class);
+
     protected final SecondaryIndexManager indexManager;
-    protected final Set<ByteBuffer> columns;
+    protected final Set<ColumnDefinition> columns;
     protected final ColumnFamilyStore baseCfs;
 
-    public SecondaryIndexSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns)
+    public SecondaryIndexSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
     {
         this.indexManager = indexManager;
         this.columns = columns;
         this.baseCfs = indexManager.baseCfs;
     }
 
-    public SecondaryIndex highestSelectivityIndex(List<IndexExpression> clause)
+    public SecondaryIndex highestSelectivityIndex(RowFilter filter)
     {
-        IndexExpression expr = highestSelectivityPredicate(clause, false);
-        return expr == null ? null : indexManager.getIndexForColumn(expr.column);
+        RowFilter.Expression expr = highestSelectivityPredicate(filter, false);
+        return expr == null ? null : indexManager.getIndexForColumn(expr.column());
     }
 
-    public abstract List<Row> search(ExtendedFilter filter);
+    public RowFilter.Expression primaryClause(ReadCommand command)
+    {
+        return highestSelectivityPredicate(command.rowFilter(), false);
+    }
 
-    /**
-     * @return true this index is able to handle the given index expressions.
-     */
-    public boolean canHandleIndexClause(List<IndexExpression> clause)
+    @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
+                                  // of this method.
+    public UnfilteredPartitionIterator search(ReadCommand command, ReadOrderGroup orderGroup)
     {
-        for (IndexExpression expression : clause)
-        {
-            if (!columns.contains(expression.column))
-                continue;
+        RowFilter.Expression primary = highestSelectivityPredicate(command.rowFilter(), true);
+        assert primary != null;
+
+        AbstractSimplePerColumnSecondaryIndex index = (AbstractSimplePerColumnSecondaryIndex)indexManager.getIndexForColumn(primary.column());
+        assert index != null && index.getIndexCfs() != null;
+
+        if (logger.isDebugEnabled())
+            logger.debug("Most-selective indexed predicate is {}", primary);
 
-            SecondaryIndex index = indexManager.getIndexForColumn(expression.column);
-            if (index != null && index.getIndexCfs() != null && index.supportsOperator(expression.operator))
-                return true;
+        DecoratedKey indexKey = index.getIndexKeyFor(primary.getIndexValue());
+
+        UnfilteredRowIterator indexIter = queryIndex(index, indexKey, command, orderGroup);
+        try
+        {
+            return queryDataFromIndex(index, indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup);
+        }
+        catch (RuntimeException | Error e)
+        {
+            indexIter.close();
+            throw e;
         }
-        return false;
     }
-    
-    /**
-     * Validates the specified {@link IndexExpression}. It will throw an {@link org.apache.cassandra.exceptions.InvalidRequestException}
-     * if the provided clause is not valid for the index implementation.
-     *
-     * @param indexExpression An {@link IndexExpression} to be validated
-     * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
-     */
-    public void validate(IndexExpression indexExpression) throws InvalidRequestException
+
+    private UnfilteredRowIterator queryIndex(AbstractSimplePerColumnSecondaryIndex index, DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
+    {
+        ClusteringIndexFilter filter = makeIndexFilter(index, command);
+        CFMetaData indexMetadata = index.getIndexCfs().metadata;
+        return SinglePartitionReadCommand.create(indexMetadata, command.nowInSec(), indexKey, ColumnFilter.all(indexMetadata), filter)
+                                         .queryMemtableAndDisk(index.getIndexCfs(), orderGroup.indexReadOpOrderGroup());
+    }
+
+    private ClusteringIndexFilter makeIndexFilter(AbstractSimplePerColumnSecondaryIndex index, ReadCommand command)
     {
+        if (command instanceof SinglePartitionReadCommand)
+        {
+            // Note: as yet there's no route to get here - a 2i query *always* uses a
+            // PartitionRangeReadCommand. This is here in preparation for coming changes
+            // in SelectStatement.
+            SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command;
+            ByteBuffer pk = sprc.partitionKey().getKey();
+            ClusteringIndexFilter filter = sprc.clusteringIndexFilter();
+
+            if (filter instanceof ClusteringIndexNamesFilter)
+            {
+                NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
+                NavigableSet<Clustering> clusterings = new TreeSet<>(index.getIndexComparator());
+                for (Clustering c : requested)
+                    clusterings.add(index.makeIndexClustering(pk, c, (Cell)null).takeAlias());
+                return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
+            }
+            else
+            {
+                Slices requested = ((ClusteringIndexSliceFilter)filter).requestedSlices();
+                Slices.Builder builder = new Slices.Builder(index.getIndexComparator());
+                for (Slice slice : requested)
+                    builder.add(index.makeIndexBound(pk, slice.start()), index.makeIndexBound(pk, slice.end()));
+                return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed());
+            }
+        }
+        else
+        {
+
+            DataRange dataRange = ((PartitionRangeReadCommand)command).dataRange();
+            AbstractBounds<PartitionPosition> range = dataRange.keyRange();
+
+            Slice slice = Slice.ALL;
+
+            /*
+             * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+             * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
+             * key having a given token. A potential fix would be to actually store the token along the key in the indexed row.
+             */
+            if (range.left instanceof DecoratedKey)
+            {
+                // the right hand side of the range may not be a DecoratedKey (for instance if we're paging),
+                // but if it is, we can optimise slightly by restricting the slice
+                if (range.right instanceof DecoratedKey)
+                {
+
+                    DecoratedKey startKey = (DecoratedKey) range.left;
+                    DecoratedKey endKey = (DecoratedKey) range.right;
+
+                    Slice.Bound start = Slice.Bound.BOTTOM;
+                    Slice.Bound end = Slice.Bound.TOP;
+
+                    /*
+                     * For index queries over a range, we can't do a whole lot better than querying everything for the key range, though for
+                     * slice queries where we can slightly restrict the beginning and end.
+                     */
+                    if (!dataRange.isNamesQuery())
+                    {
+                        ClusteringIndexSliceFilter startSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(startKey));
+                        ClusteringIndexSliceFilter endSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(endKey));
+
+                        // We can't effectively support reversed queries when we have a range, so we don't support it
+                        // (or through post-query reordering) and shouldn't get there.
+                        assert !startSliceFilter.isReversed() && !endSliceFilter.isReversed();
+
+                        Slices startSlices = startSliceFilter.requestedSlices();
+                        Slices endSlices = endSliceFilter.requestedSlices();
+
+                        if (startSlices.size() > 0)
+                            start = startSlices.get(0).start();
+
+                        if (endSlices.size() > 0)
+                            end = endSlices.get(endSlices.size() - 1).end();
+                    }
+
+                    slice = Slice.make(index.makeIndexBound(startKey.getKey(), start),
+                                       index.makeIndexBound(endKey.getKey(), end));
+                }
+                else
+                {
+                   // otherwise, just start the index slice from the key we do have
+                   slice = Slice.make(index.makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM),
+                                      Slice.Bound.TOP);
+                }
+            }
+            return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
+        }
     }
 
-    protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause, boolean includeInTrace)
+    protected abstract UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex index,
+                                                                      DecoratedKey indexKey,
+                                                                      RowIterator indexHits,
+                                                                      ReadCommand command,
+                                                                      ReadOrderGroup orderGroup);
+
+    protected RowFilter.Expression highestSelectivityPredicate(RowFilter filter, boolean includeInTrace)
     {
-        IndexExpression best = null;
+        RowFilter.Expression best = null;
         int bestMeanCount = Integer.MAX_VALUE;
         Map<SecondaryIndex, Integer> candidates = new HashMap<>();
 
-        for (IndexExpression expression : clause)
+        for (RowFilter.Expression expression : filter)
         {
             // skip columns belonging to a different index type
-            if (!columns.contains(expression.column))
+            if (!columns.contains(expression.column()))
                 continue;
 
-            SecondaryIndex index = indexManager.getIndexForColumn(expression.column);
-            if (index == null || index.getIndexCfs() == null || !index.supportsOperator(expression.operator))
+            SecondaryIndex index = indexManager.getIndexForColumn(expression.column());
+            if (index == null || index.getIndexCfs() == null || !index.supportsOperator(expression.operator()))
                 continue;
 
             int columns = index.getIndexCfs().getMeanColumns();
@@ -106,34 +223,21 @@ public abstract class SecondaryIndexSearcher
                 Tracing.trace("No applicable indexes found");
             else if (Tracing.isTracing())
                 // pay for an additional threadlocal get() rather than build the strings unnecessarily
-                Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.",
-                              FBUtilities.toString(candidates),
-                              indexManager.getIndexForColumn(best.column).getIndexName());
+                Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.", FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column()).getIndexName());
         }
         return best;
     }
 
     /**
-     * Returns {@code true} if the specified list of {@link IndexExpression}s require a full scan of all the nodes.
-     *
-     * @param clause A list of {@link IndexExpression}s
-     * @return {@code true} if the {@code IndexExpression}s require a full scan, {@code false} otherwise
-     */
-    public boolean requiresScanningAllRanges(List<IndexExpression> clause)
-    {
-        return false;
-    }
-
-    /**
-     * Combines index query results from multiple nodes. This is done by the coordinator node after it has reconciled
+     * Post-process the result of an index query. This is done by the coordinator node after it has reconciled
      * the replica responses.
      *
-     * @param clause A list of {@link IndexExpression}s
-     * @param rows The index query results to be combined
-     * @return The combination of the index query results
+     * @param command The {@code ReadCommand} use for the query.
+     * @param result The index query results to be post-processed
+     * @return The post-processed results
      */
-    public List<Row> postReconciliationProcessing(List<IndexExpression> clause, List<Row> rows)
+    public PartitionIterator postReconciliationProcessing(RowFilter filter, PartitionIterator result)
     {
-        return rows;
+        return result;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index e88d456..9333bcf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -26,36 +26,20 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 
 /**
- * Base class for secondary indexes where composites are involved.
+ * Base class for internal secondary indexes (this could be merged with AbstractSimplePerColumnSecondaryIndex).
  */
 public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
 {
-    private volatile CellNameType indexComparator;
-
-    protected CellNameType getIndexComparator()
-    {
-        // Yes, this is racy, but doing this more than once is not a big deal, we just want to avoid doing it every time
-        // More seriously, we should fix that whole SecondaryIndex API so this can be a final and avoid all that non-sense.
-        if (indexComparator == null)
-        {
-            assert columnDef != null;
-            indexComparator = getIndexComparator(baseCfs.metadata, columnDef);
-        }
-        return indexComparator;
-    }
-
     public static CompositesIndex create(ColumnDefinition cfDef)
     {
         if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
@@ -90,68 +74,56 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
         throw new AssertionError();
     }
 
-    // Check SecondaryIndex.getIndexComparator if you want to know why this is static
-    public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
+    public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
     {
         if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
         {
-            switch (((CollectionType)cfDef.type).kind)
+            CollectionType type = (CollectionType)cfDef.type;
+            if (type.kind == CollectionType.Kind.LIST
+                || (type.kind == CollectionType.Kind.MAP && cfDef.hasIndexOption(SecondaryIndex.INDEX_VALUES_OPTION_NAME)))
             {
-                case LIST:
-                    return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
-                case SET:
-                    return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
-                case MAP:
-                    if (cfDef.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
-                        return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
-                    else if (cfDef.hasIndexOption(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
-                        return CompositesIndexOnCollectionKeyAndValue.buildIndexComparator(baseMetadata, cfDef);
-                    else
-                        return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
+                CompositesIndexOnCollectionValue.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
+            }
+            else
+            {
+                addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
             }
         }
-
-        switch (cfDef.kind)
+        else if (cfDef.isClusteringColumn())
         {
-            case CLUSTERING_COLUMN:
-                return CompositesIndexOnClusteringKey.buildIndexComparator(baseMetadata, cfDef);
-            case REGULAR:
-                return CompositesIndexOnRegular.buildIndexComparator(baseMetadata, cfDef);
-            case PARTITION_KEY:
-                return CompositesIndexOnPartitionKey.buildIndexComparator(baseMetadata, cfDef);
-            //case COMPACT_VALUE:
-            //    return CompositesIndexOnCompactValue.buildIndexComparator(baseMetadata, cfDef);
+            CompositesIndexOnClusteringKey.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
+        }
+        else
+        {
+            addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
         }
-        throw new AssertionError();
     }
 
-    protected CellName makeIndexColumnName(ByteBuffer rowKey, Cell cell)
+    protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
     {
-        return getIndexComparator().create(makeIndexColumnPrefix(rowKey, cell.name()), null);
+        indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+        for (ColumnDefinition def : baseMetadata.clusteringColumns())
+            indexMetadata.addClusteringColumn(def.name, def.type);
     }
 
-    protected abstract Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName);
+    public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry);
 
-    public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry);
+    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
 
-    public abstract boolean isStale(IndexedEntry entry, ColumnFamily data, long now);
-
-    public void delete(IndexedEntry entry, OpOrder.Group opGroup)
+    public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
     {
-        int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
-        ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
-        cfi.addTombstone(entry.indexEntry, localDeletionTime, entry.timestamp);
-        indexCfs.apply(entry.indexValue, cfi, SecondaryIndexManager.nullUpdater, opGroup, null);
-        if (logger.isDebugEnabled())
-            logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, cfi);
-    }
+        PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, entry.indexValue, PartitionColumns.NONE, 1);
+        Row.Writer writer = upd.writer();
+        Rows.writeClustering(entry.indexClustering, writer);
+        writer.writeRowDeletion(new SimpleDeletionTime(entry.timestamp, nowInSec));
+        writer.endOfRow();
+        indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
 
-    protected AbstractType<?> getExpressionComparator()
-    {
-        return baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+        if (logger.isDebugEnabled())
+            logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, upd);
     }
 
-    public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
+    public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
     {
         return new CompositesSearcher(baseCfs.indexManager, columns);
     }
@@ -178,31 +150,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
     public static class IndexedEntry
     {
         public final DecoratedKey indexValue;
-        public final CellName indexEntry;
+        public final Clustering indexClustering;
         public final long timestamp;
 
         public final ByteBuffer indexedKey;
-        public final Composite indexedEntryPrefix;
-        public final ByteBuffer indexedEntryCollectionKey; // may be null
-
-        public IndexedEntry(DecoratedKey indexValue, CellName indexEntry, long timestamp, ByteBuffer indexedKey, Composite indexedEntryPrefix)
-        {
-            this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryPrefix, null);
-        }
+        public final Clustering indexedEntryClustering;
 
-        public IndexedEntry(DecoratedKey indexValue,
-                            CellName indexEntry,
-                            long timestamp,
-                            ByteBuffer indexedKey,
-                            Composite indexedEntryPrefix,
-                            ByteBuffer indexedEntryCollectionKey)
+        public IndexedEntry(DecoratedKey indexValue, Clustering indexClustering, long timestamp, ByteBuffer indexedKey, Clustering indexedEntryClustering)
         {
             this.indexValue = indexValue;
-            this.indexEntry = indexEntry;
+            this.indexClustering = indexClustering.takeAlias();
             this.timestamp = timestamp;
             this.indexedKey = indexedKey;
-            this.indexedEntryPrefix = indexedEntryPrefix;
-            this.indexedEntryCollectionKey = indexedEntryCollectionKey;
+            this.indexedEntryClustering = indexedEntryClustering.takeAlias();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
index 402ea05..7624c1f 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
@@ -18,19 +18,9 @@
 package org.apache.cassandra.db.index.composites;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CBuilder;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.rows.*;
 
 /**
  * Common superclass for indexes that capture collection keys, including
@@ -49,41 +39,22 @@ import org.apache.cassandra.db.marshal.*;
  */
 public abstract class CompositesIndexIncludingCollectionKey extends CompositesIndex
 {
-    public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+    protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
     {
-        int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
-        List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
-        types.add(SecondaryIndex.keyComparator);
-        for (int i = 0; i < count - 1; i++)
-            types.add(baseMetadata.comparator.subtype(i));
-        return new CompoundDenseCellNameType(types);
-    }
-
-    protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
-    {
-        int count = 1 + baseCfs.metadata.clusteringColumns().size();
-        CBuilder builder = getIndexComparator().builder();
+        CBuilder builder = CBuilder.create(getIndexComparator());
         builder.add(rowKey);
-        for (int i = 0; i < Math.min(cellName.size(), count - 1); i++)
-            builder.add(cellName.get(i));
-        return builder.build();
+        for (int i = 0; i < prefix.size(); i++)
+            builder.add(prefix.get(i));
+        return builder;
     }
 
-    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+    public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
     {
         int count = 1 + baseCfs.metadata.clusteringColumns().size();
-        CBuilder builder = baseCfs.getComparator().builder();
+        Clustering clustering = indexEntry.clustering();
+        CBuilder builder = CBuilder.create(baseCfs.getComparator());
         for (int i = 0; i < count - 1; i++)
-            builder.add(indexEntry.name().get(i + 1));
-        return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
-    }
-
-    @Override
-    public boolean indexes(CellName name)
-    {
-        // We index if the CQL3 column name is the one of the collection we index
-        AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
-        return name.size() > columnDef.position()
-            && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
+            builder.add(clustering.get(i + 1));
+        return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
     }
 }