You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:41 UTC

[10/15] cassandra git commit: New 2i API and implementations for built in indexes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
new file mode 100644
index 0000000..b17ab4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -0,0 +1,755 @@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.index.internal.composites.CompositesSearcher;
+import org.apache.cassandra.index.internal.keys.KeysSearcher;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Index implementation which indexes the values for a single column in the base
+ * table and which stores its index data in a local, hidden table.
+ */
+public abstract class CassandraIndex implements Index
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
+
+    // The store of the base table whose column is being indexed; assigned once in the constructor.
+    public final ColumnFamilyStore baseCfs;
+    // The four fields below are (re)assigned together by setMetadata(), which runs at
+    // construction and again each time a metadata reload task is requested.
+    protected IndexMetadata metadata;
+    protected ColumnFamilyStore indexCfs;
+    protected ColumnDefinition indexedColumn;
+    protected CassandraIndexFunctions functions;
+
+    /**
+     * Creates an index over a column of {@code baseCfs}. All remaining state (metadata, functions,
+     * backing store and indexed column) is derived from {@code indexDef} by setMetadata.
+     * @param baseCfs the store of the base table being indexed
+     * @param indexDef the definition of this index
+     */
+    public CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        this.baseCfs = baseCfs;
+        setMetadata(indexDef);
+    }
+
+    /**
+     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value].
+     * This base implementation accepts only {@code Operator.EQ} and does not inspect the column.
+     * @param indexedColumn the column the predicate applies to (not consulted by this implementation)
+     * @param operator the operator of the predicate
+     * @return true if {@code operator} equals {@code Operator.EQ}
+     */
+    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    {
+        return operator.equals(Operator.EQ);
+    }
+
+    /**
+     * Used to construct the clustering for an entry in the index table based on values from the base data.
+     * The clustering columns in the index table encode the values required to retrieve the correct data from the base
+     * table and vary depending on the kind of the indexed column. See indexCfsMetadata for more details.
+     * Used whenever a row in the index table is written or deleted.
+     * @param partitionKey from the base data being indexed
+     * @param prefix from the base data being indexed
+     * @param path from the base data being indexed
+     * @return a builder for the clustering prefix to be used to insert into the index table
+     */
+    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                                           ClusteringPrefix prefix,
+                                                           CellPath path);
+
+    /**
+     * Used at search time to convert a row in the index table into a simple struct containing the values required
+     * to retrieve the corresponding row from the base table.
+     * @param indexedValue the partition key of the index table (i.e. the value that was indexed)
+     * @param indexEntry a row from the index table
+     * @return the decoded entry describing where to find the corresponding base table row
+     */
+    public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
+                                           Row indexEntry);
+
+    /**
+     * Check whether a value retrieved from an index is still valid by comparing it to the current row from the base
+     * table. Used at read time to identify out of date index entries so that they can be excluded from search
+     * results and repaired.
+     * @param row the current row from the primary data table
+     * @param indexValue the value we retrieved from the index
+     * @param nowInSec the current time in seconds (presumably used to judge liveness; confirm in implementations)
+     * @return true if the index is out of date and the entry should be dropped
+     */
+    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
+
+    /**
+     * Extract the value to be inserted into the index from the components of the base data.
+     * Callers in this class pass null for whichever components are not available to them, so
+     * implementations must only rely on the components relevant to their kind of index.
+     * @param partitionKey from the primary data
+     * @param clustering from the primary data
+     * @param path from the primary data
+     * @param cellValue from the primary data
+     * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition
+     * key in the index table
+     */
+    protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                                  Clustering clustering,
+                                                  CellPath path,
+                                                  ByteBuffer cellValue);
+
+    /**
+     * @return the definition of the base table column this index is on, as resolved when the
+     * index metadata was last set
+     */
+    public ColumnDefinition getIndexedColumn()
+    {
+        return indexedColumn;
+    }
+
+    /**
+     * @return the clustering comparator taken from the metadata of the hidden index table
+     */
+    public ClusteringComparator getIndexComparator()
+    {
+        return indexCfs.metadata.comparator;
+    }
+
+    /**
+     * @return the store of the hidden table holding this index's data
+     */
+    public ColumnFamilyStore getIndexCfs()
+    {
+        return indexCfs;
+    }
+
+    /**
+     * Registers this index with the supplied registry.
+     * @param registry the registry to add this index to
+     */
+    public void register(IndexRegistry registry)
+    {
+        registry.registerIndex(this);
+    }
+
+    /**
+     * Supplies the work required to make this index usable once it has been created or linked in.
+     * @return null when the index is already recorded as built, otherwise a task which builds it
+     */
+    public Callable<?> getInitializationTask()
+    {
+        // an index that was fully built before a restart only needs linking in,
+        // which requires no further work
+        if (isBuilt())
+            return null;
+
+        // anything else must be built via SecondaryIndexBuilder
+        return getBuildIndexTask();
+    }
+
+    /**
+     * @return the index definition most recently supplied to this index
+     */
+    public IndexMetadata getIndexMetadata()
+    {
+        return metadata;
+    }
+
+    /**
+     * @return the name of the backing index table's store. Note that this is currently not the
+     * name from the index metadata (see the comment in the body).
+     */
+    public String getIndexName()
+    {
+        // should return metadata.name, see CASSANDRA-10127
+        return indexCfs.name;
+    }
+
+    /**
+     * @return the store of the hidden index table, or an empty Optional if there is none
+     */
+    public Optional<ColumnFamilyStore> getBackingTable()
+    {
+        return Optional.ofNullable(indexCfs);
+    }
+
+    /**
+     * @return a task which, when called, performs a blocking flush of the index table
+     */
+    public Callable<Void> getBlockingFlushTask()
+    {
+        return () -> {
+            indexCfs.forceBlockingFlush();
+            return null;
+        };
+    }
+
+    /**
+     * @return a task which, when called, first records the index as removed in the system
+     * keyspace and then invalidates the index table
+     */
+    public Callable<?> getInvalidateTask()
+    {
+        return () -> {
+            markRemoved();
+            invalidate();
+            return null;
+        };
+    }
+
+    /**
+     * Adopts a new index definition and supplies the task which reloads the index table accordingly.
+     * @param indexDef the updated index definition
+     * @return a task which reloads the index table's metadata properties from the base table
+     * and then reloads the index table's store
+     */
+    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
+    {
+        // NOTE: the new definition is applied immediately, when the task is requested,
+        // not when the returned task is eventually executed
+        setMetadata(indexDef);
+        return () -> {
+            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
+            indexCfs.reload();
+            return null;
+        };
+    }
+
+    /**
+     * (Re)derives all index state from the supplied definition: the metadata itself, the function
+     * set matching the index target, the store for the hidden index table and the indexed column.
+     * @param indexDef the index definition to adopt; must target exactly one column
+     */
+    private void setMetadata(IndexMetadata indexDef)
+    {
+        // check the precondition up front, so an unsupported definition is rejected before
+        // any existing state is replaced or a backing store is created for it
+        assert indexDef.columns.size() == 1 : "Built-in indexes on multiple target columns are not supported";
+        metadata = indexDef;
+        functions = getFunctions(baseCfs.metadata, indexDef);
+        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
+        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
+                                                             cfm.cfName,
+                                                             cfm,
+                                                             baseCfs.getTracker().loadsstables);
+        indexedColumn = indexDef.indexedColumn(baseCfs.metadata);
+    }
+
+    /**
+     * @param truncatedAt the truncation point, passed through to the index table's store
+     * @return a task which, when called, discards the index table's sstables for {@code truncatedAt}
+     */
+    public Callable<?> getTruncateTask(final long truncatedAt)
+    {
+        return () -> {
+            indexCfs.discardSSTables(truncatedAt);
+            return null;
+        };
+    }
+
+    /**
+     * @param columns a set of base table columns
+     * @return true if the indexed column is a primary key column (regardless of {@code columns}),
+     * or if {@code columns} contains the indexed column
+     */
+    public boolean indexes(PartitionColumns columns)
+    {
+        // if we have indexes on the partition key or clustering columns, return true
+        return isPrimaryKeyIndex() || columns.contains(indexedColumn);
+    }
+
+    /**
+     * @param column the column a predicate applies to
+     * @param operator the operator of the predicate
+     * @return true if {@code column} has the same name as the indexed column and
+     * supportsOperator accepts {@code operator}
+     */
+    public boolean supportsExpression(ColumnDefinition column, Operator operator)
+    {
+        return indexedColumn.name.equals(column.name)
+               && supportsOperator(indexedColumn, operator);
+    }
+
+    /**
+     * Convenience overload which unpacks the column and operator of a filter expression.
+     */
+    private boolean supportsExpression(RowFilter.Expression expression)
+    {
+        return supportsExpression(expression.column(), expression.operator());
+    }
+
+    /**
+     * @return the estimate of result rows for a query using this index; this implementation
+     * reports the mean column count of the index table's store
+     */
+    public long getEstimatedResultRows()
+    {
+        return indexCfs.getMeanColumns();
+    }
+
+    /**
+     * No post processing of query results, just return them unchanged.
+     * @param command the command being executed (not consulted by this implementation)
+     * @return a function which returns the partition iterator it is given
+     */
+    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
+    {
+        return (partitionIterator, readCommand) -> partitionIterator;
+    }
+
+    /**
+     * Derives the filter to apply to the results of an index lookup.
+     * @param filter the filter of the original query
+     * @return {@code filter} without the first expression this index supports, or
+     * {@code filter} itself when it contains no such expression
+     */
+    public RowFilter getPostIndexQueryFilter(RowFilter filter)
+    {
+        Optional<RowFilter.Expression> target = getTargetExpression(filter.getExpressions());
+        return target.map(expression -> filter.without(expression))
+                     .orElse(filter);
+    }
+
+    /**
+     * Finds the expression this index would be used for.
+     * @param expressions the expressions of a query's row filter, in order
+     * @return the first expression supported by this index, if any
+     */
+    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
+    {
+        for (RowFilter.Expression candidate : expressions)
+        {
+            if (supportsExpression(candidate))
+                return Optional.of(candidate);
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Creates the searcher used to execute a read command against this index.
+     * @param command the command to be executed
+     * @return a searcher matching the type of this index, or null if the command's row filter
+     * contains no expression this index supports
+     * @throws IllegalStateException if the index type is neither COMPOSITES nor KEYS
+     */
+    public Index.Searcher searcherFor(ReadCommand command)
+    {
+        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
+
+        // nothing in the filter can be served by this index
+        if (!target.isPresent())
+            return null;
+
+        RowFilter.Expression expression = target.get();
+        expression.validateForIndexing();
+        switch (getIndexMetadata().indexType)
+        {
+            case COMPOSITES:
+                return new CompositesSearcher(command, expression, this);
+            case KEYS:
+                return new KeysSearcher(command, expression, this);
+            default:
+                throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
+                                                              metadata.indexType,
+                                                              metadata.name,
+                                                              indexedColumn.name.toString()));
+        }
+    }
+
+    /**
+     * Validates the values which an update to the base table would add to this index. Which parts
+     * of the update are inspected depends on the kind of the indexed column.
+     * @param update the update to the base table
+     * @throws InvalidRequestException if a value to be indexed fails validateIndexedValue
+     */
+    public void validate(PartitionUpdate update) throws InvalidRequestException
+    {
+        switch (indexedColumn.kind)
+        {
+            case PARTITION_KEY:
+                validatePartitionKey(update.partitionKey());
+                break;
+            case CLUSTERING:
+                validateClusterings(update);
+                break;
+            case REGULAR:
+                validateRows(update);
+                break;
+            case STATIC:
+                // only the static row can carry a value for a static column
+                validateRows(Collections.singleton(update.staticRow()));
+                break;
+        }
+    }
+
+    /**
+     * Creates the handler which keeps this index in step with changes to a single partition of
+     * the base table. Partition level deletions and range tombstones are ignored by the returned
+     * Indexer; only row level events result in writes to the index table.
+     * @param key the key of the base table partition being modified
+     * @param nowInSec the time, in seconds, used for liveness checks and for deletions
+     * @param opGroup the operation group under which writes to the index table are applied
+     * @param transactionType the type of the transaction (not consulted by this implementation)
+     * @return an Indexer bound to the given partition
+     */
+    public Indexer indexerFor(final DecoratedKey key,
+                              final int nowInSec,
+                              final OpOrder.Group opGroup,
+                              final IndexTransaction.Type transactionType)
+    {
+        return new Indexer()
+        {
+            public void begin()
+            {
+            }
+
+            public void partitionDelete(DeletionTime deletionTime)
+            {
+            }
+
+            public void rangeTombstone(RangeTombstone tombstone)
+            {
+            }
+
+            // A new row: index either the primary key (using a liveness computed from the
+            // whole row) or the cell(s) of the indexed column, never both.
+            public void insertRow(Row row)
+            {
+                if (isPrimaryKeyIndex())
+                {
+                    indexPrimaryKey(row.clustering(),
+                                    getPrimaryKeyIndexLiveness(row),
+                                    row.deletion());
+                }
+                else
+                {
+                    if (indexedColumn.isComplex())
+                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+                    else
+                        indexCell(row.clustering(), row.getCell(indexedColumn));
+                }
+            }
+
+            // NOTE(review): unlike insertRow, there is no else here, so for a primary key index
+            // the cell removal below is attempted as well. Presumably the row holds no cell for a
+            // primary key column, making it a no-op; confirm, or make consistent with insertRow.
+            public void removeRow(Row row)
+            {
+                if (isPrimaryKeyIndex())
+                    indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion());
+
+                if (indexedColumn.isComplex())
+                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+                else
+                    removeCell(row.clustering(), row.getCell(indexedColumn));
+            }
+
+
+            // Entries for the new row are written before entries for the old row are removed.
+            // NOTE(review): as in removeRow, the cell handling also runs for primary key indexes.
+            public void updateRow(Row oldRow, Row newRow)
+            {
+                if (isPrimaryKeyIndex())
+                    indexPrimaryKey(newRow.clustering(),
+                                    newRow.primaryKeyLivenessInfo(),
+                                    newRow.deletion());
+
+                if (indexedColumn.isComplex())
+                {
+                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
+                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
+                }
+                else
+                {
+                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
+                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
+                }
+            }
+
+            public void finish()
+            {
+            }
+
+            // Indexes each cell of a complex column; a null collection means nothing to index.
+            private void indexCells(Clustering clustering, Iterable<Cell> cells)
+            {
+                if (cells == null)
+                    return;
+
+                for (Cell cell : cells)
+                    indexCell(clustering, cell);
+            }
+
+            // Writes an index entry for a cell, unless the cell is absent or not live at nowInSec.
+            // The entry's liveness is taken from the cell's own timestamp, ttl and local deletion time.
+            private void indexCell(Clustering clustering, Cell cell)
+            {
+                if (cell == null || !cell.isLive(nowInSec))
+                    return;
+
+                insert(key.getKey(),
+                       clustering,
+                       cell,
+                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
+                       opGroup);
+            }
+
+            // Removes the index entry of each cell of a complex column; null means nothing to remove.
+            private void removeCells(Clustering clustering, Iterable<Cell> cells)
+            {
+                if (cells == null)
+                    return;
+
+                for (Cell cell : cells)
+                    removeCell(clustering, cell);
+            }
+
+            // Deletes the index entry for a cell, unless the cell is absent or not live at nowInSec.
+            private void removeCell(Clustering clustering, Cell cell)
+            {
+                if (cell == null || !cell.isLive(nowInSec))
+                    return;
+
+                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
+            }
+
+            // For indexes on primary key columns: writes an entry when the liveness carries a
+            // timestamp and, independently, writes a deletion when the row deletion is not live.
+            private void indexPrimaryKey(final Clustering clustering,
+                                         final LivenessInfo liveness,
+                                         final DeletionTime deletion)
+            {
+                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
+                    insert(key.getKey(), clustering, null, liveness, opGroup);
+
+                if (!deletion.isLive())
+                    delete(key.getKey(), clustering, deletion, opGroup);
+            }
+
+            // Computes the liveness for a primary key index entry: starts from the row's primary
+            // key liveness and adopts the timestamp and ttl of any live cell with a greater timestamp.
+            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
+            {
+                long timestamp = row.primaryKeyLivenessInfo().timestamp();
+                int ttl = row.primaryKeyLivenessInfo().ttl();
+                for (Cell cell : row.cells())
+                {
+                    long cellTimestamp = cell.timestamp();
+                    if (cell.isLive(nowInSec))
+                    {
+                        if (cellTimestamp > timestamp)
+                        {
+                            timestamp = cellTimestamp;
+                            ttl = cell.ttl();
+                        }
+                    }
+                }
+                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
+            }
+        };
+    }
+
+    /**
+     * Specific to internal indexes, this is called by a
+     * searcher when it encounters a stale entry in the index.
+     * The deletion is applied to the index table via doDelete, which logs the removal
+     * itself; a second, stale-specific message is logged here afterwards.
+     * @param indexKey the partition key in the index table
+     * @param indexClustering the clustering in the index table
+     * @param deletion deletion timestamp etc
+     * @param opGroup the operation under which to perform the deletion
+     */
+    public void deleteStaleEntry(DecoratedKey indexKey,
+                                 Clustering indexClustering,
+                                 DeletionTime deletion,
+                                 OpOrder.Group opGroup)
+    {
+        doDelete(indexKey, indexClustering, deletion, opGroup);
+        logger.debug("Removed index entry for stale value {}", indexKey);
+    }
+
+    /**
+     * Called when adding a new entry to the index. The entry is a row without cells in the
+     * index table: its partition key is the indexed value and its clustering is derived from
+     * the base data.
+     * @param rowKey the partition key of the base table row
+     * @param clustering the clustering of the base table row
+     * @param cell the cell being indexed, or null when indexing a primary key column
+     * @param info the liveness to give the index entry
+     * @param opGroup the operation group under which the write is applied
+     */
+    private void insert(ByteBuffer rowKey,
+                        Clustering clustering,
+                        Cell cell,
+                        LivenessInfo info,
+                        OpOrder.Group opGroup)
+    {
+        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, cell);
+        DecoratedKey valueKey = getIndexKeyFor(indexedValue);
+        Clustering indexClustering = buildIndexClustering(rowKey, clustering, cell);
+        Row entry = BTreeRow.noCellLiveRow(indexClustering, info);
+        indexCfs.apply(partitionUpdate(valueKey, entry), UpdateTransaction.NO_OP, opGroup, null);
+        logger.debug("Inserted entry into index for value {}", valueKey);
+    }
+
+    /**
+     * Called when deleting entries on non-primary key columns. The deletion applied to the
+     * index table is built from the timestamp of the removed cell and {@code nowInSec}.
+     * @param rowKey the partition key of the base table row
+     * @param clustering the clustering of the base table row
+     * @param cell the cell whose index entry is to be removed; must not be null
+     * @param opGroup the operation group under which the deletion is applied
+     * @param nowInSec the current time in seconds
+     */
+    private void delete(ByteBuffer rowKey,
+                        Clustering clustering,
+                        Cell cell,
+                        OpOrder.Group opGroup,
+                        int nowInSec)
+    {
+        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, cell);
+        DecoratedKey valueKey = getIndexKeyFor(indexedValue);
+        Clustering indexClustering = buildIndexClustering(rowKey, clustering, cell);
+        DeletionTime deletion = new DeletionTime(cell.timestamp(), nowInSec);
+        doDelete(valueKey, indexClustering, deletion, opGroup);
+    }
+
+    /**
+     * Called when deleting entries from indexes on primary key columns. No cell is involved,
+     * so the indexed value and index clustering are derived from the row key and clustering alone.
+     * @param rowKey the partition key of the base table row
+     * @param clustering the clustering of the base table row
+     * @param deletion the deletion to apply to the index entry
+     * @param opGroup the operation group under which the deletion is applied
+     */
+    private void delete(ByteBuffer rowKey,
+                        Clustering clustering,
+                        DeletionTime deletion,
+                        OpOrder.Group opGroup)
+    {
+        ByteBuffer indexedValue = getIndexedValue(rowKey, clustering, null);
+        DecoratedKey valueKey = getIndexKeyFor(indexedValue);
+        Clustering indexClustering = buildIndexClustering(rowKey, clustering, null);
+        doDelete(valueKey, indexClustering, deletion, opGroup);
+    }
+
+    /**
+     * Applies the deletion of a single entry to the index table, as an empty deleted row.
+     * @param indexKey the partition key in the index table
+     * @param indexClustering the clustering in the index table
+     * @param deletion the deletion to record for the entry
+     * @param opGroup the operation group under which the deletion is applied
+     */
+    private void doDelete(DecoratedKey indexKey,
+                          Clustering indexClustering,
+                          DeletionTime deletion,
+                          OpOrder.Group opGroup)
+    {
+        Row tombstone = BTreeRow.emptyDeletedRow(indexClustering, deletion);
+        indexCfs.apply(partitionUpdate(indexKey, tombstone), UpdateTransaction.NO_OP, opGroup, null);
+        logger.debug("Removed index entry for value {}", indexKey);
+    }
+
+    /**
+     * Validates the value indexed for an update's partition key. Only the key is supplied
+     * when extracting the indexed value; clustering and cell are passed as null.
+     * @throws InvalidRequestException if the indexed value fails validateIndexedValue
+     */
+    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
+    {
+        assert indexedColumn.isPartitionKey();
+        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null ));
+    }
+
+    /**
+     * Validates the value indexed for the clustering of every row in an update. Only the
+     * clustering is supplied when extracting the indexed value; key and cell are passed as null.
+     * @throws InvalidRequestException if an indexed value fails validateIndexedValue
+     */
+    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
+    {
+        assert indexedColumn.isClusteringColumn();
+        for (Row row : update)
+            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
+    }
+
+    /**
+     * Validates the values indexed for the indexed column in each of the supplied rows. For a
+     * complex column every cell present is validated; otherwise the row's single cell is.
+     * @param rows the rows to check
+     */
+    private void validateRows(Iterable<Row> rows)
+    {
+        assert !indexedColumn.isPrimaryKeyColumn();
+        for (Row row : rows)
+        {
+            if (!indexedColumn.isComplex())
+            {
+                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
+                continue;
+            }
+
+            ComplexColumnData data = row.getComplexColumnData(indexedColumn);
+            // the row has no data at all for the indexed column
+            if (data == null)
+                continue;
+
+            for (Cell cell : data)
+                validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
+        }
+    }
+
+    /**
+     * Rejects values which are too large to be indexed. A null value is accepted, as is any
+     * value with fewer than FBUtilities.MAX_UNSIGNED_SHORT bytes remaining.
+     * @param value the value which would be indexed, may be null
+     * @throws InvalidRequestException if the value is too large
+     */
+    private void validateIndexedValue(ByteBuffer value)
+    {
+        if (value == null || value.remaining() < FBUtilities.MAX_UNSIGNED_SHORT)
+            return;
+
+        String message = String.format("Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+                                       value.remaining(),
+                                       getIndexName(),
+                                       baseCfs.metadata.ksName,
+                                       baseCfs.metadata.cfName,
+                                       indexedColumn.name.toString(),
+                                       FBUtilities.MAX_UNSIGNED_SHORT);
+        throw new InvalidRequestException(message);
+    }
+
+    /**
+     * Convenience overload which unpacks the path and value of a cell, passing null for both
+     * when there is no cell.
+     */
+    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
+                                       Clustering clustering,
+                                       Cell cell)
+    {
+        if (cell == null)
+            return getIndexedValue(rowKey, clustering, null, null);
+
+        return getIndexedValue(rowKey, clustering, cell.path(), cell.value());
+    }
+
+    /**
+     * Builds the complete clustering of an index entry from the base data, using the path of
+     * the cell when there is one and null otherwise.
+     */
+    private Clustering buildIndexClustering(ByteBuffer rowKey,
+                                            Clustering clustering,
+                                            Cell cell)
+    {
+        CellPath path = cell == null ? null : cell.path();
+        CBuilder builder = buildIndexClusteringPrefix(rowKey, clustering, path);
+        return builder.build();
+    }
+
+    /**
+     * Turns an indexed value into a partition key of the index table by decorating it
+     * with the index table's store.
+     */
+    private DecoratedKey getIndexKeyFor(ByteBuffer value)
+    {
+        return indexCfs.decorateKey(value);
+    }
+
+    /**
+     * Wraps a single index table row in an update against the index table's metadata.
+     */
+    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
+    {
+        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+    }
+
+    /**
+     * Takes the index table out of service. The steps run strictly in this order: compactions
+     * on the index table are interrupted and waited for, a new barrier on the keyspace's write
+     * order is awaited, the index table is flushed, a new barrier on its read ordering is
+     * awaited and finally the store itself is invalidated.
+     */
+    private void invalidate()
+    {
+        // interrupt in-progress compactions
+        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
+        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
+        CompactionManager.instance.waitForCessation(cfss);
+        indexCfs.keyspace.writeOrder.awaitNewBarrier();
+        indexCfs.forceBlockingFlush();
+        indexCfs.readOrdering.awaitNewBarrier();
+        indexCfs.invalidate();
+    }
+
+    /**
+     * @return whether the system keyspace records this index, identified by the base
+     * keyspace's name and the index name, as built
+     */
+    private boolean isBuilt()
+    {
+        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    /**
+     * Records this index as built in the system keyspace.
+     */
+    private void markBuilt()
+    {
+        SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    /**
+     * Records this index as removed in the system keyspace.
+     */
+    private void markRemoved()
+    {
+        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    /**
+     * @return whether the indexed column is a primary key column of the base table
+     */
+    private boolean isPrimaryKeyIndex()
+    {
+        return indexedColumn.isPrimaryKeyColumn();
+    }
+
+    /**
+     * @return a task which, when called, builds the index and blocks until the build is done
+     */
+    private Callable<?> getBuildIndexTask()
+    {
+        return () -> {
+            buildBlocking();
+            return null;
+        };
+    }
+
+    /**
+     * Builds the index from the data currently in the base table, blocking until done.
+     * The base table is flushed first so that the build, which reads from sstables, sees its
+     * current contents. If there are no sstables the index is simply marked as built.
+     */
+    private void buildBlocking()
+    {
+        baseCfs.forceBlockingFlush();
+
+        // hold references to the canonical sstables for the duration of the build
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+             Refs<SSTableReader> sstables = viewFragment.refs)
+        {
+            if (sstables.isEmpty())
+            {
+                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
+                            baseCfs.metadata.ksName,
+                            baseCfs.metadata.cfName,
+                            getIndexName());
+                markBuilt();
+                return;
+            }
+
+            logger.info("Submitting index build of {} for data in {}",
+                        getIndexName(),
+                        getSSTableNames(sstables));
+
+            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
+                                                                      Collections.singleton(this),
+                                                                      new ReducingKeyIterator(sstables));
+            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+            FBUtilities.waitOnFuture(future);
+            // flush the index table before recording the index as built
+            indexCfs.forceBlockingFlush();
+            markBuilt();
+        }
+        // only reached when a build actually ran; the empty case returns above
+        logger.info("Index build of {} complete", getIndexName());
+    }
+
+    /**
+     * Renders a collection of sstables for logging.
+     * @param sstables the sstables to describe
+     * @return the string form of each sstable, joined by ", "
+     */
+    private static String getSSTableNames(Collection<SSTableReader> sstables)
+    {
+        return sstables.stream()
+                       .map(sstable -> sstable.toString())
+                       .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * Construct the CFMetadata for an index table, the clustering columns in the index table
+     * vary dependent on the kind of the indexed value.
+     * The table is named by the base table's indexColumnFamilyName, reuses the base table's id,
+     * and is partitioned by a LocalPartitioner over the type of the indexed value. Its first
+     * clustering column is always "partition_key"; any further clustering columns are added by
+     * the index functions selected for the definition.
+     * @param baseCfsMetadata the metadata of the base table
+     * @param indexMetadata the definition of the index
+     * @return the metadata for the hidden index table, with index metadata properties
+     * reloaded from the base table
+     */
+    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
+    {
+        CassandraIndexFunctions utils = getFunctions(baseCfsMetadata, indexMetadata);
+        ColumnDefinition indexedColumn = indexMetadata.indexedColumn(baseCfsMetadata);
+        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
+        CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
+                                                               baseCfsMetadata.indexColumnFamilyName(indexMetadata))
+                                                       .withId(baseCfsMetadata.cfId)
+                                                       .withPartitioner(new LocalPartitioner(indexedValueType))
+                                                       .addPartitionKey(indexedColumn.name, indexedColumn.type);
+
+        builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
+        builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
+        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
+    }
+
+    /**
+     * Factory method for new CassandraIndex instances. The concrete implementation is chosen
+     * by the index functions matching the index definition.
+     * @param baseCfs the store of the base table to be indexed
+     * @param indexMetadata the definition of the index
+     * @return a new index instance for the definition
+     */
+    public static final CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+    {
+        return getFunctions(baseCfs.metadata, indexMetadata).newIndexInstance(baseCfs, indexMetadata);
+    }
+
+    /**
+     * Selects the set of index functions matching the target of an index definition. KEYS
+     * indexes are checked for first, then multi-cell collections (by collection kind and, for
+     * maps, by the keys/entries options), and finally the kind of the indexed column.
+     * @param baseCfMetadata the metadata of the base table
+     * @param indexDef the definition of the index
+     * @return the functions to use for the index
+     * @throws AssertionError if the indexed column is of a kind for which no functions exist
+     */
+    private static CassandraIndexFunctions getFunctions(CFMetaData baseCfMetadata, IndexMetadata indexDef)
+    {
+        if (indexDef.isKeys())
+            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
+
+        ColumnDefinition indexedColumn = indexDef.indexedColumn(baseCfMetadata);
+        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
+        {
+            switch (((CollectionType)indexedColumn.type).kind)
+            {
+                case LIST:
+                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+                case SET:
+                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                case MAP:
+                    // for maps the definition's options decide what is indexed; values are the default
+                    if (indexDef.options.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
+                        return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                    else if (indexDef.options.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
+                        return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
+                    else
+                        return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+            }
+        }
+
+        switch (indexedColumn.kind)
+        {
+            case CLUSTERING:
+                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
+            case REGULAR:
+                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
+            case PARTITION_KEY:
+                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
+            //case COMPACT_VALUE:
+            //    return new CompositesIndexOnCompactValue();
+        }
+        // NOTE(review): validate() has a STATIC branch but no functions are mapped for that
+        // kind here, so such a column ends up below; confirm whether that is intended.
+        throw new AssertionError(String.format("No index functions available for column %s of kind %s",
+                                               indexedColumn.name,
+                                               indexedColumn.kind));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
new file mode 100644
index 0000000..b7cb3a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.internal;
+
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.index.internal.composites.*;
+import org.apache.cassandra.index.internal.keys.KeysIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+public interface CassandraIndexFunctions
+{
+    /**
+     * Creates the index implementation that corresponds to this set of functions.
+     * @param baseCfs the ColumnFamilyStore handed to the new index's constructor
+     * @param indexMetadata the metadata handed to the new index's constructor
+     * @return a new index instance built from the supplied store and metadata
+     */
+    public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata);
+
+    /**
+     * Returns the type of the values in the index. For most columns this is simply its type, but for collections
+     * it depends on whether the index is on the collection name/value element or on a frozen collection.
+     * @param indexedColumn the definition of the column targeted by the index
+     * @return the type of the indexed values; in the default implementation, the type of the column itself
+     */
+    default AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+    {
+        return indexedColumn.type;
+    }
+
+    /**
+     * Add the clustering columns for a specific type of index table to a CFMetaData.Builder (which is being
+     * used to construct the index table's CFMetaData). In the default implementation, the clustering columns of the
+     * index table hold the partition key & clustering columns of the base table. This is overridden in several cases:
+     * * When the indexed value is itself a clustering column, in which case, we only need store the base table's
+     *   *other* clustering values in the index - the indexed value being the index table's partition key
+     * * When the indexed value is a collection value, in which case we also need to capture the cell path from the base
+     *   table
+     * * In a KEYS index (for thrift/compact storage/static column indexes), where only the base partition key is
+     *   held in the index table.
+     *
+     * Called from indexCfsMetadata
+     * @param builder the builder to which the index table's clustering columns are added
+     * @param baseMetadata the metadata of the base table, supplying its clustering columns
+     * @param columnDef the definition of the indexed column
+     * @return the builder that was passed in
+     */
+    default CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+                                                         CFMetaData baseMetadata,
+                                                         ColumnDefinition columnDef)
+    {
+        for (ColumnDefinition def : baseMetadata.clusteringColumns())
+            builder.addClusteringColumn(def.name, def.type);
+        return builder;
+    }
+
+    /*
+     * implementations providing specializations for the built in index types
+     */
+
+    static final CassandraIndexFunctions KEYS_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new KeysIndex(baseCfs, indexMetadata);
+        }
+    };
+
+    static final CassandraIndexFunctions REGULAR_COLUMN_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new RegularColumnIndex(baseCfs, indexMetadata);
+        }
+    };
+
+    static final CassandraIndexFunctions CLUSTERING_COLUMN_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new ClusteringColumnIndex(baseCfs, indexMetadata);
+        }
+
+        public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+                                                            CFMetaData baseMetadata,
+                                                            ColumnDefinition columnDef)
+        {
+            List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
+            for (int i = 0; i < columnDef.position(); i++)
+            {
+                ColumnDefinition def = cks.get(i);
+                builder.addClusteringColumn(def.name, def.type);
+            }
+            for (int i = columnDef.position() + 1; i < cks.size(); i++)
+            {
+                ColumnDefinition def = cks.get(i);
+                builder.addClusteringColumn(def.name, def.type);
+            }
+            return builder;
+        }
+    };
+
+    static final CassandraIndexFunctions PARTITION_KEY_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new PartitionKeyIndex(baseCfs, indexMetadata);
+        }
+    };
+
+    static final CassandraIndexFunctions COLLECTION_KEY_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new CollectionKeyIndex(baseCfs, indexMetadata);
+        }
+
+        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        {
+            return ((CollectionType) indexedColumn.type).nameComparator();
+        }
+    };
+
+    static final CassandraIndexFunctions COLLECTION_VALUE_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new CollectionValueIndex(baseCfs, indexMetadata);
+        }
+
+        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        {
+            return ((CollectionType)indexedColumn.type).valueComparator();
+        }
+
+        public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+                                                            CFMetaData baseMetadata,
+                                                            ColumnDefinition columnDef)
+        {
+            for (ColumnDefinition def : baseMetadata.clusteringColumns())
+                builder.addClusteringColumn(def.name, def.type);
+
+            // collection key: an extra clustering column, typed with the collection's name comparator,
+            // is appended after the base table's clustering columns
+            builder.addClusteringColumn("cell_path", ((CollectionType)columnDef.type).nameComparator());
+            return builder;
+        }
+    };
+
+    static final CassandraIndexFunctions COLLECTION_ENTRY_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new CollectionEntryIndex(baseCfs, indexMetadata);
+        }
+
+        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        {
+            CollectionType colType = (CollectionType)indexedColumn.type;
+            return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
new file mode 100644
index 0000000..72d2528
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
@@ -0,0 +1,172 @@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.NavigableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+public abstract class CassandraIndexSearcher implements Index.Searcher
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraIndexSearcher.class);
+
+    private final RowFilter.Expression expression;
+    protected final CassandraIndex index;
+    protected final ReadCommand command;
+
+    public CassandraIndexSearcher(ReadCommand command,
+                                  RowFilter.Expression expression,
+                                  CassandraIndex index)
+    {
+        this.index = index;
+        this.expression = expression;
+        this.command = command;
+    }
+
+    /** Reads the index partition for the expression's value and passes its rows to queryDataFromIndex. */
+    @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
+    // of this method.
+    public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup)
+    {
+        // the value of the index expression is the partition key in the index table
+        ColumnFamilyStore indexCfs = index.getBackingTable().get();
+        DecoratedKey indexKey = indexCfs.decorateKey(expression.getIndexValue());
+        UnfilteredRowIterator indexIter = queryIndex(indexKey, command, orderGroup);
+        try
+        {
+            RowIterator indexHits = UnfilteredRowIterators.filter(indexIter, command.nowInSec());
+            return queryDataFromIndex(indexKey, indexHits, command, orderGroup);
+        }
+        catch (RuntimeException | Error e)
+        {
+            indexIter.close();
+            throw e;
+        }
+    }
+
+    /** Executes a single partition read of the index table for the given index key. */
+    private UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
+    {
+        ClusteringIndexFilter indexFilter = makeIndexFilter(command);
+        ColumnFamilyStore indexCfs = index.getBackingTable().get();
+        CFMetaData indexCfm = indexCfs.metadata;
+        return SinglePartitionReadCommand.create(indexCfm, command.nowInSec(), indexKey, ColumnFilter.all(indexCfm), indexFilter)
+                                         .queryMemtableAndDisk(indexCfs, orderGroup.indexReadOpOrderGroup());
+    }
+
+    /** Translates the clustering restrictions of the supplied command into a filter on the index table. */
+    private ClusteringIndexFilter makeIndexFilter(ReadCommand command)
+    {
+        if (!(command instanceof SinglePartitionReadCommand))
+            return makeRangeIndexFilter(((PartitionRangeReadCommand)command).dataRange());
+
+        // Single partition case. Note: as yet there's no route to get here - a 2i query *always*
+        // uses a PartitionRangeReadCommand. This is here in preparation for coming changes
+        // in SelectStatement.
+        SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command;
+        ByteBuffer pk = sprc.partitionKey().getKey();
+        ClusteringIndexFilter filter = sprc.clusteringIndexFilter();
+
+        if (filter instanceof ClusteringIndexNamesFilter)
+        {
+            NavigableSet<Clustering> baseClusterings = ((ClusteringIndexNamesFilter)filter).requestedRows();
+            BTreeSet.Builder<Clustering> indexClusterings = BTreeSet.builder(index.getIndexComparator());
+            for (Clustering baseClustering : baseClusterings)
+                indexClusterings.add(makeIndexClustering(pk, baseClustering));
+            return new ClusteringIndexNamesFilter(indexClusterings.build(), filter.isReversed());
+        }
+
+        Slices baseSlices = ((ClusteringIndexSliceFilter)filter).requestedSlices();
+        Slices.Builder indexSlices = new Slices.Builder(index.getIndexComparator());
+        for (Slice baseSlice : baseSlices)
+            indexSlices.add(makeIndexBound(pk, baseSlice.start()), makeIndexBound(pk, baseSlice.end()));
+        return new ClusteringIndexSliceFilter(indexSlices.build(), filter.isReversed());
+    }
+
+    /**
+     * Builds the index table filter for a partition range read. The result is always a single, non-reversed
+     * slice; it is Slice.ALL unless the left bound of the key range is a DecoratedKey.
+     */
+    private ClusteringIndexFilter makeRangeIndexFilter(DataRange dataRange)
+    {
+        AbstractBounds<PartitionPosition> range = dataRange.keyRange();
+
+        /*
+         * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+         * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
+         * key having a given token. A potential fix would be to actually store the token along the key in the indexed row.
+         */
+        Slice slice;
+        if (!(range.left instanceof DecoratedKey))
+        {
+            slice = Slice.ALL;
+        }
+        else if (range.right instanceof DecoratedKey)
+        {
+            // the right hand side of the range may not be a DecoratedKey (for instance if we're paging),
+            // but if it is, we can optimise slightly by restricting the slice
+            slice = makeKeyBoundedSlice(dataRange, (DecoratedKey) range.left, (DecoratedKey) range.right);
+        }
+        else
+        {
+            // otherwise, just start the index slice from the key we do have
+            slice = Slice.make(makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM), Slice.Bound.TOP);
+        }
+        return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
+    }
+
+    /** Builds the index slice for a key range whose two bounds are both partition keys. */
+    private Slice makeKeyBoundedSlice(DataRange dataRange, DecoratedKey startKey, DecoratedKey endKey)
+    {
+        Slice.Bound start = Slice.Bound.BOTTOM;
+        Slice.Bound end = Slice.Bound.TOP;
+        /*
+         * For index queries over a range, we can't do a whole lot better than querying everything for the key range,
+         * though for slice queries we can slightly restrict the beginning and end.
+         */
+        if (!dataRange.isNamesQuery())
+        {
+            ClusteringIndexSliceFilter startFilter = (ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(startKey);
+            ClusteringIndexSliceFilter endFilter = (ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(endKey);
+
+            // We can't effectively support reversed queries when we have a range, so we don't support it
+            // (or through post-query reordering) and shouldn't get there.
+            assert !startFilter.isReversed() && !endFilter.isReversed();
+
+            Slices startSlices = startFilter.requestedSlices();
+            Slices endSlices = endFilter.requestedSlices();
+            if (startSlices.size() > 0)
+                start = startSlices.get(0).start();
+            if (endSlices.size() > 0)
+                end = endSlices.get(endSlices.size() - 1).end();
+        }
+        return Slice.make(makeIndexBound(startKey.getKey(), start), makeIndexBound(endKey.getKey(), end));
+    }
+
+    private Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
+    {
+        return index.buildIndexClusteringPrefix(rowKey, bound, null)
+                                 .buildBound(bound.isStart(), bound.isInclusive());
+    }
+
+    protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering)
+    {
+        return index.buildIndexClusteringPrefix(rowKey, clustering, null).build();
+    }
+
+    protected abstract UnfilteredPartitionIterator queryDataFromIndex(DecoratedKey indexKey,
+                                                                      RowIterator indexHits,
+                                                                      ReadCommand command,
+                                                                      ReadOrderGroup orderGroup);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/IndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/IndexEntry.java b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
new file mode 100644
index 0000000..6f94ace
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
@@ -0,0 +1,34 @@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+
+/**
+ * Entries in indexes on non-compact tables (tables with composite comparators)
+ * can be encapsulated as IndexEntry instances. These are not used when dealing
+ * with indexes on static/compact/thrift tables (i.e. KEYS indexes).
+ */
+public final class IndexEntry
+{
+    public final DecoratedKey indexValue; // the indexed value, i.e. the partition key of the entry in the index table
+    public final Clustering indexClustering; // the clustering of the entry in the index table
+    public final long timestamp; // the timestamp of the index entry
+
+    public final ByteBuffer indexedKey; // the partition key of the base table row the entry refers to
+    public final Clustering indexedEntryClustering; // the clustering of that row in the base table
+
+    public IndexEntry(DecoratedKey indexValue,
+                      Clustering indexClustering,
+                      long timestamp,
+                      ByteBuffer indexedKey,
+                      Clustering indexedEntryClustering)
+    {
+        this.indexValue = indexValue;
+        this.indexClustering = indexClustering;
+        this.timestamp = timestamp;
+        this.indexedKey = indexedKey;
+        this.indexedEntryClustering = indexedEntryClustering;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
new file mode 100644
index 0000000..b932602
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on a CLUSTERING column definition.
+ *
+ * A cell indexed by this index will have the general form:
+ *   ck_0 ... ck_n c_name : v
+ * where ck_i are the cluster keys, c_name the last component of the cell
+ * composite name (or second to last if collections are in use, but this
+ * has no impact) and v the cell value.
+ *
+ * Such a cell is always indexed by this index (or rather, it is indexed if
+ * n >= indexedColumn.position(), which will always be the case in practice)
+ * and it will generate (buildIndexClusteringPrefix()) an index entry whose:
+ *   - row key will be ck_i (getIndexedValue()) where i == indexedColumn.position().
+ *   - cell name will be
+ *       rk ck_0 ... ck_{i-1} ck_{i+1} ck_n
+ *     where rk is the row key of the initial cell and i == indexedColumn.position().
+ */
+public class ClusteringColumnIndex extends CassandraIndex
+{
+    public ClusteringColumnIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    // the indexed value is the clustering component found at the indexed column's position
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        return clustering.get(indexedColumn.position());
+    }
+
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder builder = CBuilder.create(getIndexComparator());
+        builder.add(partitionKey);
+        // after the partition key, copy every component of the prefix except the indexed one
+        for (int i = 0; i < prefix.size(); i++)
+            if (i != indexedColumn.position())
+                builder.add(prefix.get(i));
+        return builder;
+    }
+
+    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+    {
+        int ckCount = baseCfs.metadata.clusteringColumns().size();
+        Clustering indexClustering = indexEntry.clustering();
+        int indexedPosition = indexedColumn.position();
+
+        // rebuild the base clustering: component 0 of the index clustering holds the base partition key,
+        // so the components preceding the indexed column sit one position further along in the index
+        CBuilder baseClustering = CBuilder.create(baseCfs.getComparator());
+        for (int i = 0; i < indexedPosition; i++)
+            baseClustering.add(indexClustering.get(i + 1));
+        baseClustering.add(indexedValue.getKey());
+        for (int i = indexedPosition + 1; i < ckCount; i++)
+            baseClustering.add(indexClustering.get(i));
+
+        return new IndexEntry(indexedValue,
+                              indexClustering,
+                              indexEntry.primaryKeyLivenessInfo().timestamp(),
+                              indexClustering.get(0),
+                              baseClustering.build());
+    }
+
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        return !data.hasLiveData(nowInSec);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
new file mode 100644
index 0000000..1113600
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on the element and value of cells participating in a collection.
+ *
+ * The row keys for this index are a composite of the collection element
+ * and value of indexed columns.
+ */
+public class CollectionEntryIndex extends CollectionKeyIndexBase
+{
+    public CollectionEntryIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    // the indexed value is a composite of the first element of the cell path and the cell value
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        return CompositeType.build(path.get(0), cellValue);
+    }
+
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        CompositeType entryType = (CompositeType)functions.getIndexedValueType(indexedColumn);
+        ByteBuffer[] entry = entryType.split(indexValue);
+        ByteBuffer entryKey = entry[0];
+        ByteBuffer entryValue = entry[1];
+        // stale if the entry's cell is missing or not live, or if its current value differs from the indexed one
+        ColumnDefinition collectionColumn = indexedColumn;
+        Cell cell = data.getCell(collectionColumn, CellPath.create(entryKey));
+        if (cell == null || !cell.isLive(nowInSec))
+            return true;
+        AbstractType<?> valueType = ((CollectionType)collectionColumn.type).valueComparator();
+        return valueType.compare(entryValue, cell.value()) != 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
new file mode 100644
index 0000000..42c45e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on the collection element of the cell name of a collection.
+ *
+ * The row keys for this index are given by the collection element for
+ * indexed columns.
+ */
+public class CollectionKeyIndex extends CollectionKeyIndexBase
+{
+    public CollectionKeyIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        // the indexed value is the first element of the cell path
+        return path.get(0);
+    }
+
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        Cell indexedElement = data.getCell(indexedColumn, CellPath.create(indexValue));
+        return !(indexedElement != null && indexedElement.isLive(nowInSec));
+    }
+
+    public boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    {
+        return operator == Operator.CONTAINS_KEY
+               || (operator == Operator.CONTAINS && indexedColumn.type instanceof SetType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
new file mode 100644
index 0000000..fe77c96
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Common superclass for indexes that capture collection keys, including
+ * indexes on such keys themselves.
+ *
+ * A cell indexed by this index will have the general form:
+ *   ck_0 ... ck_n c_name [col_elt] : v
+ * where ck_i are the clustering keys, c_name the CQL3 column name, col_elt the
+ * collection element that we want to index (which may or may not be there depending
+ * on whether c_name is the collection we're indexing), and v the cell value.
+ *
+ * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have
+ * col_elt). The index entry can be viewed in the following way:
+ *   - the row key is determined by subclasses of this type.
+ *   - the cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell.
+ */
+public abstract class CollectionKeyIndexBase extends CassandraIndex
+{
+    public CollectionKeyIndexBase(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    // The index clustering is the base partition key followed by every component of the
+    // supplied base prefix; the cell path is not part of it.
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder indexClustering = CBuilder.create(getIndexComparator());
+        indexClustering.add(partitionKey);
+
+        int components = prefix.size();
+        for (int pos = 0; pos < components; pos++)
+            indexClustering.add(prefix.get(pos));
+
+        return indexClustering;
+    }
+
+    // Reverses buildIndexClusteringPrefix: component 0 of the index clustering is handed to the
+    // IndexEntry on its own, the following components are rebuilt into a base table clustering.
+    public IndexEntry decodeEntry(DecoratedKey indexedValue,
+                                  Row indexEntry)
+    {
+        Clustering indexClustering = indexEntry.clustering();
+        int baseClusteringSize = baseCfs.metadata.clusteringColumns().size();
+
+        CBuilder baseClustering = CBuilder.create(baseCfs.getComparator());
+        for (int pos = 1; pos <= baseClusteringSize; pos++)
+            baseClustering.add(indexClustering.get(pos));
+
+        return new IndexEntry(indexedValue,
+                              indexClustering,
+                              indexEntry.primaryKeyLivenessInfo().timestamp(),
+                              indexClustering.get(0),
+                              baseClustering.build());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
new file mode 100644
index 0000000..95bd7e1
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index the value of a collection cell.
+ *
+ * This is a lot like an index on REGULAR, except that we also need to make
+ * the collection key part of the index entry so that:
+ *   1) we don't have to scan the whole collection at query time to know the
+ *   entry is stale and if it still satisfies the query.
+ *   2) if a collection has multiple time the same value, we need one entry
+ *   for each so that if we delete one of the value only we only delete the
+ *   entry corresponding to that value.
+ */
+public class CollectionValueIndex extends CassandraIndex
+{
+    public CollectionValueIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        return cellValue;
+    }
+
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder builder = CBuilder.create(getIndexComparator());
+        builder.add(partitionKey);
+        for (int i = 0; i < prefix.size(); i++)
+            builder.add(prefix.get(i));
+
+        // When indexing, cell will be present, but when searching, it won't  (CASSANDRA-7525)
+        if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null)
+            builder.add(path.get(0));
+
+        return builder;
+    }
+
+    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+    {
+        Clustering clustering = indexEntry.clustering();
+        CBuilder builder = CBuilder.create(baseCfs.getComparator());
+        for (int i = 0; i < baseCfs.getComparator().size(); i++)
+            builder.add(clustering.get(i + 1));
+
+        return new IndexEntry(indexedValue,
+                                clustering,
+                                indexEntry.primaryKeyLivenessInfo().timestamp(),
+                                clustering.get(0),
+                                builder.build());
+    }
+
+    public boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    {
+        return operator == Operator.CONTAINS && !(indexedColumn.type instanceof SetType);
+    }
+
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        ColumnDefinition columnDef = indexedColumn;
+        ComplexColumnData complexData = data.getComplexColumnData(columnDef);
+        if (complexData == null)
+            return true;
+
+        for (Cell cell : complexData)
+        {
+            if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator()
+                                                                          .compare(indexValue, cell.value()) == 0)
+                return false;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
new file mode 100644
index 0000000..77867fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.CassandraIndexSearcher;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+
+/**
+ * Index searcher that turns the hits read from an index partition into rows of the base table.
+ *
+ * Hits that point to the same base partition are gathered and read with a single names query;
+ * index entries found to be stale while reading are deleted when the resulting row iterator
+ * is closed.
+ */
+public class CompositesSearcher extends CassandraIndexSearcher
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);
+
+    public CompositesSearcher(ReadCommand command,
+                              RowFilter.Expression expression,
+                              CassandraIndex index)
+    {
+        super(command, expression, index);
+    }
+
+    // An index hit only matters if the command actually selects the base row it points to.
+    private boolean isMatchingEntry(DecoratedKey partitionKey, IndexEntry entry, ReadCommand command)
+    {
+        return command.selects(partitionKey, entry.indexedEntryClustering);
+    }
+
+    /**
+     * Builds an iterator over the base table partitions referenced by the given index hits.
+     *
+     * @param indexKey the key of the index partition the hits were read from
+     * @param indexHits the rows of the index partition; closed when the returned iterator is closed
+     * @param command the read command being executed
+     * @param orderGroup supplies the op order groups used for the base table read and for
+     *                   the deletion of stale index entries
+     * @return an iterator yielding one non-empty row iterator per base partition
+     */
+    protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
+                                                             final RowIterator indexHits,
+                                                             final ReadCommand command,
+                                                             final ReadOrderGroup orderGroup)
+    {
+        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
+
+        return new UnfilteredPartitionIterator()
+        {
+            // First decoded hit that has not been assigned to a partition yet (null if none is pending).
+            private IndexEntry nextEntry;
+
+            // Partition prepared by prepareNext() and not yet handed out by next().
+            private UnfilteredRowIterator next;
+
+            public boolean isForThrift()
+            {
+                return command.isForThrift();
+            }
+
+            public CFMetaData metadata()
+            {
+                return command.metadata();
+            }
+
+            public boolean hasNext()
+            {
+                return prepareNext();
+            }
+
+            // Note: returns null (rather than throwing) when the index hits are exhausted.
+            public UnfilteredRowIterator next()
+            {
+                if (next == null)
+                    prepareNext();
+
+                UnfilteredRowIterator toReturn = next;
+                next = null;
+                return toReturn;
+            }
+
+            /**
+             * Ensures that {@code next} holds the next non-empty base partition, if there is one.
+             *
+             * This is a loop rather than a recursive call: every base partition whose hits are all
+             * filtered out or stale triggers another attempt, and a long run of such partitions
+             * would otherwise grow the stack without bound.
+             *
+             * @return true if {@code next} is set, false if the index hits are exhausted
+             */
+            private boolean prepareNext()
+            {
+                while (true)
+                {
+                    if (next != null)
+                        return true;
+
+                    if (nextEntry == null)
+                    {
+                        if (!indexHits.hasNext())
+                            return false;
+
+                        nextEntry = index.decodeEntry(indexKey, indexHits.next());
+                    }
+
+                    // Gather all index hits belonging to the same partition and query the data for those hits.
+                    // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
+                    // 1 read per index hit. However, this basically means materializing all hits for a partition
+                    // in memory so we should consider adding some paging mechanism. However, index hits should
+                    // be relatively small so it's much better than the previous code that was materializing all
+                    // *data* for a given partition.
+                    BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
+                    List<IndexEntry> entries = new ArrayList<>();
+                    DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);
+
+                    while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
+                    {
+                        // We've queried a slice of the index, but some hits may not match some of the clustering column constraints
+                        if (isMatchingEntry(partitionKey, nextEntry, command))
+                        {
+                            clusterings.add(nextEntry.indexedEntryClustering);
+                            entries.add(nextEntry);
+                        }
+
+                        nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
+                    }
+
+                    // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
+                    if (clusterings.isEmpty())
+                        continue;
+
+                    // Query the gathered index hits. We still need to filter stale hits from the resulting query.
+                    ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
+                    SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(index.baseCfs.metadata,
+                                                                                         command.nowInSec(),
+                                                                                         command.columnFilter(),
+                                                                                         command.rowFilter(),
+                                                                                         DataLimits.NONE,
+                                                                                         partitionKey,
+                                                                                         filter);
+                    @SuppressWarnings("resource") // We close right away if empty, and if it's assigned to next it will be closed either
+                                                  // by the caller of next(), or through closing this iterator if that comes first.
+                    UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs,
+                                                                                                     orderGroup.baseReadOpOrderGroup()),
+                                                                        indexKey.getKey(),
+                                                                        entries,
+                                                                        orderGroup.writeOpOrderGroup(),
+                                                                        command.nowInSec());
+
+                    if (dataIter.isEmpty())
+                    {
+                        dataIter.close();
+                        continue;
+                    }
+
+                    next = dataIter;
+                    return true;
+                }
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+                // Make sure a prepared-but-unconsumed partition is released even if closing
+                // the index hits fails.
+                try
+                {
+                    indexHits.close();
+                }
+                finally
+                {
+                    if (next != null)
+                        next.close();
+                }
+            }
+        };
+    }
+
+    // Issues a deletion for every given index entry, using the entry's own timestamp.
+    private void deleteAllEntries(final List<IndexEntry> entries, final OpOrder.Group writeOp, final int nowInSec)
+    {
+        entries.forEach(entry ->
+            index.deleteStaleEntry(entry.indexValue,
+                                     entry.indexClustering,
+                                     new DeletionTime(entry.timestamp, nowInSec),
+                                     writeOp));
+    }
+
+    /**
+     * Wraps the base table data so that rows whose index entry is stale are dropped, and so
+     * that the stale entries noticed along the way are deleted when the wrapper is closed.
+     *
+     * @param dataIter the base table rows read for the gathered index hits
+     * @param indexValue the indexed value the rows are checked against
+     * @param entries the index entries that led to {@code dataIter}, expected in clustering order
+     * @param writeOp the op order group under which stale entries are deleted
+     * @param nowInSec the time used for liveness checks and for the deletions
+     * @return an iterator over the rows of {@code dataIter} that are not stale
+     */
+    private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
+                                                     final ByteBuffer indexValue,
+                                                     final List<IndexEntry> entries,
+                                                     final OpOrder.Group writeOp,
+                                                     final int nowInSec)
+    {
+        // collect stale index entries and delete them when we close this iterator
+        final List<IndexEntry> staleEntries = new ArrayList<>();
+
+        // if there is a partition level delete in the base table, we need to filter
+        // any index entries which would be shadowed by it
+        if (!dataIter.partitionLevelDeletion().isLive())
+        {
+            DeletionTime deletion = dataIter.partitionLevelDeletion();
+            entries.forEach(e -> {
+                if (deletion.deletes(e.timestamp))
+                    staleEntries.add(e);
+            });
+        }
+
+        return new AlteringUnfilteredRowIterator(dataIter)
+        {
+            // Position in 'entries' of the next entry that has not been matched to a row yet.
+            private int entriesIdx;
+
+            public void close()
+            {
+                // Always release the underlying iterator, even if deleting the stale entries fails.
+                try
+                {
+                    deleteAllEntries(staleEntries, writeOp, nowInSec);
+                }
+                finally
+                {
+                    super.close();
+                }
+            }
+
+            @Override
+            protected Row computeNext(Row row)
+            {
+                IndexEntry entry = findEntry(row.clustering());
+                if (!index.isStale(row, indexValue, nowInSec))
+                    return row;
+
+                staleEntries.add(entry);
+                return null;
+            }
+
+            private IndexEntry findEntry(Clustering clustering)
+            {
+                assert entriesIdx < entries.size();
+                while (entriesIdx < entries.size())
+                {
+                    IndexEntry entry = entries.get(entriesIdx++);
+                    // The entries are in clustering order. So that the requested entry should be the
+                    // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
+                    // that have no corresponding row in the base table typically because of a range
+                    // tombstone or partition level deletion. Delete such stale entries.
+                    int cmp = metadata().comparator.compare(entry.indexedEntryClustering, clustering);
+                    assert cmp <= 0; // this would mean entries are not in clustering order, which shouldn't happen
+                    if (cmp == 0)
+                        return entry;
+                    else
+                        staleEntries.add(entry);
+                }
+                // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
+                throw new AssertionError();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
new file mode 100644
index 0000000..2c0b5aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on a PARTITION_KEY column definition.
+ *
+ * This supposes a composite row key:
+ *   rk = rk_0 ... rk_n
+ *
+ * The corresponding index entry will be:
+ *   - index row key will be rk_i (where i is the position of the indexed column
+ *     within the partition key)
+ *   - cell name will be: rk ck
+ *     where rk is the full partition key and ck the clustering keys of the
+ *     original cell names (thus excluding the last column name as we want to refer to
+ *     the whole CQL3 row, not just the cell itself)
+ *
+ * Note that contrary to other types of index, we repeat the indexed value in
+ * the index cell name (we use the whole partition key). The reason is that we
+ * want to order the index cell name by partitioner first, and skipping a part
+ * of the row key would change the order.
+ */
+public class PartitionKeyIndex extends CassandraIndex
+{
+    public PartitionKeyIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    // The indexed value is the component of the partition key found at the indexed column's
+    // position, obtained by splitting the key with the table's key validator.
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path,
+                                      ByteBuffer cellValue)
+    {
+        CompositeType keyType = (CompositeType) baseCfs.metadata.getKeyValidator();
+        return keyType.split(partitionKey)[indexedColumn.position()];
+    }
+
+    // Index clustering: the whole base partition key followed by the components of the base prefix.
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder indexClustering = CBuilder.create(getIndexComparator());
+        indexClustering.add(partitionKey);
+
+        int components = prefix.size();
+        for (int pos = 0; pos < components; pos++)
+            indexClustering.add(prefix.get(pos));
+
+        return indexClustering;
+    }
+
+    // Component 0 of the index clustering is handed to the IndexEntry on its own; the
+    // following components are rebuilt into a base table clustering.
+    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+    {
+        Clustering indexClustering = indexEntry.clustering();
+        int baseClusteringSize = baseCfs.metadata.clusteringColumns().size();
+
+        CBuilder baseClustering = CBuilder.create(baseCfs.getComparator());
+        for (int pos = 1; pos <= baseClusteringSize; pos++)
+            baseClustering.add(indexClustering.get(pos));
+
+        return new IndexEntry(indexedValue,
+                              indexClustering,
+                              indexEntry.primaryKeyLivenessInfo().timestamp(),
+                              indexClustering.get(0),
+                              baseClustering.build());
+    }
+
+    // The entry is stale once the base row has no live data left.
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        boolean hasLiveData = data.hasLiveData(nowInSec);
+        return !hasLiveData;
+    }
+}