You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/06/15 14:59:50 UTC

[17/20] cassandra git commit: Merge branch 'cassandra-2.2' into cassandra-3.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
index 98640ae,0000000..f000fcf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
@@@ -1,40 -1,0 +1,60 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.db.transform;
 +
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.rows.EncodingStats;
 +import org.apache.cassandra.db.rows.Unfiltered;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +
 +final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements UnfilteredRowIterator
 +{ // UnfilteredRowIterator built on BaseRows over a wrapped input, reporting a partition level deletion that added Transformations may rewrite
 +    private DeletionTime partitionLevelDeletion; // starts as the input's deletion; reassigned on every add(Transformation)
 +
 +    public UnfilteredRows(UnfilteredRowIterator input)
 +    {
 +        super(input);
 +        partitionLevelDeletion = input.partitionLevelDeletion(); // seed from the wrapped iterator's own deletion
 +    }
 +
 +    @Override
 +    void add(Transformation add)
 +    {
 +        super.add(add); // delegate to BaseRows.add first (its behaviour is not visible in this file)
 +        partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion); // then let the new transformation rewrite the deletion we report
 +    }
 +
 +    public DeletionTime partitionLevelDeletion()
 +    {
 +        return partitionLevelDeletion; // the tracked value, which may differ from input.partitionLevelDeletion() once transformations were added
 +    }
 +
 +    public EncodingStats stats()
 +    {
 +        return input.stats(); // passed straight through from the wrapped input
 +    }
 +
 +    @Override
 +    public boolean isEmpty()
 +    {
 +        return staticRow().isEmpty() && partitionLevelDeletion().isLive() && !hasNext(); // && short-circuits: hasNext() is only consulted when the static row is empty and the deletion is live
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/Index.java
index ab6665d,0000000..469ef07
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@@ -1,452 -1,0 +1,472 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index;
 +
 +import java.util.Optional;
 +import java.util.concurrent.Callable;
 +import java.util.function.BiFunction;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +
 +/**
 + * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations,
 + * Searcher and Indexer respectively, this defines a secondary index implementation.
 + * Instantiation is done via reflection and implementations must provide a constructor which takes the base
 + * table's ColumnFamilyStore and the IndexMetadata which defines the Index as arguments. e.g:
 + *  {@code MyCustomIndex( ColumnFamilyStore baseCfs, IndexMetadata indexDef )}
 + *
 + * The main interface defines methods for index management, index selection at both write and query time,
 + * as well as validation of values that will ultimately be indexed.
 + * Two sub-interfaces are also defined, which represent single use helpers for short lived tasks at read and write time.
 + * Indexer: an event listener which receives notifications at particular points during an update of a single partition
 + *          in the base table.
 + * Searcher: performs queries against the index based on a predicate defined in a RowFilter. An instance
 + *          is expected to be single use, being involved in the execution of a single ReadCommand.
 + *
 + * The main interface includes factory methods for obtaining instances of both of the sub-interfaces;
 + *
 + * The methods defined in the top level interface can be grouped into 3 categories:
 + *
 + * Management Tasks:
 + * This group of methods is primarily concerned with maintenance of secondary indexes and are mainly called from
 + * SecondaryIndexManager. It includes methods for registering and un-registering an index, performing maintenance
 + * tasks such as (re)building an index from SSTable data, flushing, invalidating and so forth, as well as some to
 + * retrieve general metadata about the index (index name, any internal tables used for persistence etc).
 + * Several of these maintenance functions have a return type of Callable<?>; the expectation for these methods is
 + * that any work required to be performed by the method be done inside the Callable so that the responsibility for
 + * scheduling its execution can rest with SecondaryIndexManager. For instance, a task like reloading index metadata
 + * following potential updates caused by modifications to the base table may be performed in a blocking way. In
 + * contrast, adding a new index may require it to be built from existing SSTable data, a potentially expensive task
 + * which should be performed asynchronously.
 + *
 + * Index Selection:
 + * There are two facets to index selection, write time and read time selection. The former is concerned with
 + * identifying whether an index should be informed about a particular write operation. The latter is about providing
 + * means to use the index for search during query execution.
 + *
 + * Validation:
 + * Values that may be written to an index are checked as part of input validation, prior to an update or insert
 + * operation being accepted.
 + *
 + *
 + * Sub-interfaces:
 + *
 + * Update processing:
 + * Indexes are subscribed to the stream of events generated by modifications to the base table. Subscription is
 + * done via first registering the Index with the base table's SecondaryIndexManager. For each partition update, the set
 + * of registered indexes are then filtered based on the properties of the update using the selection methods on the main
 + * interface described above. Each of the indexes in the filtered set then provides an event listener to receive
 + * notifications about the update as it is processed. As such then, an event handler instance is scoped to a single
 + * partition update; SecondaryIndexManager obtains a new handler for every update it processes (via a call to the
 + * factory method, indexerFor). That handler will then receive all events for the update, before being
 + * discarded by the SecondaryIndexManager. Indexer instances are never re-used by SecondaryIndexManager and the
 + * expectation is that each call to indexerFor should return a unique instance, or at least if instances can
 + * be recycled, that a given instance is only used to process a single partition update at a time.
 + *
 + * Search:
 + * Each query (i.e. a single ReadCommand) that uses indexes will use a single instance of Index.Searcher. As with
 + * processing of updates, an Index must be registered with the primary table's SecondaryIndexManager to be able to
 + * support queries. During the processing of a ReadCommand, the Expressions in its RowFilter are examined to determine
 + * whether any of them are supported by a registered Index. supportsExpression is used to filter out Indexes which
 + * cannot support a given Expression. After filtering, the set of candidate indexes are ranked according to the result
 + * of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is
 + * chosen. A Searcher instance is then obtained from the searcherFor method & used to perform the actual Index lookup.
 + * Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from
 + * the primary table) have been received from replicas and reconciled. This post processing is defined as a
 + * java.util.function.BiFunction<PartitionIterator, ReadCommand, PartitionIterator>, that is a function which takes as
 + * arguments a PartitionIterator (containing the reconciled result rows) and the ReadCommand being
 + * executed, and returns another iterator of partitions, possibly having transformed the initial results in some way.
 + * The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship
 + * with Cassandra return a no-op function here.
 + *
 + * An optional static method may be provided to validate custom index options (two variants are supported):
 + *
 + * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options);}</pre>
 + *
 + * The input is the map of index options supplied in the WITH clause of a CREATE INDEX statement.
 + *
 + * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm);}</pre>
 + *
 + * In this version, the base table's metadata is also supplied as an argument.
 + * If both overloaded methods are provided, only the one including the base table's metadata will be invoked.
 + *
 + * The validation method should return a map containing any of the supplied options which are not valid for the
 + * implementation. If the returned map is not empty, validation is considered failed and an error is raised.
 + * Alternatively, the implementation may choose to throw an org.apache.cassandra.exceptions.ConfigurationException
 + * if invalid options are encountered.
 + *
 + */
 +public interface Index
 +{
 +
 +    /*
 +     * Management functions
 +     */
 +
 +    /**
 +     * Return a task to perform any initialization work when a new index instance is created.
 +     * This may involve costly operations such as (re)building the index, and is performed asynchronously
 +     * by SecondaryIndexManager
 +     * @return a task to perform any necessary initialization work
 +     */
 +    public Callable<?> getInitializationTask();
 +
 +    /**
 +     * Returns the IndexMetadata which configures and defines the index instance. This should be the same
 +     * object passed as the argument to setIndexMetadata.
 +     * @return the index's metadata
 +     */
 +    public IndexMetadata getIndexMetadata();
 +
 +    /**
 +     * Return a task to reload the internal metadata of an index.
 +     * Called when the base table metadata is modified or when the configuration of the Index is updated
 +     * Implementations should return a task which performs any necessary work to be done due to
 +     * updating the configuration(s) such as (re)building etc. This task is performed asynchronously
 +     * by SecondaryIndexManager
 +     * @return task to be executed by the index manager during a reload
 +     */
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata);
 +
 +    /**
 +     * An index must be registered in order to be able to either subscribe to update events on the base
 +     * table and/or to provide Searcher functionality for reads. The double dispatch involved here, where
 +     * the Index actually performs its own registration by calling back to the supplied IndexRegistry's
 +     * own registerIndex method, is to make the decision as to whether or not to register an index belong
 +     * to the implementation, not the manager.
 +     * @param registry the index registry to register the instance with
 +     */
 +    public void register(IndexRegistry registry);
 +
 +    /**
 +     * If the index implementation uses a local table to store its index data this method should return a
 +     * handle to it. If not, an empty Optional should be returned. Typically, this is useful for the built-in
 +     * Index implementations.
 +     * @return an Optional referencing the Index's backing storage table if it has one, or Optional.empty() if not.
 +     */
 +    public Optional<ColumnFamilyStore> getBackingTable();
 +
 +    /**
 +     * Return a task which performs a blocking flush of the index's data to persistent storage.
 +     * @return task to be executed by the index manager to perform the flush.
 +     */
 +    public Callable<?> getBlockingFlushTask();
 +
 +    /**
 +     * Return a task which invalidates the index, indicating it should no longer be considered usable.
 +     * This should include any clean up and releasing of resources required when dropping an index.
 +     * @return task to be executed by the index manager to invalidate the index.
 +     */
 +    public Callable<?> getInvalidateTask();
 +
 +    /**
 +     * Return a task to truncate the index with the specified truncation timestamp.
 +     * Called when the base table is truncated.
 +     * @param truncatedAt timestamp of the truncation operation. This will be the same timestamp used
 +     *                    in the truncation of the base table.
 +     * @return task to be executed by the index manager when the base table is truncated.
 +     */
 +    public Callable<?> getTruncateTask(long truncatedAt);
 +
 +    /**
 +     * Return true if this index can be built or rebuilt when the index manager determines it is necessary. Returning
 +     * false enables the index implementation (or some other component) to control if and when SSTable data is
 +     * incorporated into the index.
 +     *
 +     * This is called by SecondaryIndexManager in buildIndexBlocking, buildAllIndexesBlocking & rebuildIndexesBlocking
 +     * where a return value of false causes the index to be excluded from the set of those which will process the
 +     * SSTable data.
 +     * @return true if the index should be included in the set which processes SSTable data, false otherwise.
 +     */
 +    public boolean shouldBuildBlocking();
 +
 +
 +    /*
 +     * Index selection
 +     */
 +
 +    /**
 +     * Called to determine whether this index targets a specific column.
 +     * Used during schema operations such as when dropping or renaming a column, to check if
 +     * the index will be affected by the change. Typically, if an index answers that it does
 +     * depend upon a column, then schema operations on that column are not permitted until the index
 +     * is dropped or altered.
 +     *
 +     * @param column the column definition to check
 +     * @return true if the index depends on the supplied column being present; false if the column may be
 +     *              safely dropped or modified without adversely affecting the index
 +     */
 +    public boolean dependsOn(ColumnDefinition column);
 +
 +    /**
 +     * Called to determine whether this index can provide a searcher to execute a query on the
 +     * supplied column using the specified operator. This forms part of the query validation done
 +     * before a CQL select statement is executed.
 +     * @param column the target column of a search query predicate
 +     * @param operator the operator of a search query predicate
 +     * @return true if this index is capable of supporting such expressions, false otherwise
 +     */
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator);
 +
 +    /**
 +     * If the index supports custom search expressions using the
 +     * {@code SELECT * FROM table WHERE expr(index_name, expression)} syntax, this
 +     * method should return the expected type of the expression argument.
 +     * For example, if the index supports custom expressions as Strings, calls to this
 +     * method should return {@code UTF8Type.instance}.
 +     * If the index implementation does not support custom expressions, then it should
 +     * return null.
 +     * @return the type of custom index expressions supported by this index, or
 +     *         null if custom expressions are not supported.
 +     */
 +    public AbstractType<?> customExpressionValueType();
 +
 +    /**
 +     * Transform an initial RowFilter into the filter that will still need to be applied
 +     * to a set of Rows after the index has performed its initial scan.
 +     * Used in ReadCommand#executeLocal to reduce the amount of filtering performed on the
 +     * results of the index query.
 +     *
 +     * @param filter the initial filter belonging to a ReadCommand
 +     * @return the (hopefully) reduced filter that would still need to be applied after
 +     *         the index was used to narrow the initial result set
 +     */
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter);
 +
 +    /**
 +     * Return an estimate of the number of results this index is expected to return for any given
 +     * query that it can be used to answer. Used in conjunction with indexes() and supportsExpression()
 +     * to determine the most selective index for a given ReadCommand. Additionally, this is also used
 +     * by StorageProxy.estimateResultsPerRange to calculate the initial concurrency factor for range requests
 +     *
 +     * @return the estimated average number of results a Searcher may return for any given query
 +     */
 +    public long getEstimatedResultRows();
 +
 +    /*
 +     * Input validation
 +     */
 +
 +    /**
 +     * Called at write time to ensure that values present in the update
 +     * are valid according to the rules of all registered indexes which
 +     * will process it. The partition key as well as the clustering and
 +     * cell values for each row in the update may be checked by index
 +     * implementations
 +     * @param update PartitionUpdate containing the values to be validated by registered Index implementations
 +     * @throws InvalidRequestException if any value in the update is rejected by the index
 +     */
 +    public void validate(PartitionUpdate update) throws InvalidRequestException;
 +
 +    /*
 +     * Update processing
 +     */
 +
 +    /**
 +     * Creates a new {@code Indexer} object for updates to a given partition.
 +     *
 +     * @param key key of the partition being modified
 +     * @param columns the regular and static columns the created indexer will have to deal with.
 +     * This can be empty as an update might only contain partition, range and row deletions, but
 +     * the indexer is guaranteed to not get any cells for a column that is not part of {@code columns}.
 +     * @param nowInSec current time of the update operation
 +     * @param opGroup operation group spanning the update operation
 +     * @param transactionType indicates what kind of update is being performed on the base data
 +     *                        i.e. a write time insert/update/delete or the result of compaction
 +     * @return the newly created indexer or {@code null} if the index is not interested in the update
 +     * (this could be because the index doesn't care about that particular partition, doesn't care about
 +     * that type of transaction, ...).
 +     */
 +    public Indexer indexerFor(DecoratedKey key,
 +                              PartitionColumns columns,
 +                              int nowInSec,
 +                              OpOrder.Group opGroup,
 +                              IndexTransaction.Type transactionType);
 +
 +    /**
 +     * Listener for processing events emitted during a single partition update.
 +     * Instances of this are responsible for applying modifications to the index in response to a single update
 +     * operation on a particular partition of the base table.
 +     *
 +     * That update may be generated by the normal write path, by iterating SSTables during streaming operations or when
 +     * building or rebuilding an index from source. Updates also occur during compaction when multiple versions of a
 +     * source partition from different SSTables are merged.
 +     *
 +     * Implementations should not make assumptions about resolution or filtering of the partition update being
 +     * processed. That is to say that it is possible for an Indexer instance to receive notification of a
 +     * PartitionDelete or RangeTombstones which shadow a Row it then receives via insertRow/updateRow.
 +     *
 +     * It is important to note that the only ordering guarantee made for the methods here is that the first call will
 +     * be to begin() and the last call to finish(). The other methods may be called to process update events in any
 +     * order. This can also include duplicate calls, in cases where a memtable partition is under contention from
 +     * several updates. In that scenario, the same set of events may be delivered to the Indexer as a memtable update
 +     * which failed due to contention is re-applied.
 +     */
 +    public interface Indexer
 +    {
 +        /**
 +         * Notification of the start of a partition update.
 +         * This event always occurs before any other during the update.
 +         */
 +        public void begin();
 +
 +        /**
 +         * Notification of a top level partition delete.
 +         * @param deletionTime the deletion time of the top level partition delete
 +         */
 +        public void partitionDelete(DeletionTime deletionTime);
 +
 +        /**
 +         * Notification of a RangeTombstone.
 +         * An update of a single partition may contain multiple RangeTombstones,
 +         * and a notification will be passed for each of them.
 +         * @param tombstone the RangeTombstone contained in the update
 +         */
 +        public void rangeTombstone(RangeTombstone tombstone);
 +
 +        /**
 +         * Notification that a new row was inserted into the Memtable holding the partition.
 +         * This only implies that the inserted row was not already present in the Memtable,
 +         * it *does not* guarantee that the row does not exist in an SSTable, potentially with
 +         * additional column data.
 +         *
 +         * @param row the Row being inserted into the base table's Memtable.
 +         */
 +        public void insertRow(Row row);
 +
 +        /**
 +         * Notification of a modification to a row in the base table's Memtable.
 +         * This is to allow an Index implementation to clean up entries for base data which is
 +         * never flushed to disk (and so will not be purged during compaction).
 +         * It's important to note that the old & new rows supplied here may not represent
 +         * the totality of the data for the Row with this particular Clustering. There may be
 +         * additional column data in SSTables which is not present in either the old or new row,
 +         * so implementations should be aware of that.
 +         * The supplied rows contain only column data which has actually been updated.
 +         * oldRowData contains only the columns which have been removed from the Row's
 +         * representation in the Memtable, while newRowData includes only new columns
 +         * which were not previously present. Any column data which is unchanged by
 +         * the update is not included.
 +         *
 +         * @param oldRowData data that was present in existing row and which has been removed from
 +         *                   the base table's Memtable
 +         * @param newRowData data that was not present in the existing row and is being inserted
 +         *                   into the base table's Memtable
 +         */
 +        public void updateRow(Row oldRowData, Row newRowData);
 +
 +        /**
 +         * Notification that a row was removed from the partition.
 +         * Note that this is only called as part of either a compaction or a cleanup.
 +         * This context is indicated by the TransactionType supplied to the indexerFor method.
 +         *
 +         * As with updateRow, it cannot be guaranteed that all data belonging to the Clustering
 +         * of the supplied Row has been removed (although in the case of a cleanup, that is the
 +         * ultimate intention).
 +         * There may be data for the same row in other SSTables, so in this case Indexer implementations
 +         * should *not* assume that all traces of the row have been removed. In particular,
 +         * it is not safe to assert that all values associated with the Row's Clustering
 +         * have been deleted, so implementations which index primary key columns should not
 +         * purge those entries from their indexes.
 +         *
 +         * @param row data being removed from the base table
 +         */
 +        public void removeRow(Row row);
 +
 +        /**
 +         * Notification of the end of the partition update.
 +         * This event always occurs after all others for the particular update.
 +         */
 +        public void finish();
 +    }
 +
 +    /*
 +     * Querying
 +     */
 +
 +    /**
 +     * Used to validate the various parameters of a supplied {@code ReadCommand},
 +     * this is called prior to execution. In theory, any command instance may be checked
 +     * by any {@code Index} instance, but in practice the index will be the one
 +     * returned by a call to the {@code getIndex(ColumnFamilyStore cfs)} method on
 +     * the supplied command.
 +     *
 +     * Custom index implementations should perform any validation of query expressions here and throw a meaningful
 +     * InvalidRequestException when any expression or other parameter is invalid.
 +     *
 +     * The default implementation is empty, i.e. it accepts every command.
 +     *
 +     * @param command a ReadCommand whose parameters are to be verified
 +     * @throws InvalidRequestException if the details of the command fail to meet the
 +     *         index's validation rules
 +     */
 +    default void validate(ReadCommand command) throws InvalidRequestException
 +    {
 +    }
 +
 +    /**
 +     * Return a function which performs post processing on the results of a partition range read command.
 +     * In future, this may be used as a generalized mechanism for transforming results on the coordinator prior
 +     * to returning them to the caller.
 +     *
 +     * This is used on the coordinator during execution of a range command to perform post
 +     * processing of merged results obtained from the necessary replicas. This is the only way in which results are
 +     * transformed in this way but this may change over time as usage is generalized.
 +     * See CASSANDRA-8717 for further discussion.
 +     *
 +     * The function takes a PartitionIterator of the results from the replicas which has already been collated
 +     * & reconciled, along with the command being executed. It returns another PartitionIterator containing the results
 +     * of the transformation (which may be the same as the input if the transformation is a no-op).
 +     *
 +     * @param command the read command being executed
 +     * @return the post processing function to apply to the merged results
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command);
 +
 +    /**
 +     * Factory method for query time search helper.
 +     *
 +     * @param command the read command being executed
 +     * @return a Searcher with which to perform the supplied command
 +     */
 +    public Searcher searcherFor(ReadCommand command);
 +
 +    /**
 +     * Performs the actual index lookup during execution of a ReadCommand.
 +     * An instance performs its query according to the RowFilter.Expression it was created for (see searcherFor).
 +     * An Expression is a predicate of the form [column] [operator] [value].
 +     */
 +    public interface Searcher
 +    {
 +        /**
 +         * @param orderGroup the collection of OpOrder.Groups which the ReadCommand is being performed under.
 +         * @return partitions from the base table matching the criteria of the search.
 +         */
 +        public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/IndexRegistry.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/IndexRegistry.java
index 6a004fb,0000000..9f5ed02
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@@ -1,22 -1,0 +1,42 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index;
 +
 +import java.util.Collection;
 +
 +import org.apache.cassandra.schema.IndexMetadata;
 +
 +/**
 + * The collection of all Index instances for a base table.
 + * The SecondaryIndexManager for a ColumnFamilyStore contains an IndexRegistry
 + * (actually it implements this interface at present) and Index implementations
 + * register in order to:
 + * i) subscribe to the stream of updates being applied to partitions in the base table
 + * ii) provide searchers to support queries with the relevant search predicates
 + */
 +public interface IndexRegistry
 +{
 +    void registerIndex(Index index); // the callback an Index invokes from its own register(IndexRegistry) when it chooses to be registered
 +    void unregisterIndex(Index index); // NOTE(review): presumably the inverse of registerIndex; no implementation is visible here - confirm
 +
 +    Index getIndex(IndexMetadata indexMetadata); // NOTE(review): presumably looks up the registered Index for this metadata; behaviour when none matches is not shown - confirm
 +    Collection<Index> listIndexes(); // NOTE(review): presumably every currently registered Index; whether the collection is a live view or a copy is not shown - confirm
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 4bbf682,0000000..9d997a7
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@@ -1,862 -1,0 +1,882 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.function.BiFunction;
 +import java.util.regex.Matcher;
 +import java.util.regex.Pattern;
 +import java.util.stream.Collectors;
 +import java.util.stream.StreamSupport;
 +
 +import com.google.common.collect.ImmutableSet;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.cql3.Operator;
 +import org.apache.cassandra.cql3.statements.IndexTarget;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.CollectionType;
 +import org.apache.cassandra.db.marshal.EmptyType;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.dht.LocalPartitioner;
 +import org.apache.cassandra.exceptions.InvalidRequestException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.index.IndexRegistry;
 +import org.apache.cassandra.index.SecondaryIndexBuilder;
 +import org.apache.cassandra.index.internal.composites.CompositesSearcher;
 +import org.apache.cassandra.index.internal.keys.KeysSearcher;
 +import org.apache.cassandra.index.transactions.IndexTransaction;
 +import org.apache.cassandra.index.transactions.UpdateTransaction;
 +import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.Pair;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.concurrent.Refs;
 +
 +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
 +
 +/**
 + * Index implementation which indexes the values for a single column in the base
 + * table and which stores its index data in a local, hidden table.
 + */
 +public abstract class CassandraIndex implements Index
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
 +
 +    public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
 +
 +    public final ColumnFamilyStore baseCfs;
 +    protected IndexMetadata metadata;
 +    protected ColumnFamilyStore indexCfs;
 +    protected ColumnDefinition indexedColumn;
 +    protected CassandraIndexFunctions functions;
 +
 +    protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        this.baseCfs = baseCfs;
 +        setMetadata(indexDef);
 +    }
 +
 +    /**
 +     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value]
 +     * @param indexedColumn
 +     * @param operator
 +     * @return
 +     */
 +    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
 +    {
 +        return operator == Operator.EQ;
 +    }
 +
 +    /**
 +     * Used to construct the clustering for an entry in the index table based on values from the base data.
 +     * The clustering columns in the index table encode the values required to retrieve the correct data from the base
 +     * table and vary depending on the kind of the indexed column. See indexCfsMetadata for more details.
 +     * Used whenever a row in the index table is written or deleted.
 +     * @param partitionKey from the base data being indexed
 +     * @param prefix from the base data being indexed
 +     * @param path from the base data being indexed
 +     * @return a clustering prefix to be used to insert into the index table
 +     */
 +    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                                           ClusteringPrefix prefix,
 +                                                           CellPath path);
 +
 +    /**
 +     * Used at search time to convert a row in the index table into a simple struct containing the values required
 +     * to retrieve the corresponding row from the base table.
 +     * @param indexedValue the partition key of the index table (i.e. the value that was indexed)
 +     * @param indexEntry a row from the index table
 +     * @return
 +     */
 +    public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
 +                                           Row indexEntry);
 +
 +    /**
 +     * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table.
 +     * Used at read time to identify out of date index entries so that they can be excluded from search results and
 +     * repaired
 +     * @param row the current row from the primary data table
 +     * @param indexValue the value we retrieved from the index
 +     * @param nowInSec
 +     * @return true if the index is out of date and the entry should be dropped
 +     */
 +    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
 +
 +    /**
 +     * Extract the value to be inserted into the index from the components of the base data
 +     * @param partitionKey from the primary data
 +     * @param clustering from the primary data
 +     * @param path from the primary data
 +     * @param cellValue from the primary data
 +     * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition
 +     * key in the index table
 +     */
 +    protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                                  Clustering clustering,
 +                                                  CellPath path,
 +                                                  ByteBuffer cellValue);
 +
 +    public ColumnDefinition getIndexedColumn()
 +    {
 +        return indexedColumn;
 +    }
 +
 +    public ClusteringComparator getIndexComparator()
 +    {
 +        return indexCfs.metadata.comparator;
 +    }
 +
 +    public ColumnFamilyStore getIndexCfs()
 +    {
 +        return indexCfs;
 +    }
 +
 +    public void register(IndexRegistry registry)
 +    {
 +        registry.registerIndex(this);
 +    }
 +
 +    public Callable<?> getInitializationTask()
 +    {
 +        // if we're just linking in the index on an already-built index post-restart or if the base
 +        // table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
 +        return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask();
 +    }
 +
 +    public IndexMetadata getIndexMetadata()
 +    {
 +        return metadata;
 +    }
 +
 +    public Optional<ColumnFamilyStore> getBackingTable()
 +    {
 +        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
 +    }
 +
 +    public Callable<Void> getBlockingFlushTask()
 +    {
 +        return () -> {
 +            indexCfs.forceBlockingFlush();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getInvalidateTask()
 +    {
 +        return () -> {
 +            invalidate();
 +            return null;
 +        };
 +    }
 +
 +    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
 +    {
 +        return () -> {
 +            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
 +            indexCfs.reload();
 +            return null;
 +        };
 +    }
 +
 +    @Override
 +    public void validate(ReadCommand command) throws InvalidRequestException
 +    {
 +        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
 +
 +        if (target.isPresent())
 +        {
 +            ByteBuffer indexValue = target.get().getIndexValue();
 +            checkFalse(indexValue.remaining() > FBUtilities.MAX_UNSIGNED_SHORT,
 +                       "Index expression values may not be larger than 64K");
 +        }
 +    }
 +
 +    private void setMetadata(IndexMetadata indexDef)
 +    {
 +        metadata = indexDef;
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
 +        functions = getFunctions(indexDef, target);
 +        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
 +        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
 +                                                             cfm.cfName,
 +                                                             cfm,
 +                                                             baseCfs.getTracker().loadsstables);
 +        indexedColumn = target.left;
 +    }
 +
 +    public Callable<?> getTruncateTask(final long truncatedAt)
 +    {
 +        return () -> {
 +            indexCfs.discardSSTables(truncatedAt);
 +            return null;
 +        };
 +    }
 +
 +    public boolean shouldBuildBlocking()
 +    {
 +        // built-in indexes are always included in builds initiated from SecondaryIndexManager
 +        return true;
 +    }
 +
 +    public boolean dependsOn(ColumnDefinition column)
 +    {
 +        return indexedColumn.name.equals(column.name);
 +    }
 +
 +    public boolean supportsExpression(ColumnDefinition column, Operator operator)
 +    {
 +        return indexedColumn.name.equals(column.name)
 +               && supportsOperator(indexedColumn, operator);
 +    }
 +
 +    private boolean supportsExpression(RowFilter.Expression expression)
 +    {
 +        return supportsExpression(expression.column(), expression.operator());
 +    }
 +
 +    public AbstractType<?> customExpressionValueType()
 +    {
 +        return null;
 +    }
 +
 +    public long getEstimatedResultRows()
 +    {
 +        return indexCfs.getMeanColumns();
 +    }
 +
 +    /**
 +     * No post processing of query results, just return them unchanged
 +     */
 +    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
 +    {
 +        return (partitionIterator, readCommand) -> partitionIterator;
 +    }
 +
 +    public RowFilter getPostIndexQueryFilter(RowFilter filter)
 +    {
 +        return getTargetExpression(filter.getExpressions()).map(filter::without)
 +                                                           .orElse(filter);
 +    }
 +
 +    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
 +    {
 +        return expressions.stream().filter(this::supportsExpression).findFirst();
 +    }
 +
 +    public Index.Searcher searcherFor(ReadCommand command)
 +    {
 +        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
 +
 +        if (target.isPresent())
 +        {
 +            target.get().validateForIndexing();
 +            switch (getIndexMetadata().kind)
 +            {
 +                case COMPOSITES:
 +                    return new CompositesSearcher(command, target.get(), this);
 +                case KEYS:
 +                    return new KeysSearcher(command, target.get(), this);
 +                default:
 +                    throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
 +                                                                  metadata.kind,
 +                                                                  metadata.name,
 +                                                                  indexedColumn.name.toString()));
 +            }
 +        }
 +
 +        return null;
 +
 +    }
 +
 +    public void validate(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        switch (indexedColumn.kind)
 +        {
 +            case PARTITION_KEY:
 +                validatePartitionKey(update.partitionKey());
 +                break;
 +            case CLUSTERING:
 +                validateClusterings(update);
 +                break;
 +            case REGULAR:
 +                if (update.columns().regulars.contains(indexedColumn))
 +                    validateRows(update);
 +                break;
 +            case STATIC:
 +                if (update.columns().statics.contains(indexedColumn))
 +                    validateRows(Collections.singleton(update.staticRow()));
 +                break;
 +        }
 +    }
 +
 +    public Indexer indexerFor(final DecoratedKey key,
 +                              final PartitionColumns columns,
 +                              final int nowInSec,
 +                              final OpOrder.Group opGroup,
 +                              final IndexTransaction.Type transactionType)
 +    {
 +        /**
 +         * Indexes on regular and static columns (the non primary-key ones) only care about updates with live
 +         * data for the column they index. In particular, they don't care about having just row or range deletions
 +         * as they don't know how to update the index table unless they know exactly the value that is deleted.
 +         *
 +         * Note that in practice this means that those indexes are only purged of stale entries on compaction,
 +         * when we resolve both the deletion and the prior data it deletes. Of course, such stale entries are also
 +         * filtered on read.
 +         */
 +        if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn))
 +            return null;
 +
 +        return new Indexer()
 +        {
 +            public void begin()
 +            {
 +            }
 +
 +            public void partitionDelete(DeletionTime deletionTime)
 +            {
 +            }
 +
 +            public void rangeTombstone(RangeTombstone tombstone)
 +            {
 +            }
 +
 +            public void insertRow(Row row)
 +            {
 +                if (row.isStatic() != indexedColumn.isStatic())
 +                    return;
 +
 +                if (isPrimaryKeyIndex())
 +                {
 +                    indexPrimaryKey(row.clustering(),
 +                                    getPrimaryKeyIndexLiveness(row),
 +                                    row.deletion());
 +                }
 +                else
 +                {
 +                    if (indexedColumn.isComplex())
 +                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                    else
 +                        indexCell(row.clustering(), row.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void removeRow(Row row)
 +            {
 +                if (isPrimaryKeyIndex())
 +                    return;
 +
 +                if (indexedColumn.isComplex())
 +                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
 +                else
 +                    removeCell(row.clustering(), row.getCell(indexedColumn));
 +            }
 +
 +            public void updateRow(Row oldRow, Row newRow)
 +            {
 +                assert oldRow.isStatic() == newRow.isStatic();
 +                if (newRow.isStatic() != indexedColumn.isStatic())
 +                    return;
 +
 +                if (isPrimaryKeyIndex())
 +                    indexPrimaryKey(newRow.clustering(),
 +                                    newRow.primaryKeyLivenessInfo(),
 +                                    newRow.deletion());
 +
 +                if (indexedColumn.isComplex())
 +                {
 +                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
 +                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
 +                }
 +                else
 +                {
 +                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
 +                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
 +                }
 +            }
 +
 +            public void finish()
 +            {
 +            }
 +
 +            private void indexCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    indexCell(clustering, cell);
 +            }
 +
 +            private void indexCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                insert(key.getKey(),
 +                       clustering,
 +                       cell,
 +                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
 +                       opGroup);
 +            }
 +
 +            private void removeCells(Clustering clustering, Iterable<Cell> cells)
 +            {
 +                if (cells == null)
 +                    return;
 +
 +                for (Cell cell : cells)
 +                    removeCell(clustering, cell);
 +            }
 +
 +            private void removeCell(Clustering clustering, Cell cell)
 +            {
 +                if (cell == null || !cell.isLive(nowInSec))
 +                    return;
 +
 +                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
 +            }
 +
 +            private void indexPrimaryKey(final Clustering clustering,
 +                                         final LivenessInfo liveness,
 +                                         final Row.Deletion deletion)
 +            {
 +                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
 +                    insert(key.getKey(), clustering, null, liveness, opGroup);
 +
 +                if (!deletion.isLive())
 +                    delete(key.getKey(), clustering, deletion.time(), opGroup);
 +            }
 +
 +            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
 +            {
 +                long timestamp = row.primaryKeyLivenessInfo().timestamp();
 +                int ttl = row.primaryKeyLivenessInfo().ttl();
 +                for (Cell cell : row.cells())
 +                {
 +                    long cellTimestamp = cell.timestamp();
 +                    if (cell.isLive(nowInSec))
 +                    {
 +                        if (cellTimestamp > timestamp)
 +                        {
 +                            timestamp = cellTimestamp;
 +                            ttl = cell.ttl();
 +                        }
 +                    }
 +                }
 +                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Specific to internal indexes, this is called by a
 +     * searcher when it encounters a stale entry in the index
 +     * @param indexKey the partition key in the index table
 +     * @param indexClustering the clustering in the index table
 +     * @param deletion deletion timestamp etc
 +     * @param opGroup the operation under which to perform the deletion
 +     */
 +    public void deleteStaleEntry(DecoratedKey indexKey,
 +                                 Clustering indexClustering,
 +                                 DeletionTime deletion,
 +                                 OpOrder.Group opGroup)
 +    {
 +        doDelete(indexKey, indexClustering, deletion, opGroup);
 +        logger.trace("Removed index entry for stale value {}", indexKey);
 +    }
 +
 +    /**
 +     * Called when adding a new entry to the index
 +     */
 +    private void insert(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        LivenessInfo info,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
 +        PartitionUpdate upd = partitionUpdate(valueKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.trace("Inserted entry into index for value {}", valueKey);
 +    }
 +
 +    /**
 +     * Called when deleting entries on non-primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        Cell cell,
 +                        OpOrder.Group opGroup,
 +                        int nowInSec)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               cell));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, cell),
 +                 new DeletionTime(cell.timestamp(), nowInSec),
 +                 opGroup);
 +    }
 +
 +    /**
 +     * Called when deleting entries from indexes on primary key columns
 +     */
 +    private void delete(ByteBuffer rowKey,
 +                        Clustering clustering,
 +                        DeletionTime deletion,
 +                        OpOrder.Group opGroup)
 +    {
 +        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
 +                                                               clustering,
 +                                                               null));
 +        doDelete(valueKey,
 +                 buildIndexClustering(rowKey, clustering, null),
 +                 deletion,
 +                 opGroup);
 +    }
 +
 +    private void doDelete(DecoratedKey indexKey,
 +                          Clustering indexClustering,
 +                          DeletionTime deletion,
 +                          OpOrder.Group opGroup)
 +    {
 +        Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
 +        PartitionUpdate upd = partitionUpdate(indexKey, row);
 +        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
 +        logger.trace("Removed index entry for value {}", indexKey);
 +    }
 +
 +    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isPartitionKey();
 +        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
 +    }
 +
 +    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
 +    {
 +        assert indexedColumn.isClusteringColumn();
 +        for (Row row : update)
 +            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
 +    }
 +
 +    private void validateRows(Iterable<Row> rows)
 +    {
 +        assert !indexedColumn.isPrimaryKeyColumn();
 +        for (Row row : rows)
 +        {
 +            if (indexedColumn.isComplex())
 +            {
 +                ComplexColumnData data = row.getComplexColumnData(indexedColumn);
 +                if (data != null)
 +                {
 +                    for (Cell cell : data)
 +                    {
 +                        validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
 +                    }
 +                }
 +            }
 +            else
 +            {
 +                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
 +            }
 +        }
 +    }
 +
 +    private void validateIndexedValue(ByteBuffer value)
 +    {
 +        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
 +            throw new InvalidRequestException(String.format(
 +                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
 +                                                           value.remaining(),
 +                                                           metadata.name,
 +                                                           baseCfs.metadata.ksName,
 +                                                           baseCfs.metadata.cfName,
 +                                                           indexedColumn.name.toString(),
 +                                                           FBUtilities.MAX_UNSIGNED_SHORT));
 +    }
 +
 +    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
 +                                       Clustering clustering,
 +                                       Cell cell)
 +    {
 +        return getIndexedValue(rowKey,
 +                               clustering,
 +                               cell == null ? null : cell.path(),
 +                               cell == null ? null : cell.value()
 +        );
 +    }
 +
 +    private Clustering buildIndexClustering(ByteBuffer rowKey,
 +                                            Clustering clustering,
 +                                            Cell cell)
 +    {
 +        return buildIndexClusteringPrefix(rowKey,
 +                                          clustering,
 +                                          cell == null ? null : cell.path()).build();
 +    }
 +
 +    private DecoratedKey getIndexKeyFor(ByteBuffer value)
 +    {
 +        return indexCfs.decorateKey(value);
 +    }
 +
 +    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
 +    {
 +        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
 +    }
 +
 +    private void invalidate()
 +    {
 +        // interrupt in-progress compactions
 +        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
 +        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
 +        CompactionManager.instance.waitForCessation(cfss);
 +        Keyspace.writeOrder.awaitNewBarrier();
 +        indexCfs.forceBlockingFlush();
 +        indexCfs.readOrdering.awaitNewBarrier();
 +        indexCfs.invalidate();
 +    }
 +
 +    private boolean isBuilt()
 +    {
 +        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name);
 +    }
 +
 +    private boolean isPrimaryKeyIndex()
 +    {
 +        return indexedColumn.isPrimaryKeyColumn();
 +    }
 +
 +    private Callable<?> getBuildIndexTask()
 +    {
 +        return () -> {
 +            buildBlocking();
 +            return null;
 +        };
 +    }
 +
 +    private void buildBlocking()
 +    {
 +        baseCfs.forceBlockingFlush();
 +
 +        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
 +             Refs<SSTableReader> sstables = viewFragment.refs)
 +        {
 +            if (sstables.isEmpty())
 +            {
 +                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
 +                            baseCfs.metadata.ksName,
 +                            baseCfs.metadata.cfName,
 +                            metadata.name);
 +                baseCfs.indexManager.markIndexBuilt(metadata.name);
 +                return;
 +            }
 +
 +            logger.info("Submitting index build of {} for data in {}",
 +                        metadata.name,
 +                        getSSTableNames(sstables));
 +
 +            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
 +                                                                      Collections.singleton(this),
 +                                                                      new ReducingKeyIterator(sstables));
 +            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
 +            FBUtilities.waitOnFuture(future);
 +            indexCfs.forceBlockingFlush();
 +            baseCfs.indexManager.markIndexBuilt(metadata.name);
 +        }
 +        logger.info("Index build of {} complete", metadata.name);
 +    }
 +
 +    private static String getSSTableNames(Collection<SSTableReader> sstables)
 +    {
 +        return StreamSupport.stream(sstables.spliterator(), false)
 +                            .map(SSTableReader::toString)
 +                            .collect(Collectors.joining(", "));
 +    }
 +
 +    /**
 +     * Construct the CFMetaData for an index table. The clustering columns in the index table
 +     * vary depending on the kind of the indexed value.
 +     * @param baseCfsMetadata
 +     * @param indexMetadata
 +     * @return
 +     */
 +    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
 +    {
 +        Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
 +        CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
 +        ColumnDefinition indexedColumn = target.left;
 +        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
 +
 +        // Tables for legacy KEYS indexes are non-compound and dense
 +        CFMetaData.Builder builder = indexMetadata.isKeys()
 +                                     ? CFMetaData.Builder.create(baseCfsMetadata.ksName,
 +                                                                 baseCfsMetadata.indexColumnFamilyName(indexMetadata),
 +                                                                 true, false, false)
 +                                     : CFMetaData.Builder.create(baseCfsMetadata.ksName,
 +                                                                 baseCfsMetadata.indexColumnFamilyName(indexMetadata));
 +
 +        builder =  builder.withId(baseCfsMetadata.cfId)
 +                          .withPartitioner(new LocalPartitioner(indexedValueType))
 +                          .addPartitionKey(indexedColumn.name, indexedColumn.type)
 +                          .addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
 +
 +        if (indexMetadata.isKeys())
 +        {
 +            // A dense, compact table for KEYS indexes must have a compact
 +            // value column defined, even though it is never used
 +            CompactTables.DefaultNames names =
 +                CompactTables.defaultNameGenerator(ImmutableSet.of(indexedColumn.name.toString(), "partition_key"));
 +            builder = builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance);
 +        }
 +        else
 +        {
 +            // The clustering columns for a table backing a COMPOSITES index are dependent
 +            // on the specific type of index (there are specializations for indexes on collections)
 +            builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
 +        }
 +
 +        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
 +    }
 +
 +    /**
 +     * Factory method for new CassandraIndex instances.
 +     * <p>
 +     * The index target is parsed from {@code indexMetadata} against the metadata of
 +     * {@code baseCfs} (see {@link #parseTarget}), the matching {@link CassandraIndexFunctions}
 +     * are selected (see {@link #getFunctions}), and construction is delegated to their
 +     * {@code newIndexInstance} method.
 +     *
 +     * @param baseCfs the store whose {@code metadata} the index target is resolved against;
 +     *                it is also handed on to {@code newIndexInstance}
 +     * @param indexMetadata the definition of the index to instantiate
 +     * @return the instance returned by the selected functions' {@code newIndexInstance}
 +     */
 +    public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
 +    {
 +        return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
 +    }
 +
 +    /**
 +     * Resolves the "target" option of an index definition into the indexed column and
 +     * the type of the target.
 +     * <p>
 +     * The target string either matches {@code TARGET_REGEX}, in which case group 1 is the
 +     * target type and group 2 the column name, or it is taken as a bare column name with
 +     * an implicit type of {@link IndexTarget.Type#VALUES}. A column name that starts with a
 +     * double quote has its first and last characters removed and doubled quotes un-escaped
 +     * before the column is looked up.
 +     *
 +     * @param cfm metadata of the table whose columns are searched for the target column
 +     * @param indexDef index definition; its {@code options} are expected to contain a
 +     *                 "target" entry (checked with an {@code assert})
 +     * @return a pair of the matched column definition and the target type
 +     * @throws RuntimeException if {@code cfm} is not a CQL table and none of its columns
 +     *                          has the parsed name
 +     */
 +    // Public because it's also used to convert index metadata into a thrift-compatible format
 +    public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
 +                                                                       IndexMetadata indexDef)
 +    {
 +        String target = indexDef.options.get("target");
 +        assert target != null : String.format("No target definition found for index %s", indexDef.name);
 +
 +        // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
 +        // if not, then it must be a simple column name and implicitly its type is VALUES
 +        Matcher matcher = TARGET_REGEX.matcher(target);
 +        String columnName;
 +        IndexTarget.Type targetType;
 +        if (matcher.matches())
 +        {
 +            targetType = IndexTarget.Type.fromString(matcher.group(1));
 +            columnName = matcher.group(2);
 +        }
 +        else
 +        {
 +            columnName = target;
 +            targetType = IndexTarget.Type.VALUES;
 +        }
 +
 +        // in the case of a quoted column name the name in the target string
 +        // will be enclosed in quotes, which we need to unwrap. It may also
 +        // include quote characters internally, escaped like so:
 +        //      abc"def -> abc""def.
 +        // Because the target string is stored in a CQL compatible form, we
 +        // need to un-escape any such quotes to get the actual column name
 +        if (columnName.startsWith("\""))
 +        {
 +            columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
 +            columnName = columnName.replaceAll("\"\"", "\"");
 +        }
 +
 +        // if it's not a CQL table, we can't assume that the column name is utf8, so
 +        // in that case we have to do a linear scan of the cfm's columns to get the matching one
 +        // NOTE(review): on the CQL branch the lookup result is wrapped without a null check, so if
 +        // getColumnDefinition can return null for an unknown name the pair's left side is null
 +        // instead of the RuntimeException below being thrown - confirm callers handle that
 +        if (cfm.isCQLTable())
 +            return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
 +        else
 +            for (ColumnDefinition column : cfm.allColumns())
 +                if (column.name.toString().equals(columnName))
 +                    return Pair.create(column, targetType);
 +
 +        throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
 +    }
 +
 +    /**
 +     * Selects the {@link CassandraIndexFunctions} to use for an index.
 +     * <p>
 +     * The checks are applied in this order:
 +     * <ol>
 +     *   <li>a KEYS index ({@code indexDef.isKeys()}) always gets {@code KEYS_INDEX_FUNCTIONS};</li>
 +     *   <li>a multi-cell collection column is dispatched on the collection kind, and for
 +     *       maps additionally on the target type;</li>
 +     *   <li>anything else is dispatched on the kind of the indexed column.</li>
 +     * </ol>
 +     *
 +     * @param indexDef the index definition
 +     * @param target the indexed column (left) and target type (right), as produced by
 +     *               {@link #parseTarget}
 +     * @return the functions matching the index
 +     * @throws AssertionError if the target type of a map index, or the kind of the indexed
 +     *                        column, is not one of the cases handled below
 +     */
 +    static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
 +                                                Pair<ColumnDefinition, IndexTarget.Type> target)
 +    {
 +        if (indexDef.isKeys())
 +            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
 +
 +        ColumnDefinition indexedColumn = target.left;
 +        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
 +        {
 +            // a collection kind without a case here falls out of this switch and is
 +            // handled by the column-kind switch further down
 +            switch (((CollectionType)indexedColumn.type).kind)
 +            {
 +                case LIST:
 +                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                case SET:
 +                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                case MAP:
 +                    switch (target.right)
 +                    {
 +                        case KEYS:
 +                            return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
 +                        case KEYS_AND_VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
 +                        case VALUES:
 +                            return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
 +                    }
 +                    // reached only for a target type with no case above
 +                    throw new AssertionError();
 +            }
 +        }
 +
 +        switch (indexedColumn.kind)
 +        {
 +            case CLUSTERING:
 +                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
 +            case REGULAR:
 +                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
 +            case PARTITION_KEY:
 +                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
 +            //case COMPACT_VALUE:
 +            //    return new CompositesIndexOnCompactValue();
 +        }
 +        // reached for any column kind with no (active) case above
 +        throw new AssertionError();
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
index 72d2528,0000000..d6b39e6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
@@@ -1,172 -1,0 +1,192 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +import java.util.NavigableSet;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.RowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.utils.btree.BTreeSet;
 +
 +/**
 + * Base {@link Index.Searcher} for {@link CassandraIndex} implementations.
 + * <p>
 + * A search reads the partition of the index's backing table whose key is the value of
 + * the index expression, restricted to a clustering filter derived from the read
 + * command, and passes the resulting index rows to {@link #queryDataFromIndex}, which
 + * subclasses implement.
 + */
 +public abstract class CassandraIndexSearcher implements Index.Searcher
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CassandraIndexSearcher.class);
 +
 +    // the index expression; its value is used as the partition key read from the index table
 +    private final RowFilter.Expression expression;
 +    protected final CassandraIndex index;
 +    // the read command this searcher was created for
 +    protected final ReadCommand command;
 +
 +    /**
 +     * @param command the read command being served
 +     * @param expression the index expression supplying the value to look up in the index
 +     * @param index the index whose backing table is searched
 +     */
 +    public CassandraIndexSearcher(ReadCommand command,
 +                                  RowFilter.Expression expression,
 +                                  CassandraIndex index)
 +    {
 +        this.command = command;
 +        this.expression = expression;
 +        this.index = index;
 +    }
 +
 +    /**
 +     * Reads the index partition for the expression's value and hands the index rows,
 +     * filtered with the command's {@code nowInSec}, to {@link #queryDataFromIndex}.
 +     * If that call throws a {@code RuntimeException} or {@code Error}, the index
 +     * iterator is closed before the throwable is rethrown.
 +     *
 +     * @param orderGroup the read order group for this read; passed to both the index
 +     *                   read and {@link #queryDataFromIndex}
 +     * @return the iterator produced by {@link #queryDataFromIndex}
 +     */
 +    @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
 +    // of this method.
 +    public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup)
 +    {
 +        // the value of the index expression is the partition key in the index table
 +        DecoratedKey indexKey = index.getBackingTable().get().decorateKey(expression.getIndexValue());
 +        UnfilteredRowIterator indexIter = queryIndex(indexKey, command, orderGroup);
 +        try
 +        {
 +            return queryDataFromIndex(indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup);
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            indexIter.close();
 +            throw e;
 +        }
 +    }
 +
 +    /**
 +     * Reads a single partition of the index's backing table (memtables and disk),
 +     * selecting all columns and using the clustering filter built by
 +     * {@link #makeIndexFilter}.
 +     */
 +    private UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
 +    {
 +        ClusteringIndexFilter filter = makeIndexFilter(command);
 +        ColumnFamilyStore indexCfs = index.getBackingTable().get();
 +        CFMetaData indexCfm = indexCfs.metadata;
 +        return SinglePartitionReadCommand.create(indexCfm, command.nowInSec(), indexKey, ColumnFilter.all(indexCfm), filter)
 +                                         .queryMemtableAndDisk(indexCfs, orderGroup.indexReadOpOrderGroup());
 +    }
 +
 +    /**
 +     * Builds the clustering filter to apply to the index partition from the clustering
 +     * restrictions of the base-table command.
 +     * <p>
 +     * For a {@link SinglePartitionReadCommand} each requested clustering or slice is
 +     * converted with {@link #makeIndexClustering} / {@link #makeIndexBound} using the
 +     * command's partition key. Any other command is cast to
 +     * {@link PartitionRangeReadCommand}; the result is then a single, non-reversed slice
 +     * that defaults to {@code Slice.ALL} and is narrowed only when the left end of the
 +     * key range is a {@link DecoratedKey}.
 +     */
 +    private ClusteringIndexFilter makeIndexFilter(ReadCommand command)
 +    {
 +        if (command instanceof SinglePartitionReadCommand)
 +        {
 +            // Note: as yet there's no route to get here - a 2i query *always* uses a
 +            // PartitionRangeReadCommand. This is here in preparation for coming changes
 +            // in SelectStatement.
 +            SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command;
 +            ByteBuffer pk = sprc.partitionKey().getKey();
 +            ClusteringIndexFilter filter = sprc.clusteringIndexFilter();
 +
 +            if (filter instanceof ClusteringIndexNamesFilter)
 +            {
 +                NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
 +                BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator());
 +                for (Clustering c : requested)
 +                    clusterings.add(makeIndexClustering(pk, c));
 +                return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed());
 +            }
 +            else
 +            {
 +                Slices requested = ((ClusteringIndexSliceFilter)filter).requestedSlices();
 +                Slices.Builder builder = new Slices.Builder(index.getIndexComparator());
 +                for (Slice slice : requested)
 +                    builder.add(makeIndexBound(pk, slice.start()), makeIndexBound(pk, slice.end()));
 +                return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed());
 +            }
 +        }
 +        else
 +        {
 +
 +            DataRange dataRange = ((PartitionRangeReadCommand)command).dataRange();
 +            AbstractBounds<PartitionPosition> range = dataRange.keyRange();
 +
 +            Slice slice = Slice.ALL;
 +
 +            /*
 +             * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
 +             * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
 +             * key having a given token. A potential fix would be to actually store the token along the key in the indexed row.
 +             */
 +            if (range.left instanceof DecoratedKey)
 +            {
 +                // the right hand side of the range may not be a DecoratedKey (for instance if we're paging),
 +                // but if it is, we can optimise slightly by restricting the slice
 +                if (range.right instanceof DecoratedKey)
 +                {
 +
 +                    DecoratedKey startKey = (DecoratedKey) range.left;
 +                    DecoratedKey endKey = (DecoratedKey) range.right;
 +
 +                    Slice.Bound start = Slice.Bound.BOTTOM;
 +                    Slice.Bound end = Slice.Bound.TOP;
 +
 +                    /*
 +                     * For index queries over a range, we can't do a whole lot better than querying everything for the key range, though for
 +                     * slice queries we can slightly restrict the beginning and end.
 +                     */
 +                    if (!dataRange.isNamesQuery())
 +                    {
 +                        ClusteringIndexSliceFilter startSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(
 +                                                                                                                                   startKey));
 +                        ClusteringIndexSliceFilter endSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(
 +                                                                                                                                 endKey));
 +
 +                        // We can't effectively support reversed queries when we have a range, so we don't support it
 +                        // (or through post-query reordering) and shouldn't get there.
 +                        assert !startSliceFilter.isReversed() && !endSliceFilter.isReversed();
 +
 +                        Slices startSlices = startSliceFilter.requestedSlices();
 +                        Slices endSlices = endSliceFilter.requestedSlices();
 +
 +                        // start from the first slice requested for the start key ...
 +                        if (startSlices.size() > 0)
 +                            start = startSlices.get(0).start();
 +
 +                        // ... and stop at the end of the last slice requested for the end key
 +                        if (endSlices.size() > 0)
 +                            end = endSlices.get(endSlices.size() - 1).end();
 +                    }
 +
 +                    slice = Slice.make(makeIndexBound(startKey.getKey(), start),
 +                                       makeIndexBound(endKey.getKey(), end));
 +                }
 +                else
 +                {
 +                    // otherwise, just start the index slice from the key we do have
 +                    slice = Slice.make(makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM),
 +                                       Slice.Bound.TOP);
 +                }
 +            }
 +            return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
 +        }
 +    }
 +
 +    /**
 +     * Converts a slice bound on the base table into a bound on the index table: the
 +     * index clustering prefix is built from {@code rowKey} and {@code bound} (with a
 +     * null cell path) and turned into a bound with the same start/end side and
 +     * inclusiveness as {@code bound}.
 +     */
 +    private Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
 +    {
 +        return index.buildIndexClusteringPrefix(rowKey, bound, null)
 +                                 .buildBound(bound.isStart(), bound.isInclusive());
 +    }
 +
 +    /**
 +     * Converts a base-table clustering into an index-table clustering by building the
 +     * index clustering prefix from {@code rowKey} and {@code clustering} (with a null
 +     * cell path).
 +     */
 +    protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering)
 +    {
 +        return index.buildIndexClusteringPrefix(rowKey, clustering, null).build();
 +    }
 +
 +    /**
 +     * Produces the result of the search from the rows found in the index.
 +     *
 +     * @param indexKey the key of the index partition that was read
 +     * @param indexHits the rows of that partition, filtered with the command's {@code nowInSec}
 +     * @param command the read command being served
 +     * @param orderGroup the read order group passed to {@link #search}
 +     * @return the iterator that {@link #search} returns to its caller
 +     */
 +    protected abstract UnfilteredPartitionIterator queryDataFromIndex(DecoratedKey indexKey,
 +                                                                      RowIterator indexHits,
 +                                                                      ReadCommand command,
 +                                                                      ReadOrderGroup orderGroup);
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/IndexEntry.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/IndexEntry.java
index 6f94ace,0000000..97525d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/IndexEntry.java
+++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
@@@ -1,34 -1,0 +1,54 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index.internal;
 +
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.DecoratedKey;
 +
 +/**
 + * Entries in indexes on non-compact tables (tables with composite comparators)
 + * can be encapsulated as IndexEntry instances. These are not used when dealing
 + * with indexes on static/compact/thrift tables (i.e. KEYS indexes).
 + * <p>
 + * Instances are plain value holders: every field is public and final and is set
 + * once by the constructor.
 + */
 +public final class IndexEntry
 +{
 +    // NOTE(review): presumably the partition key of the entry in the index table - confirm
 +    public final DecoratedKey indexValue;
 +    // NOTE(review): presumably the clustering of the entry in the index table - confirm
 +    public final Clustering indexClustering;
 +    public final long timestamp;
 +
 +    // NOTE(review): presumably the partition key and clustering of the indexed
 +    // base-table row - confirm against the decodeEntry implementations
 +    public final ByteBuffer indexedKey;
 +    public final Clustering indexedEntryClustering;
 +
 +    /**
 +     * Creates an entry; each argument is stored unchanged in the field of the same name.
 +     */
 +    public IndexEntry(DecoratedKey indexValue,
 +                      Clustering indexClustering,
 +                      long timestamp,
 +                      ByteBuffer indexedKey,
 +                      Clustering indexedEntryClustering)
 +    {
 +        this.indexValue = indexValue;
 +        this.indexClustering = indexClustering;
 +        this.timestamp = timestamp;
 +        this.indexedKey = indexedKey;
 +        this.indexedEntryClustering = indexedEntryClustering;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
index 53ecd01,0000000..d680253
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
@@@ -1,62 -1,0 +1,82 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
 +package org.apache.cassandra.index.internal.keys;
 +
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.Cell;
 +import org.apache.cassandra.db.rows.CellPath;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.index.internal.CassandraIndex;
 +import org.apache.cassandra.index.internal.IndexEntry;
 +import org.apache.cassandra.schema.IndexMetadata;
 +
 +/**
 + * {@link CassandraIndex} implementation for KEYS indexes. Index entries are
 + * clustered by the base partition key only, the indexed value is the cell value
 + * itself, and no {@link IndexEntry} decoding is supported.
 + */
 +public class KeysIndex extends CassandraIndex
 +{
 +    public KeysIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
 +    {
 +        super(baseCfs, indexDef);
 +    }
 +
 +    /**
 +     * Returns {@code builder} unchanged; {@code baseMetadata} and {@code cfDef} are not used.
 +     */
 +    public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
 +                                                        CFMetaData baseMetadata,
 +                                                        ColumnDefinition cfDef)
 +    {
 +        // no additional clustering columns required
 +        return builder;
 +    }
 +
 +    /**
 +     * Builds an index clustering prefix holding only {@code partitionKey};
 +     * {@code prefix} and {@code path} are ignored.
 +     */
 +    protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
 +                                               ClusteringPrefix prefix,
 +                                               CellPath path)
 +    {
 +        CBuilder builder = CBuilder.create(getIndexComparator());
 +        builder.add(partitionKey);
 +        return builder;
 +    }
 +
 +    /**
 +     * Returns {@code cellValue} as the indexed value; the other arguments are ignored.
 +     */
 +    protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
 +                                      Clustering clustering,
 +                                      CellPath path, ByteBuffer cellValue)
 +    {
 +        return cellValue;
 +    }
 +
 +    /**
 +     * Not supported for KEYS indexes.
 +     *
 +     * @throws UnsupportedOperationException always
 +     */
 +    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
 +    {
 +        throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format");
 +    }
 +
 +    /**
 +     * Tells whether an index entry for {@code indexValue} no longer matches {@code row}.
 +     *
 +     * @param row the row to check; may be null
 +     * @param indexValue the value recorded in the index
 +     * @param nowInSec the time used to decide whether the cell is live
 +     * @return true if {@code row} is null, has no cell for the indexed column, that cell
 +     *         is not live at {@code nowInSec}, or its value does not compare equal to
 +     *         {@code indexValue} under the indexed column's type
 +     */
 +    public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
 +    {
 +        if (row == null)
 +            return true;
 +
 +        // indexedColumn is not declared in this class (presumably inherited from CassandraIndex)
 +        Cell cell = row.getCell(indexedColumn);
 +
 +        return (cell == null
 +             || !cell.isLive(nowInSec)
 +             || indexedColumn.type.compare(indexValue, cell.value()) != 0);
 +    }
 +}