You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ca...@apache.org on 2016/06/15 14:59:50 UTC
[17/20] cassandra git commit: Merge branch 'cassandra-2.2' into
cassandra-3.0
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
index 98640ae,0000000..f000fcf
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
@@@ -1,40 -1,0 +1,60 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements UnfilteredRowIterator // wraps an input iterator and tracks its partition-level deletion as transformations are added
+{
+ private DeletionTime partitionLevelDeletion; // initialised from the input, then replaced by the result of each added Transformation's applyToDeletion
+
+ public UnfilteredRows(UnfilteredRowIterator input)
+ {
+ super(input);
+ partitionLevelDeletion = input.partitionLevelDeletion(); // capture the input's deletion so later transformations can rewrite it
+ }
+
+ @Override
+ void add(Transformation add)
+ {
+ super.add(add); // let the base class record the transformation first
+ partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion); // then give the new transformation a chance to replace the current deletion
+ }
+
+ public DeletionTime partitionLevelDeletion()
+ {
+ return partitionLevelDeletion; // the transformed value held by this instance, not a fresh read from the input
+ }
+
+ public EncodingStats stats()
+ {
+ return input.stats(); // delegated to the wrapped input without modification
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return staticRow().isEmpty() && partitionLevelDeletion().isLive() && !hasNext(); // true only when all three hold: empty static row, deletion reports isLive(), and hasNext() is false
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/Index.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/Index.java
index ab6665d,0000000..469ef07
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/Index.java
+++ b/src/java/org/apache/cassandra/index/Index.java
@@@ -1,452 -1,0 +1,472 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.index;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.BiFunction;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * Consisting of a top level Index interface and two sub-interfaces which handle read and write operations,
+ * Searcher and Indexer respectively, this defines a secondary index implementation.
+ * Instantiation is done via reflection and implementations must provide a constructor which takes the base
+ * table's ColumnFamilyStore and the IndexMetadata which defines the Index as arguments. e.g:
+ * {@code MyCustomIndex( ColumnFamilyStore baseCfs, IndexMetadata indexDef )}
+ *
+ * The main interface defines methods for index management, index selection at both write and query time,
+ * as well as validation of values that will ultimately be indexed.
+ * Two sub-interfaces are also defined, which represent single use helpers for short lived tasks at read and write time.
+ * Indexer: an event listener which receives notifications at particular points during an update of a single partition
+ * in the base table.
+ * Searcher: performs queries against the index based on a predicate defined in a RowFilter. An instance
+ * is expected to be single use, being involved in the execution of a single ReadCommand.
+ *
+ * The main interface includes factory methods for obtaining instances of both of the sub-interfaces.
+ *
+ * The methods defined in the top level interface can be grouped into 3 categories:
+ *
+ * Management Tasks:
+ * This group of methods is primarily concerned with maintenance of secondary indexes and are mainly called from
+ * SecondaryIndexManager. It includes methods for registering and un-registering an index, performing maintenance
+ * tasks such as (re)building an index from SSTable data, flushing, invalidating and so forth, as well as some to
+ * retrieve general metadata about the index (index name, any internal tables used for persistence etc).
+ * Several of these maintenance functions have a return type of Callable<?>; the expectation for these methods is
+ * that any work required to be performed by the method be done inside the Callable so that the responsibility for
+ * scheduling its execution can rest with SecondaryIndexManager. For instance, a task like reloading index metadata
+ * following potential updates caused by modifications to the base table may be performed in a blocking way. In
+ * contrast, adding a new index may require it to be built from existing SSTable data, a potentially expensive task
+ * which should be performed asynchronously.
+ *
+ * Index Selection:
+ * There are two facets to index selection, write time and read time selection. The former is concerned with
+ * identifying whether an index should be informed about a particular write operation. The latter is about providing
+ * means to use the index for search during query execution.
+ *
+ * Validation:
+ * Values that may be written to an index are checked as part of input validation, prior to an update or insert
+ * operation being accepted.
+ *
+ *
+ * Sub-interfaces:
+ *
+ * Update processing:
+ * Indexes are subscribed to the stream of events generated by modifications to the base table. Subscription is
+ * done via first registering the Index with the base table's SecondaryIndexManager. For each partition update, the set
+ * of registered indexes are then filtered based on the properties of the update using the selection methods on the main
+ * interface described above. Each of the indexes in the filtered set then provides an event listener to receive
+ * notifications about the update as it is processed. As such, an event handler instance is scoped to a single
+ * partition update; SecondaryIndexManager obtains a new handler for every update it processes (via a call to the
+ * factory method, indexerFor). That handler will then receive all events for the update, before being
+ * discarded by the SecondaryIndexManager. Indexer instances are never re-used by SecondaryIndexManager and the
+ * expectation is that each call to indexerFor should return a unique instance, or at least if instances can
+ * be recycled, that a given instance is only used to process a single partition update at a time.
+ *
+ * Search:
+ * Each query (i.e. a single ReadCommand) that uses indexes will use a single instance of Index.Searcher. As with
+ * processing of updates, an Index must be registered with the primary table's SecondaryIndexManager to be able to
+ * support queries. During the processing of a ReadCommand, the Expressions in its RowFilter are examined to determine
+ * whether any of them are supported by a registered Index. supportsExpression is used to filter out Indexes which
+ * cannot support a given Expression. After filtering, the set of candidate indexes are ranked according to the result
+ * of getEstimatedResultRows and the most selective (i.e. the one expected to return the smallest number of results) is
+ * chosen. A Searcher instance is then obtained from the searcherFor method & used to perform the actual Index lookup.
+ * Finally, Indexes can define a post processing step to be performed on the coordinator, after results (partitions from
+ * the primary table) have been received from replicas and reconciled. This post processing is defined as a
+ * java.util.function.BiFunction<PartitionIterator, ReadCommand, PartitionIterator>, that is a function which takes as
+ * arguments a PartitionIterator (containing the reconciled result rows) and the ReadCommand being executed,
+ * and returns another iterator of partitions, possibly having transformed the initial results in some way.
+ * The post processing function is obtained from the Index's postProcessorFor method; the built-in indexes which ship
+ * with Cassandra return a no-op function here.
+ *
+ * An optional static method may be provided to validate custom index options (two variants are supported):
+ *
+ * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options);}</pre>
+ *
+ * The input is the map of index options supplied in the WITH clause of a CREATE INDEX statement.
+ *
+ * <pre>{@code public static Map<String, String> validateOptions(Map<String, String> options, CFMetaData cfm);}</pre>
+ *
+ * In this version, the base table's metadata is also supplied as an argument.
+ * If both overloaded methods are provided, only the one including the base table's metadata will be invoked.
+ *
+ * The validation method should return a map containing any of the supplied options which are not valid for the
+ * implementation. If the returned map is not empty, validation is considered failed and an error is raised.
+ * Alternatively, the implementation may choose to throw an org.apache.cassandra.exceptions.ConfigurationException
+ * if invalid options are encountered.
+ *
+ */
+public interface Index
+{
+
+ /*
+ * Management functions
+ */
+
+ /**
+ * Return a task to perform any initialization work when a new index instance is created.
+ * This may involve costly operations such as (re)building the index, and is performed asynchronously
+ * by SecondaryIndexManager.
+ * @return a task to perform any necessary initialization work
+ */
+ public Callable<?> getInitializationTask();
+
+ /**
+ * Returns the IndexMetadata which configures and defines the index instance. This should be the same
+ * object passed as the argument to setIndexMetadata.
+ * @return the index's metadata
+ */
+ public IndexMetadata getIndexMetadata();
+
+ /**
+ * Return a task to reload the internal metadata of an index.
+ * Called when the base table metadata is modified or when the configuration of the Index is updated.
+ * Implementations should return a task which performs any necessary work to be done due to
+ * updating the configuration(s) such as (re)building etc. This task is performed asynchronously
+ * by SecondaryIndexManager.
+ * @return task to be executed by the index manager during a reload
+ */
+ public Callable<?> getMetadataReloadTask(IndexMetadata indexMetadata);
+
+ /**
+ * An index must be registered in order to be able to either subscribe to update events on the base
+ * table and/or to provide Searcher functionality for reads. The double dispatch involved here, where
+ * the Index actually performs its own registration by calling back to the supplied IndexRegistry's
+ * own registerIndex method, is to make the decision as to whether or not to register an index belong
+ * to the implementation, not the manager.
+ * @param registry the index registry to register the instance with
+ */
+ public void register(IndexRegistry registry);
+
+ /**
+ * If the index implementation uses a local table to store its index data this method should return a
+ * handle to it. If not, an empty Optional should be returned. Typically, this is useful for the built-in
+ * Index implementations.
+ * @return an Optional referencing the Index's backing storage table if it has one, or Optional.empty() if not.
+ */
+ public Optional<ColumnFamilyStore> getBackingTable();
+
+ /**
+ * Return a task which performs a blocking flush of the index's data to persistent storage.
+ * @return task to be executed by the index manager to perform the flush.
+ */
+ public Callable<?> getBlockingFlushTask();
+
+ /**
+ * Return a task which invalidates the index, indicating it should no longer be considered usable.
+ * This should include any clean up and releasing of resources required when dropping an index.
+ * @return task to be executed by the index manager to invalidate the index.
+ */
+ public Callable<?> getInvalidateTask();
+
+ /**
+ * Return a task to truncate the index with the specified truncation timestamp.
+ * Called when the base table is truncated.
+ * @param truncatedAt timestamp of the truncation operation. This will be the same timestamp used
+ * in the truncation of the base table.
+ * @return task to be executed by the index manager when the base table is truncated.
+ */
+ public Callable<?> getTruncateTask(long truncatedAt);
+
+ /**
+ * Return true if this index can be built or rebuilt when the index manager determines it is necessary. Returning
+ * false enables the index implementation (or some other component) to control if and when SSTable data is
+ * incorporated into the index.
+ *
+ * This is called by SecondaryIndexManager in buildIndexBlocking, buildAllIndexesBlocking & rebuildIndexesBlocking
+ * where a return value of false causes the index to be excluded from the set of those which will process the
+ * SSTable data.
+ * @return true if the index should be included in the set which processes SSTable data, false otherwise.
+ */
+ public boolean shouldBuildBlocking();
+
+
+ /*
+ * Index selection
+ */
+
+ /**
+ * Called to determine whether this index targets a specific column.
+ * Used during schema operations such as when dropping or renaming a column, to check if
+ * the index will be affected by the change. Typically, if an index answers that it does
+ * depend upon a column, then schema operations on that column are not permitted until the index
+ * is dropped or altered.
+ *
+ * @param column the column definition to check
+ * @return true if the index depends on the supplied column being present; false if the column may be
+ * safely dropped or modified without adversely affecting the index
+ */
+ public boolean dependsOn(ColumnDefinition column);
+
+ /**
+ * Called to determine whether this index can provide a searcher to execute a query on the
+ * supplied column using the specified operator. This forms part of the query validation done
+ * before a CQL select statement is executed.
+ * @param column the target column of a search query predicate
+ * @param operator the operator of a search query predicate
+ * @return true if this index is capable of supporting such expressions, false otherwise
+ */
+ public boolean supportsExpression(ColumnDefinition column, Operator operator);
+
+ /**
+ * If the index supports custom search expressions using the
+ * {@code SELECT * FROM table WHERE expr(index_name, expression)} syntax, this
+ * method should return the expected type of the expression argument.
+ * For example, if the index supports custom expressions as Strings, calls to this
+ * method should return {@code UTF8Type.instance}.
+ * If the index implementation does not support custom expressions, then it should
+ * return null.
+ * @return the type of custom index expressions supported by this index, or
+ * null if custom expressions are not supported.
+ */
+ public AbstractType<?> customExpressionValueType();
+
+ /**
+ * Transform an initial RowFilter into the filter that will still need to be applied
+ * to a set of Rows after the index has performed its initial scan.
+ * Used in ReadCommand#executeLocal to reduce the amount of filtering performed on the
+ * results of the index query.
+ *
+ * @param filter the initial filter belonging to a ReadCommand
+ * @return the (hopefully) reduced filter that would still need to be applied after
+ * the index was used to narrow the initial result set
+ */
+ public RowFilter getPostIndexQueryFilter(RowFilter filter);
+
+ /**
+ * Return an estimate of the number of results this index is expected to return for any given
+ * query that it can be used to answer. Used in conjunction with supportsExpression()
+ * to determine the most selective index for a given ReadCommand. Additionally, this is also used
+ * by StorageProxy.estimateResultsPerRange to calculate the initial concurrency factor for range requests.
+ *
+ * @return the estimated average number of results a Searcher may return for any given query
+ */
+ public long getEstimatedResultRows();
+
+ /*
+ * Input validation
+ */
+
+ /**
+ * Called at write time to ensure that values present in the update
+ * are valid according to the rules of all registered indexes which
+ * will process it. The partition key as well as the clustering and
+ * cell values for each row in the update may be checked by index
+ * implementations.
+ * @param update PartitionUpdate containing the values to be validated by registered Index implementations
+ * @throws InvalidRequestException if the update fails the index's validation
+ */
+ public void validate(PartitionUpdate update) throws InvalidRequestException;
+
+ /*
+ * Update processing
+ */
+
+ /**
+ * Creates a new {@code Indexer} object for updates to a given partition.
+ *
+ * @param key key of the partition being modified
+ * @param columns the regular and static columns the created indexer will have to deal with.
+ * This can be empty as an update might only contain partition, range and row deletions, but
+ * the indexer is guaranteed to not get any cells for a column that is not part of {@code columns}.
+ * @param nowInSec current time of the update operation
+ * @param opGroup operation group spanning the update operation
+ * @param transactionType indicates what kind of update is being performed on the base data
+ * i.e. a write time insert/update/delete or the result of compaction
+ * @return the newly created indexer or {@code null} if the index is not interested in the update
+ * (this could be because the index doesn't care about that particular partition, doesn't care about
+ * that type of transaction, ...).
+ */
+ public Indexer indexerFor(DecoratedKey key,
+ PartitionColumns columns,
+ int nowInSec,
+ OpOrder.Group opGroup,
+ IndexTransaction.Type transactionType);
+
+ /**
+ * Listener for processing events emitted during a single partition update.
+ * Instances of this are responsible for applying modifications to the index in response to a single update
+ * operation on a particular partition of the base table.
+ *
+ * That update may be generated by the normal write path, by iterating SSTables during streaming operations or when
+ * building or rebuilding an index from source. Updates also occur during compaction when multiple versions of a
+ * source partition from different SSTables are merged.
+ *
+ * Implementations should not make assumptions about resolution or filtering of the partition update being
+ * processed. That is to say that it is possible for an Indexer instance to receive notification of a
+ * PartitionDelete or RangeTombstones which shadow a Row it then receives via insertRow/updateRow.
+ *
+ * It is important to note that the only ordering guarantee made for the methods here is that the first call will
+ * be to begin() and the last call to finish(). The other methods may be called to process update events in any
+ * order. This can also include duplicate calls, in cases where a memtable partition is under contention from
+ * several updates. In that scenario, the same set of events may be delivered to the Indexer as a memtable update
+ * which failed due to contention is re-applied.
+ */
+ public interface Indexer
+ {
+ /**
+ * Notification of the start of a partition update.
+ * This event always occurs before any other during the update.
+ */
+ public void begin();
+
+ /**
+ * Notification of a top level partition delete.
+ * @param deletionTime the DeletionTime of the top level partition delete
+ */
+ public void partitionDelete(DeletionTime deletionTime);
+
+ /**
+ * Notification of a RangeTombstone.
+ * An update of a single partition may contain multiple RangeTombstones,
+ * and a notification will be passed for each of them.
+ * @param tombstone the RangeTombstone contained in the update
+ */
+ public void rangeTombstone(RangeTombstone tombstone);
+
+ /**
+ * Notification that a new row was inserted into the Memtable holding the partition.
+ * This only implies that the inserted row was not already present in the Memtable,
+ * it *does not* guarantee that the row does not exist in an SSTable, potentially with
+ * additional column data.
+ *
+ * @param row the Row being inserted into the base table's Memtable.
+ */
+ public void insertRow(Row row);
+
+ /**
+ * Notification of a modification to a row in the base table's Memtable.
+ * This is to allow an Index implementation to clean up entries for base data which is
+ * never flushed to disk (and so will not be purged during compaction).
+ * It's important to note that the old & new rows supplied here may not represent
+ * the totality of the data for the Row with this particular Clustering. There may be
+ * additional column data in SSTables which is not present in either the old or new row,
+ * so implementations should be aware of that.
+ * The supplied rows contain only column data which has actually been updated.
+ * oldRowData contains only the columns which have been removed from the Row's
+ * representation in the Memtable, while newRowData includes only new columns
+ * which were not previously present. Any column data which is unchanged by
+ * the update is not included.
+ *
+ * @param oldRowData data that was present in existing row and which has been removed from
+ * the base table's Memtable
+ * @param newRowData data that was not present in the existing row and is being inserted
+ * into the base table's Memtable
+ */
+ public void updateRow(Row oldRowData, Row newRowData);
+
+ /**
+ * Notification that a row was removed from the partition.
+ * Note that this is only called as part of either a compaction or a cleanup.
+ * This context is indicated by the TransactionType supplied to the indexerFor method.
+ *
+ * As with updateRow, it cannot be guaranteed that all data belonging to the Clustering
+ * of the supplied Row has been removed (although in the case of a cleanup, that is the
+ * ultimate intention).
+ * There may be data for the same row in other SSTables, so in this case Indexer implementations
+ * should *not* assume that all traces of the row have been removed. In particular,
+ * it is not safe to assert that all values associated with the Row's Clustering
+ * have been deleted, so implementations which index primary key columns should not
+ * purge those entries from their indexes.
+ *
+ * @param row data being removed from the base table
+ */
+ public void removeRow(Row row);
+
+ /**
+ * Notification of the end of the partition update.
+ * This event always occurs after all others for the particular update.
+ */
+ public void finish();
+ }
+
+ /*
+ * Querying
+ */
+
+ /**
+ * Used to validate the various parameters of a supplied {@code ReadCommand},
+ * this is called prior to execution. In theory, any command instance may be checked
+ * by any {@code Index} instance, but in practice the index will be the one
+ * returned by a call to the {@code getIndex(ColumnFamilyStore cfs)} method on
+ * the supplied command.
+ *
+ * Custom index implementations should perform any validation of query expressions here and throw a meaningful
+ * InvalidRequestException when any expression or other parameter is invalid.
+ *
+ * @param command a ReadCommand whose parameters are to be verified
+ * @throws InvalidRequestException if the details of the command fail to meet the
+ * index's validation rules
+ */
+ default void validate(ReadCommand command) throws InvalidRequestException
+ {
+ }
+
+ /**
+ * Return a function which performs post processing on the results of a partition range read command.
+ * In future, this may be used as a generalized mechanism for transforming results on the coordinator prior
+ * to returning them to the caller.
+ *
+ * This is used on the coordinator during execution of a range command to perform post
+ * processing of merged results obtained from the necessary replicas. This is currently the only place in which
+ * results are transformed in this way, but this may change over time as usage is generalized.
+ * See CASSANDRA-8717 for further discussion.
+ *
+ * The function takes a PartitionIterator of the results from the replicas which has already been collated
+ * & reconciled, along with the command being executed. It returns another PartitionIterator containing the results
+ * of the transformation (which may be the same as the input if the transformation is a no-op).
+ */
+ public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command);
+
+ /**
+ * Factory method for query time search helper.
+ *
+ * @param command the read command being executed
+ * @return a Searcher with which to perform the supplied command
+ */
+ public Searcher searcherFor(ReadCommand command);
+
+ /**
+ * Performs the actual index lookup during execution of a ReadCommand.
+ * An instance performs its query according to the RowFilter.Expression it was created for (see searcherFor).
+ * An Expression is a predicate of the form [column] [operator] [value].
+ */
+ public interface Searcher
+ {
+ /**
+ * @param orderGroup the collection of OpOrder.Groups which the ReadCommand is being performed under.
+ * @return partitions from the base table matching the criteria of the search.
+ */
+ public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/IndexRegistry.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/IndexRegistry.java
index 6a004fb,0000000..9f5ed02
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/IndexRegistry.java
+++ b/src/java/org/apache/cassandra/index/IndexRegistry.java
@@@ -1,22 -1,0 +1,42 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.index;
+
+import java.util.Collection;
+
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * The collection of all Index instances for a base table.
+ * The SecondaryIndexManager for a ColumnFamilyStore contains an IndexRegistry
+ * (actually it implements this interface at present) and Index implementations
+ * register in order to:
+ * i) subscribe to the stream of updates being applied to partitions in the base table
+ * ii) provide searchers to support queries with the relevant search predicates
+ */
+public interface IndexRegistry
+{
+ void registerIndex(Index index); // adds the index to this registry; per Index.register's Javadoc, an Index calls this back on the registry it is handed
+ void unregisterIndex(Index index); // presumably the inverse of registerIndex - confirm against the implementing class
+
+ Index getIndex(IndexMetadata indexMetadata); // NOTE(review): result when no index matches the metadata is not specified here - confirm with implementation
+ Collection<Index> listIndexes(); // the Index instances held by this registry
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndex.java
index 4bbf682,0000000..9d997a7
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@@ -1,862 -1,0 +1,882 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.EmptyType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.index.internal.composites.CompositesSearcher;
+import org.apache.cassandra.index.internal.keys.KeysSearcher;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
+
+/**
+ * Index implementation which indexes the values for a single column in the base
+ * table and which stores its index data in a local, hidden table.
+ */
+public abstract class CassandraIndex implements Index
+{
+ private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
+
+ public static final Pattern TARGET_REGEX = Pattern.compile("^(keys|entries|values|full)\\((.+)\\)$");
+
+ public final ColumnFamilyStore baseCfs;
+ protected IndexMetadata metadata;
+ protected ColumnFamilyStore indexCfs;
+ protected ColumnDefinition indexedColumn;
+ protected CassandraIndexFunctions functions;
+
+ protected CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ this.baseCfs = baseCfs;
+ setMetadata(indexDef);
+ }
+
+ /**
+ * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value]
+ * @param indexedColumn
+ * @param operator
+ * @return
+ */
+ protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+ {
+ return operator == Operator.EQ;
+ }
+
+ /**
+ * Used to construct the clustering for an entry in the index table based on values from the base data.
+ * The clustering columns in the index table encode the values required to retrieve the correct data from the base
+ * table and vary depending on the kind of the indexed column. See indexCfsMetadata for more details.
+ * Used whenever a row in the index table is written or deleted.
+ * @param partitionKey from the base data being indexed
+ * @param prefix from the base data being indexed
+ * @param path from the base data being indexed
+ * @return a clustering prefix to be used to insert into the index table
+ */
+ protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+ ClusteringPrefix prefix,
+ CellPath path);
+
+ /**
+ * Used at search time to convert a row in the index table into a simple struct containing the values required
+ * to retrieve the corresponding row from the base table.
+ * @param indexedValue the partition key of the index table (i.e. the value that was indexed)
+ * @param indexEntry a row from the index table
+ * @return
+ */
+ public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
+ Row indexEntry);
+
+ /**
+ * Check whether a value retrieved from an index is still valid by comparing it to current row from the base table.
+ * Used at read time to identify out of date index entries so that they can be excluded from search results and
+ * repaired
+ * @param row the current row from the primary data table
+ * @param indexValue the value we retrieved from the index
+ * @param nowInSec
+ * @return true if the index is out of date and the entry should be dropped
+ */
+ public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
+
+ /**
+ * Extract the value to be inserted into the index from the components of the base data
+ * @param partitionKey from the primary data
+ * @param clustering from the primary data
+ * @param path from the primary data
+ * @param cellValue from the primary data
+ * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition
+ * key in the index table
+ */
+ protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+ Clustering clustering,
+ CellPath path,
+ ByteBuffer cellValue);
+
+ public ColumnDefinition getIndexedColumn()
+ {
+ return indexedColumn;
+ }
+
+ public ClusteringComparator getIndexComparator()
+ {
+ return indexCfs.metadata.comparator;
+ }
+
+ public ColumnFamilyStore getIndexCfs()
+ {
+ return indexCfs;
+ }
+
+ public void register(IndexRegistry registry)
+ {
+ registry.registerIndex(this);
+ }
+
+ public Callable<?> getInitializationTask()
+ {
+ // if we're just linking in the index on an already-built index post-restart or if the base
+ // table is empty we've nothing to do. Otherwise, submit for building via SecondaryIndexBuilder
+ return isBuilt() || baseCfs.isEmpty() ? null : getBuildIndexTask();
+ }
+
+ public IndexMetadata getIndexMetadata()
+ {
+ return metadata;
+ }
+
+ public Optional<ColumnFamilyStore> getBackingTable()
+ {
+ return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
+ }
+
+ public Callable<Void> getBlockingFlushTask()
+ {
+ return () -> {
+ indexCfs.forceBlockingFlush();
+ return null;
+ };
+ }
+
+ public Callable<?> getInvalidateTask()
+ {
+ return () -> {
+ invalidate();
+ return null;
+ };
+ }
+
+ public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
+ {
+ return () -> {
+ indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
+ indexCfs.reload();
+ return null;
+ };
+ }
+
+ @Override
+ public void validate(ReadCommand command) throws InvalidRequestException
+ {
+ Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
+
+ if (target.isPresent())
+ {
+ ByteBuffer indexValue = target.get().getIndexValue();
+ checkFalse(indexValue.remaining() > FBUtilities.MAX_UNSIGNED_SHORT,
+ "Index expression values may not be larger than 64K");
+ }
+ }
+
+ private void setMetadata(IndexMetadata indexDef)
+ {
+ metadata = indexDef;
+ Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfs.metadata, indexDef);
+ functions = getFunctions(indexDef, target);
+ CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
+ indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
+ cfm.cfName,
+ cfm,
+ baseCfs.getTracker().loadsstables);
+ indexedColumn = target.left;
+ }
+
+ public Callable<?> getTruncateTask(final long truncatedAt)
+ {
+ return () -> {
+ indexCfs.discardSSTables(truncatedAt);
+ return null;
+ };
+ }
+
+ public boolean shouldBuildBlocking()
+ {
+ // built-in indexes are always included in builds initiated from SecondaryIndexManager
+ return true;
+ }
+
+ public boolean dependsOn(ColumnDefinition column)
+ {
+ return indexedColumn.name.equals(column.name);
+ }
+
+ public boolean supportsExpression(ColumnDefinition column, Operator operator)
+ {
+ return indexedColumn.name.equals(column.name)
+ && supportsOperator(indexedColumn, operator);
+ }
+
+ private boolean supportsExpression(RowFilter.Expression expression)
+ {
+ return supportsExpression(expression.column(), expression.operator());
+ }
+
+ public AbstractType<?> customExpressionValueType()
+ {
+ return null;
+ }
+
+ public long getEstimatedResultRows()
+ {
+ return indexCfs.getMeanColumns();
+ }
+
+ /**
+ * No post processing of query results, just return them unchanged
+ */
+ public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
+ {
+ return (partitionIterator, readCommand) -> partitionIterator;
+ }
+
+ public RowFilter getPostIndexQueryFilter(RowFilter filter)
+ {
+ return getTargetExpression(filter.getExpressions()).map(filter::without)
+ .orElse(filter);
+ }
+
+ private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
+ {
+ return expressions.stream().filter(this::supportsExpression).findFirst();
+ }
+
+ public Index.Searcher searcherFor(ReadCommand command)
+ {
+ Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
+
+ if (target.isPresent())
+ {
+ target.get().validateForIndexing();
+ switch (getIndexMetadata().kind)
+ {
+ case COMPOSITES:
+ return new CompositesSearcher(command, target.get(), this);
+ case KEYS:
+ return new KeysSearcher(command, target.get(), this);
+ default:
+ throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
+ metadata.kind,
+ metadata.name,
+ indexedColumn.name.toString()));
+ }
+ }
+
+ return null;
+
+ }
+
+ public void validate(PartitionUpdate update) throws InvalidRequestException
+ {
+ switch (indexedColumn.kind)
+ {
+ case PARTITION_KEY:
+ validatePartitionKey(update.partitionKey());
+ break;
+ case CLUSTERING:
+ validateClusterings(update);
+ break;
+ case REGULAR:
+ if (update.columns().regulars.contains(indexedColumn))
+ validateRows(update);
+ break;
+ case STATIC:
+ if (update.columns().statics.contains(indexedColumn))
+ validateRows(Collections.singleton(update.staticRow()));
+ break;
+ }
+ }
+
+ public Indexer indexerFor(final DecoratedKey key,
+ final PartitionColumns columns,
+ final int nowInSec,
+ final OpOrder.Group opGroup,
+ final IndexTransaction.Type transactionType)
+ {
+ /**
+ * Indexes on regular and static columns (the non primary-key ones) only care about updates with live
+ * data for the column they index. In particular, they don't care about having just row or range deletions
+ * as they don't know how to update the index table unless they know exactly the value that is deleted.
+ *
+ * Note that in practice this means that those indexes are only purged of stale entries on compaction,
+ * when we resolve both the deletion and the prior data it deletes. Of course, such stale entries are also
+ * filtered on read.
+ */
+ if (!isPrimaryKeyIndex() && !columns.contains(indexedColumn))
+ return null;
+
+ return new Indexer()
+ {
+ public void begin()
+ {
+ }
+
+ public void partitionDelete(DeletionTime deletionTime)
+ {
+ }
+
+ public void rangeTombstone(RangeTombstone tombstone)
+ {
+ }
+
+ public void insertRow(Row row)
+ {
+ if (row.isStatic() != indexedColumn.isStatic())
+ return;
+
+ if (isPrimaryKeyIndex())
+ {
+ indexPrimaryKey(row.clustering(),
+ getPrimaryKeyIndexLiveness(row),
+ row.deletion());
+ }
+ else
+ {
+ if (indexedColumn.isComplex())
+ indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+ else
+ indexCell(row.clustering(), row.getCell(indexedColumn));
+ }
+ }
+
+ public void removeRow(Row row)
+ {
+ if (isPrimaryKeyIndex())
+ return;
+
+ if (indexedColumn.isComplex())
+ removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+ else
+ removeCell(row.clustering(), row.getCell(indexedColumn));
+ }
+
+ public void updateRow(Row oldRow, Row newRow)
+ {
+ assert oldRow.isStatic() == newRow.isStatic();
+ if (newRow.isStatic() != indexedColumn.isStatic())
+ return;
+
+ if (isPrimaryKeyIndex())
+ indexPrimaryKey(newRow.clustering(),
+ newRow.primaryKeyLivenessInfo(),
+ newRow.deletion());
+
+ if (indexedColumn.isComplex())
+ {
+ indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
+ removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
+ }
+ else
+ {
+ indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
+ removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
+ }
+ }
+
+ public void finish()
+ {
+ }
+
+ private void indexCells(Clustering clustering, Iterable<Cell> cells)
+ {
+ if (cells == null)
+ return;
+
+ for (Cell cell : cells)
+ indexCell(clustering, cell);
+ }
+
+ private void indexCell(Clustering clustering, Cell cell)
+ {
+ if (cell == null || !cell.isLive(nowInSec))
+ return;
+
+ insert(key.getKey(),
+ clustering,
+ cell,
+ LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
+ opGroup);
+ }
+
+ private void removeCells(Clustering clustering, Iterable<Cell> cells)
+ {
+ if (cells == null)
+ return;
+
+ for (Cell cell : cells)
+ removeCell(clustering, cell);
+ }
+
+ private void removeCell(Clustering clustering, Cell cell)
+ {
+ if (cell == null || !cell.isLive(nowInSec))
+ return;
+
+ delete(key.getKey(), clustering, cell, opGroup, nowInSec);
+ }
+
+ private void indexPrimaryKey(final Clustering clustering,
+ final LivenessInfo liveness,
+ final Row.Deletion deletion)
+ {
+ if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
+ insert(key.getKey(), clustering, null, liveness, opGroup);
+
+ if (!deletion.isLive())
+ delete(key.getKey(), clustering, deletion.time(), opGroup);
+ }
+
+ private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
+ {
+ long timestamp = row.primaryKeyLivenessInfo().timestamp();
+ int ttl = row.primaryKeyLivenessInfo().ttl();
+ for (Cell cell : row.cells())
+ {
+ long cellTimestamp = cell.timestamp();
+ if (cell.isLive(nowInSec))
+ {
+ if (cellTimestamp > timestamp)
+ {
+ timestamp = cellTimestamp;
+ ttl = cell.ttl();
+ }
+ }
+ }
+ return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
+ }
+ };
+ }
+
+ /**
+ * Specific to internal indexes, this is called by a
+ * searcher when it encounters a stale entry in the index
+ * @param indexKey the partition key in the index table
+ * @param indexClustering the clustering in the index table
+ * @param deletion deletion timestamp etc
+ * @param opGroup the operation under which to perform the deletion
+ */
+ public void deleteStaleEntry(DecoratedKey indexKey,
+ Clustering indexClustering,
+ DeletionTime deletion,
+ OpOrder.Group opGroup)
+ {
+ doDelete(indexKey, indexClustering, deletion, opGroup);
+ logger.trace("Removed index entry for stale value {}", indexKey);
+ }
+
+ /**
+ * Called when adding a new entry to the index
+ */
+ private void insert(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell,
+ LivenessInfo info,
+ OpOrder.Group opGroup)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+ clustering,
+ cell));
+ Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
+ PartitionUpdate upd = partitionUpdate(valueKey, row);
+ indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+ logger.trace("Inserted entry into index for value {}", valueKey);
+ }
+
+ /**
+ * Called when deleting entries on non-primary key columns
+ */
+ private void delete(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell,
+ OpOrder.Group opGroup,
+ int nowInSec)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+ clustering,
+ cell));
+ doDelete(valueKey,
+ buildIndexClustering(rowKey, clustering, cell),
+ new DeletionTime(cell.timestamp(), nowInSec),
+ opGroup);
+ }
+
+ /**
+ * Called when deleting entries from indexes on primary key columns
+ */
+ private void delete(ByteBuffer rowKey,
+ Clustering clustering,
+ DeletionTime deletion,
+ OpOrder.Group opGroup)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+ clustering,
+ null));
+ doDelete(valueKey,
+ buildIndexClustering(rowKey, clustering, null),
+ deletion,
+ opGroup);
+ }
+
+ private void doDelete(DecoratedKey indexKey,
+ Clustering indexClustering,
+ DeletionTime deletion,
+ OpOrder.Group opGroup)
+ {
+ Row row = BTreeRow.emptyDeletedRow(indexClustering, Row.Deletion.regular(deletion));
+ PartitionUpdate upd = partitionUpdate(indexKey, row);
+ indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+ logger.trace("Removed index entry for value {}", indexKey);
+ }
+
+ private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
+ {
+ assert indexedColumn.isPartitionKey();
+ validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
+ }
+
+ private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
+ {
+ assert indexedColumn.isClusteringColumn();
+ for (Row row : update)
+ validateIndexedValue(getIndexedValue(null, row.clustering(), null));
+ }
+
+ private void validateRows(Iterable<Row> rows)
+ {
+ assert !indexedColumn.isPrimaryKeyColumn();
+ for (Row row : rows)
+ {
+ if (indexedColumn.isComplex())
+ {
+ ComplexColumnData data = row.getComplexColumnData(indexedColumn);
+ if (data != null)
+ {
+ for (Cell cell : data)
+ {
+ validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
+ }
+ }
+ }
+ else
+ {
+ validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
+ }
+ }
+ }
+
+ private void validateIndexedValue(ByteBuffer value)
+ {
+ if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
+ throw new InvalidRequestException(String.format(
+ "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+ value.remaining(),
+ metadata.name,
+ baseCfs.metadata.ksName,
+ baseCfs.metadata.cfName,
+ indexedColumn.name.toString(),
+ FBUtilities.MAX_UNSIGNED_SHORT));
+ }
+
+ private ByteBuffer getIndexedValue(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell)
+ {
+ return getIndexedValue(rowKey,
+ clustering,
+ cell == null ? null : cell.path(),
+ cell == null ? null : cell.value()
+ );
+ }
+
+ private Clustering buildIndexClustering(ByteBuffer rowKey,
+ Clustering clustering,
+ Cell cell)
+ {
+ return buildIndexClusteringPrefix(rowKey,
+ clustering,
+ cell == null ? null : cell.path()).build();
+ }
+
+ private DecoratedKey getIndexKeyFor(ByteBuffer value)
+ {
+ return indexCfs.decorateKey(value);
+ }
+
+ private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
+ {
+ return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+ }
+
+ private void invalidate()
+ {
+ // interrupt in-progress compactions
+ Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
+ CompactionManager.instance.interruptCompactionForCFs(cfss, true);
+ CompactionManager.instance.waitForCessation(cfss);
+ Keyspace.writeOrder.awaitNewBarrier();
+ indexCfs.forceBlockingFlush();
+ indexCfs.readOrdering.awaitNewBarrier();
+ indexCfs.invalidate();
+ }
+
+ private boolean isBuilt()
+ {
+ return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), metadata.name);
+ }
+
+ private boolean isPrimaryKeyIndex()
+ {
+ return indexedColumn.isPrimaryKeyColumn();
+ }
+
+ private Callable<?> getBuildIndexTask()
+ {
+ return () -> {
+ buildBlocking();
+ return null;
+ };
+ }
+
+ private void buildBlocking()
+ {
+ baseCfs.forceBlockingFlush();
+
+ try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+ Refs<SSTableReader> sstables = viewFragment.refs)
+ {
+ if (sstables.isEmpty())
+ {
+ logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
+ baseCfs.metadata.ksName,
+ baseCfs.metadata.cfName,
+ metadata.name);
+ baseCfs.indexManager.markIndexBuilt(metadata.name);
+ return;
+ }
+
+ logger.info("Submitting index build of {} for data in {}",
+ metadata.name,
+ getSSTableNames(sstables));
+
+ SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
+ Collections.singleton(this),
+ new ReducingKeyIterator(sstables));
+ Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+ FBUtilities.waitOnFuture(future);
+ indexCfs.forceBlockingFlush();
+ baseCfs.indexManager.markIndexBuilt(metadata.name);
+ }
+ logger.info("Index build of {} complete", metadata.name);
+ }
+
+ private static String getSSTableNames(Collection<SSTableReader> sstables)
+ {
+ return StreamSupport.stream(sstables.spliterator(), false)
+ .map(SSTableReader::toString)
+ .collect(Collectors.joining(", "));
+ }
+
+ /**
+ * Construct the CFMetadata for an index table, the clustering columns in the index table
+ * vary dependent on the kind of the indexed value.
+ * @param baseCfsMetadata
+ * @param indexMetadata
+ * @return
+ */
+ public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
+ {
+ Pair<ColumnDefinition, IndexTarget.Type> target = parseTarget(baseCfsMetadata, indexMetadata);
+ CassandraIndexFunctions utils = getFunctions(indexMetadata, target);
+ ColumnDefinition indexedColumn = target.left;
+ AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
+
+ // Tables for legacy KEYS indexes are non-compound and dense
+ CFMetaData.Builder builder = indexMetadata.isKeys()
+ ? CFMetaData.Builder.create(baseCfsMetadata.ksName,
+ baseCfsMetadata.indexColumnFamilyName(indexMetadata),
+ true, false, false)
+ : CFMetaData.Builder.create(baseCfsMetadata.ksName,
+ baseCfsMetadata.indexColumnFamilyName(indexMetadata));
+
+ builder = builder.withId(baseCfsMetadata.cfId)
+ .withPartitioner(new LocalPartitioner(indexedValueType))
+ .addPartitionKey(indexedColumn.name, indexedColumn.type)
+ .addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
+
+ if (indexMetadata.isKeys())
+ {
+ // A dense, compact table for KEYS indexes must have a compact
+ // value column defined, even though it is never used
+ CompactTables.DefaultNames names =
+ CompactTables.defaultNameGenerator(ImmutableSet.of(indexedColumn.name.toString(), "partition_key"));
+ builder = builder.addRegularColumn(names.defaultCompactValueName(), EmptyType.instance);
+ }
+ else
+ {
+ // The clustering columns for a table backing a COMPOSITES index are dependent
+ // on the specific type of index (there are specializations for indexes on collections)
+ builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
+ }
+
+ return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
+ }
+
+ /**
+ * Factory method for new CassandraIndex instances
+ * @param baseCfs
+ * @param indexMetadata
+ * @return
+ */
+ public static CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+ {
+ return getFunctions(indexMetadata, parseTarget(baseCfs.metadata, indexMetadata)).newIndexInstance(baseCfs, indexMetadata);
+ }
+
+ // Public because it's also used to convert index metadata into a thrift-compatible format
+ public static Pair<ColumnDefinition, IndexTarget.Type> parseTarget(CFMetaData cfm,
+ IndexMetadata indexDef)
+ {
+ String target = indexDef.options.get("target");
+ assert target != null : String.format("No target definition found for index %s", indexDef.name);
+
+ // if the regex matches then the target is in the form "keys(foo)", "entries(bar)" etc
+ // if not, then it must be a simple column name and implicitly its type is VALUES
+ Matcher matcher = TARGET_REGEX.matcher(target);
+ String columnName;
+ IndexTarget.Type targetType;
+ if (matcher.matches())
+ {
+ targetType = IndexTarget.Type.fromString(matcher.group(1));
+ columnName = matcher.group(2);
+ }
+ else
+ {
+ columnName = target;
+ targetType = IndexTarget.Type.VALUES;
+ }
+
+ // in the case of a quoted column name the name in the target string
+ // will be enclosed in quotes, which we need to unwrap. It may also
+ // include quote characters internally, escaped like so:
+ // abc"def -> abc""def.
+ // Because the target string is stored in a CQL compatible form, we
+ // need to un-escape any such quotes to get the actual column name
+ if (columnName.startsWith("\""))
+ {
+ columnName = StringUtils.substring(StringUtils.substring(columnName, 1), 0, -1);
+ columnName = columnName.replaceAll("\"\"", "\"");
+ }
+
+ // if it's not a CQL table, we can't assume that the column name is utf8, so
+ // in that case we have to do a linear scan of the cfm's columns to get the matching one
+ if (cfm.isCQLTable())
+ return Pair.create(cfm.getColumnDefinition(new ColumnIdentifier(columnName, true)), targetType);
+ else
+ for (ColumnDefinition column : cfm.allColumns())
+ if (column.name.toString().equals(columnName))
+ return Pair.create(column, targetType);
+
+ throw new RuntimeException(String.format("Unable to parse targets for index %s (%s)", indexDef.name, target));
+ }
+
+ static CassandraIndexFunctions getFunctions(IndexMetadata indexDef,
+ Pair<ColumnDefinition, IndexTarget.Type> target)
+ {
+ if (indexDef.isKeys())
+ return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
+
+ ColumnDefinition indexedColumn = target.left;
+ if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
+ {
+ switch (((CollectionType)indexedColumn.type).kind)
+ {
+ case LIST:
+ return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+ case SET:
+ return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+ case MAP:
+ switch (target.right)
+ {
+ case KEYS:
+ return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+ case KEYS_AND_VALUES:
+ return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
+ case VALUES:
+ return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+ }
+ throw new AssertionError();
+ }
+ }
+
+ switch (indexedColumn.kind)
+ {
+ case CLUSTERING:
+ return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
+ case REGULAR:
+ return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
+ case PARTITION_KEY:
+ return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
+ //case COMPACT_VALUE:
+ // return new CompositesIndexOnCompactValue();
+ }
+ throw new AssertionError();
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
index 72d2528,0000000..d6b39e6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
@@@ -1,172 -1,0 +1,192 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.NavigableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+public abstract class CassandraIndexSearcher implements Index.Searcher
+{
+ private static final Logger logger = LoggerFactory.getLogger(CassandraIndexSearcher.class);
+
+ private final RowFilter.Expression expression;
+ protected final CassandraIndex index;
+ protected final ReadCommand command;
+
+ public CassandraIndexSearcher(ReadCommand command,
+ RowFilter.Expression expression,
+ CassandraIndex index)
+ {
+ this.command = command;
+ this.expression = expression;
+ this.index = index;
+ }
+
+ @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
+ // of this method.
+ public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup)
+ {
+ // the value of the index expression is the partition key in the index table
+ DecoratedKey indexKey = index.getBackingTable().get().decorateKey(expression.getIndexValue());
+ UnfilteredRowIterator indexIter = queryIndex(indexKey, command, orderGroup);
+ try
+ {
+ return queryDataFromIndex(indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup);
+ }
+ catch (RuntimeException | Error e)
+ {
+ indexIter.close();
+ throw e;
+ }
+ }
+
+ private UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
+ {
+ ClusteringIndexFilter filter = makeIndexFilter(command);
+ ColumnFamilyStore indexCfs = index.getBackingTable().get();
+ CFMetaData indexCfm = indexCfs.metadata;
+ return SinglePartitionReadCommand.create(indexCfm, command.nowInSec(), indexKey, ColumnFilter.all(indexCfm), filter)
+ .queryMemtableAndDisk(indexCfs, orderGroup.indexReadOpOrderGroup());
+ }
+
+ private ClusteringIndexFilter makeIndexFilter(ReadCommand command)
+ {
+ if (command instanceof SinglePartitionReadCommand)
+ {
+ // Note: as yet there's no route to get here - a 2i query *always* uses a
+ // PartitionRangeReadCommand. This is here in preparation for coming changes
+ // in SelectStatement.
+ SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command;
+ ByteBuffer pk = sprc.partitionKey().getKey();
+ ClusteringIndexFilter filter = sprc.clusteringIndexFilter();
+
+ if (filter instanceof ClusteringIndexNamesFilter)
+ {
+ NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
+ BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator());
+ for (Clustering c : requested)
+ clusterings.add(makeIndexClustering(pk, c));
+ return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed());
+ }
+ else
+ {
+ Slices requested = ((ClusteringIndexSliceFilter)filter).requestedSlices();
+ Slices.Builder builder = new Slices.Builder(index.getIndexComparator());
+ for (Slice slice : requested)
+ builder.add(makeIndexBound(pk, slice.start()), makeIndexBound(pk, slice.end()));
+ return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed());
+ }
+ }
+ else
+ {
+
+ DataRange dataRange = ((PartitionRangeReadCommand)command).dataRange();
+ AbstractBounds<PartitionPosition> range = dataRange.keyRange();
+
+ Slice slice = Slice.ALL;
+
+ /*
+ * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+ * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
+ * key having a given token. A potential fix would be to actually store the token along the key in the indexed row.
+ */
+ if (range.left instanceof DecoratedKey)
+ {
+ // the right hand side of the range may not be a DecoratedKey (for instance if we're paging),
+ // but if it is, we can optimise slightly by restricting the slice
+ if (range.right instanceof DecoratedKey)
+ {
+
+ DecoratedKey startKey = (DecoratedKey) range.left;
+ DecoratedKey endKey = (DecoratedKey) range.right;
+
+ Slice.Bound start = Slice.Bound.BOTTOM;
+ Slice.Bound end = Slice.Bound.TOP;
+
+ /*
+ * For index queries over a range, we can't do a whole lot better than querying everything for the key range, though for
+ * slice queries we can slightly restrict the beginning and end.
+ */
+ if (!dataRange.isNamesQuery())
+ {
+ ClusteringIndexSliceFilter startSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(
+ startKey));
+ ClusteringIndexSliceFilter endSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(
+ endKey));
+
+ // We can't effectively support reversed queries when we have a range, so we don't support it
+ // (or through post-query reordering) and shouldn't get there.
+ assert !startSliceFilter.isReversed() && !endSliceFilter.isReversed();
+
+ Slices startSlices = startSliceFilter.requestedSlices();
+ Slices endSlices = endSliceFilter.requestedSlices();
+
+ if (startSlices.size() > 0)
+ start = startSlices.get(0).start();
+
+ if (endSlices.size() > 0)
+ end = endSlices.get(endSlices.size() - 1).end();
+ }
+
+ slice = Slice.make(makeIndexBound(startKey.getKey(), start),
+ makeIndexBound(endKey.getKey(), end));
+ }
+ else
+ {
+ // otherwise, just start the index slice from the key we do have
+ slice = Slice.make(makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM),
+ Slice.Bound.TOP);
+ }
+ }
+ return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
+ }
+ }
+
+ private Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
+ {
+ return index.buildIndexClusteringPrefix(rowKey, bound, null)
+ .buildBound(bound.isStart(), bound.isInclusive());
+ }
+
+ protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering)
+ {
+ return index.buildIndexClusteringPrefix(rowKey, clustering, null).build();
+ }
+
+ protected abstract UnfilteredPartitionIterator queryDataFromIndex(DecoratedKey indexKey,
+ RowIterator indexHits,
+ ReadCommand command,
+ ReadOrderGroup orderGroup);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/IndexEntry.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/IndexEntry.java
index 6f94ace,0000000..97525d6
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/IndexEntry.java
+++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
@@@ -1,34 -1,0 +1,54 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+
+/**
+ * Entries in indexes on non-compact tables (tables with composite comparators)
+ * can be encapsulated as IndexEntry instances. These are not used when dealing
+ * with indexes on static/compact/thrift tables (i.e. KEYS indexes).
+ * <p>
+ * This class is a plain holder: every field is final and assigned exactly once by the
+ * constructor, which performs no validation or copying of its arguments.
+ */
+public final class IndexEntry
+{
+ // NOTE(review): the next three fields presumably describe the entry as stored in the
+ // index table (its partition key, its clustering and its write timestamp) - confirm with callers.
+ public final DecoratedKey indexValue;
+ public final Clustering indexClustering;
+ public final long timestamp;
+
+ // NOTE(review): presumably the partition key and clustering of the base table row that
+ // the entry points at - confirm with callers.
+ public final ByteBuffer indexedKey;
+ public final Clustering indexedEntryClustering;
+
+ /**
+ * Stores each argument in the field of the same name.
+ */
+ public IndexEntry(DecoratedKey indexValue,
+ Clustering indexClustering,
+ long timestamp,
+ ByteBuffer indexedKey,
+ Clustering indexedEntryClustering)
+ {
+ this.indexValue = indexValue;
+ this.indexClustering = indexClustering;
+ this.timestamp = timestamp;
+ this.indexedKey = indexedKey;
+ this.indexedEntryClustering = indexedEntryClustering;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/05bacc75/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
index 53ecd01,0000000..d680253
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysIndex.java
@@@ -1,62 -1,0 +1,82 @@@
++/*
++ *
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ *
++ */
+package org.apache.cassandra.index.internal.keys;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * {@link CassandraIndex} variant for KEYS indexes. Index rows are clustered by the base
+ * table partition key alone, the indexed value is the cell value itself, and no
+ * specialized {@link IndexEntry} format is used.
+ */
+public class KeysIndex extends CassandraIndex
+{
+ public KeysIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef);
+ }
+
+ /**
+ * Returns {@code builder} untouched: this index adds no clustering columns of its own.
+ */
+ public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+ CFMetaData baseMetadata,
+ ColumnDefinition cfDef)
+ {
+ return builder;
+ }
+
+ /**
+ * Creates a builder over the index comparator holding only {@code partitionKey};
+ * {@code prefix} and {@code path} are not consulted.
+ */
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+ ClusteringPrefix prefix,
+ CellPath path)
+ {
+ CBuilder prefixBuilder = CBuilder.create(getIndexComparator());
+ prefixBuilder.add(partitionKey);
+ return prefixBuilder;
+ }
+
+ /**
+ * Returns {@code cellValue} as is; the other arguments are not consulted.
+ */
+ protected ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+ Clustering clustering,
+ CellPath path,
+ ByteBuffer cellValue)
+ {
+ return cellValue;
+ }
+
+ /**
+ * Not supported by this index type.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+ {
+ throw new UnsupportedOperationException("KEYS indexes do not use a specialized index entry format");
+ }
+
+ /**
+ * Reports an index entry as stale when {@code row} is null, when it has no cell for the
+ * indexed column, when that cell is not live at {@code nowInSec}, or when the cell value
+ * does not compare equal to {@code indexValue} under the indexed column's type.
+ */
+ public boolean isStale(Row row, ByteBuffer indexValue, int nowInSec)
+ {
+ if (row == null)
+ return true;
+
+ Cell indexedCell = row.getCell(indexedColumn);
+ if (indexedCell == null || !indexedCell.isLive(nowInSec))
+ return true;
+
+ // a live cell exists, so only a differing value makes the entry stale
+ return indexedColumn.type.compare(indexValue, indexedCell.value()) != 0;
+ }
+}