You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/08/22 01:48:41 UTC
[10/15] cassandra git commit: New 2i API and implementations for
built in indexes
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
new file mode 100644
index 0000000..b17ab4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java
@@ -0,0 +1,755 @@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.cql3.statements.IndexTarget;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.index.IndexRegistry;
+import org.apache.cassandra.index.SecondaryIndexBuilder;
+import org.apache.cassandra.index.internal.composites.CompositesSearcher;
+import org.apache.cassandra.index.internal.keys.KeysSearcher;
+import org.apache.cassandra.index.transactions.IndexTransaction;
+import org.apache.cassandra.index.transactions.UpdateTransaction;
+import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.Refs;
+
+/**
+ * Index implementation which indexes the values for a single column in the base
+ * table and which stores its index data in a local, hidden table.
+ *
+ * The partition key of the backing (index) table is the indexed value. Its clustering
+ * columns start with the base table's partition key, followed by whatever each kind of
+ * index needs in order to locate the base row (see {@link #indexCfsMetadata}).
+ */
+public abstract class CassandraIndex implements Index
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraIndex.class);
+
+    public final ColumnFamilyStore baseCfs;
+    protected IndexMetadata metadata;
+    protected ColumnFamilyStore indexCfs;
+    protected ColumnDefinition indexedColumn;
+    protected CassandraIndexFunctions functions;
+
+    public CassandraIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        this.baseCfs = baseCfs;
+        setMetadata(indexDef);
+    }
+
+    /**
+     * Returns true if an index of this type can support search predicates of the form [column] OPERATOR [value].
+     * The default implementation only supports equality.
+     * @param indexedColumn the column targeted by the predicate
+     * @param operator the operator used by the predicate
+     * @return true if the predicate can be served by this index
+     */
+    protected boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+    {
+        return operator.equals(Operator.EQ);
+    }
+
+    /**
+     * Used to construct the clustering for an entry in the index table based on values from the base data.
+     * The clustering columns in the index table encode the values required to retrieve the correct data from the base
+     * table and vary depending on the kind of the indexed column. See indexCfsMetadata for more details.
+     * Used whenever a row in the index table is written or deleted.
+     * @param partitionKey from the base data being indexed
+     * @param prefix from the base data being indexed
+     * @param path from the base data being indexed (may be null)
+     * @return a clustering prefix to be used to insert into the index table
+     */
+    protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                                           ClusteringPrefix prefix,
+                                                           CellPath path);
+
+    /**
+     * Used at search time to convert a row in the index table into a simple struct containing the values required
+     * to retrieve the corresponding row from the base table.
+     * @param indexedValue the partition key of the index table (i.e. the value that was indexed)
+     * @param indexEntry a row from the index table
+     * @return the values needed to locate the corresponding row in the base table
+     */
+    public abstract IndexEntry decodeEntry(DecoratedKey indexedValue,
+                                           Row indexEntry);
+
+    /**
+     * Check whether a value retrieved from an index is still valid by comparing it to the current row from the base
+     * table. Used at read time to identify out of date index entries so that they can be excluded from search results
+     * and repaired.
+     * @param row the current row from the primary data table
+     * @param indexValue the value we retrieved from the index
+     * @param nowInSec the time, in seconds, at which liveness is evaluated
+     * @return true if the index is out of date and the entry should be dropped
+     */
+    public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
+
+    /**
+     * Extract the value to be inserted into the index from the components of the base data.
+     * Components which are not available to the caller (e.g. during validation) are passed as null.
+     * @param partitionKey from the primary data (may be null)
+     * @param clustering from the primary data (may be null)
+     * @param path from the primary data (may be null)
+     * @param cellValue from the primary data (may be null)
+     * @return a ByteBuffer containing the value to be inserted in the index. This will be used to make the partition
+     * key in the index table
+     */
+    protected abstract ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                                  Clustering clustering,
+                                                  CellPath path,
+                                                  ByteBuffer cellValue);
+
+    public ColumnDefinition getIndexedColumn()
+    {
+        return indexedColumn;
+    }
+
+    public ClusteringComparator getIndexComparator()
+    {
+        return indexCfs.metadata.comparator;
+    }
+
+    public ColumnFamilyStore getIndexCfs()
+    {
+        return indexCfs;
+    }
+
+    public void register(IndexRegistry registry)
+    {
+        registry.registerIndex(this);
+    }
+
+    /**
+     * @return null if the index is already recorded as built in the system keyspace (e.g. when linking an
+     * existing index after a restart), otherwise a task which builds it from the base table's sstables
+     */
+    public Callable<?> getInitializationTask()
+    {
+        return isBuilt() ? null : getBuildIndexTask();
+    }
+
+    public IndexMetadata getIndexMetadata()
+    {
+        return metadata;
+    }
+
+    /**
+     * Currently returns the name of the backing table rather than the index name.
+     */
+    public String getIndexName()
+    {
+        // should return metadata.name, see CASSANDRA-10127
+        return indexCfs.name;
+    }
+
+    public Optional<ColumnFamilyStore> getBackingTable()
+    {
+        return indexCfs == null ? Optional.empty() : Optional.of(indexCfs);
+    }
+
+    public Callable<Void> getBlockingFlushTask()
+    {
+        return () -> {
+            indexCfs.forceBlockingFlush();
+            return null;
+        };
+    }
+
+    /**
+     * @return a task which marks the index as removed in the system keyspace and then invalidates the backing table
+     */
+    public Callable<?> getInvalidateTask()
+    {
+        return () -> {
+            markRemoved();
+            invalidate();
+            return null;
+        };
+    }
+
+    /**
+     * Swaps in the new index definition immediately and returns a task which refreshes the backing table's
+     * metadata from the base table.
+     */
+    public Callable<?> getMetadataReloadTask(IndexMetadata indexDef)
+    {
+        // Only refresh the in-memory definition here: going through setMetadata would create a second
+        // ColumnFamilyStore for the already-open backing table and orphan the current one.
+        refreshDefinition(indexDef);
+        return () -> {
+            indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
+            indexCfs.reload();
+            return null;
+        };
+    }
+
+    /**
+     * Records the index definition and creates the ColumnFamilyStore for the backing table.
+     * Only intended to be called once, on construction.
+     */
+    private void setMetadata(IndexMetadata indexDef)
+    {
+        refreshDefinition(indexDef);
+        CFMetaData cfm = indexCfsMetadata(baseCfs.metadata, indexDef);
+        indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
+                                                             cfm.cfName,
+                                                             cfm,
+                                                             baseCfs.getTracker().loadsstables);
+    }
+
+    /**
+     * Updates the metadata, functions and indexed column for the given definition without touching the
+     * backing table.
+     */
+    private void refreshDefinition(IndexMetadata indexDef)
+    {
+        // validate before anything is derived from the definition
+        assert indexDef.columns.size() == 1 : "Built in indexes on multiple target columns are not supported";
+        metadata = indexDef;
+        functions = getFunctions(baseCfs.metadata, indexDef);
+        indexedColumn = indexDef.indexedColumn(baseCfs.metadata);
+    }
+
+    public Callable<?> getTruncateTask(final long truncatedAt)
+    {
+        return () -> {
+            indexCfs.discardSSTables(truncatedAt);
+            return null;
+        };
+    }
+
+    public boolean indexes(PartitionColumns columns)
+    {
+        // if we have indexes on the partition key or clustering columns, return true
+        return isPrimaryKeyIndex() || columns.contains(indexedColumn);
+    }
+
+    public boolean supportsExpression(ColumnDefinition column, Operator operator)
+    {
+        return indexedColumn.name.equals(column.name)
+               && supportsOperator(indexedColumn, operator);
+    }
+
+    private boolean supportsExpression(RowFilter.Expression expression)
+    {
+        return supportsExpression(expression.column(), expression.operator());
+    }
+
+    public long getEstimatedResultRows()
+    {
+        return indexCfs.getMeanColumns();
+    }
+
+    /**
+     * No post processing of query results, just return them unchanged
+     */
+    public BiFunction<PartitionIterator, ReadCommand, PartitionIterator> postProcessorFor(ReadCommand command)
+    {
+        return (partitionIterator, readCommand) -> partitionIterator;
+    }
+
+    /**
+     * @return the filter without the first expression this index can serve, or the filter unchanged if there is none
+     */
+    public RowFilter getPostIndexQueryFilter(RowFilter filter)
+    {
+        return getTargetExpression(filter.getExpressions()).map(filter::without)
+                                                           .orElse(filter);
+    }
+
+    private Optional<RowFilter.Expression> getTargetExpression(List<RowFilter.Expression> expressions)
+    {
+        return expressions.stream().filter(this::supportsExpression).findFirst();
+    }
+
+    /**
+     * @return a searcher for the first expression of the command this index supports, or null if there is none
+     * @throws IllegalStateException if the index type has no searcher implementation
+     */
+    public Index.Searcher searcherFor(ReadCommand command)
+    {
+        Optional<RowFilter.Expression> target = getTargetExpression(command.rowFilter().getExpressions());
+        if (!target.isPresent())
+            return null;
+
+        RowFilter.Expression expression = target.get();
+        expression.validateForIndexing();
+        switch (getIndexMetadata().indexType)
+        {
+            case COMPOSITES:
+                return new CompositesSearcher(command, expression, this);
+            case KEYS:
+                return new KeysSearcher(command, expression, this);
+            default:
+                throw new IllegalStateException(String.format("Unsupported index type %s for index %s on %s",
+                                                              metadata.indexType,
+                                                              metadata.name,
+                                                              indexedColumn.name.toString()));
+        }
+    }
+
+    /**
+     * Rejects updates containing values for the indexed column which are too large to be indexed.
+     */
+    public void validate(PartitionUpdate update) throws InvalidRequestException
+    {
+        switch (indexedColumn.kind)
+        {
+            case PARTITION_KEY:
+                validatePartitionKey(update.partitionKey());
+                break;
+            case CLUSTERING:
+                validateClusterings(update);
+                break;
+            case REGULAR:
+                validateRows(update);
+                break;
+            case STATIC:
+                validateRows(Collections.singleton(update.staticRow()));
+                break;
+        }
+    }
+
+    public Indexer indexerFor(final DecoratedKey key,
+                              final int nowInSec,
+                              final OpOrder.Group opGroup,
+                              final IndexTransaction.Type transactionType)
+    {
+        return new Indexer()
+        {
+            public void begin()
+            {
+            }
+
+            public void partitionDelete(DeletionTime deletionTime)
+            {
+            }
+
+            public void rangeTombstone(RangeTombstone tombstone)
+            {
+            }
+
+            public void insertRow(Row row)
+            {
+                if (isPrimaryKeyIndex())
+                {
+                    indexPrimaryKey(row.clustering(),
+                                    getPrimaryKeyIndexLiveness(row),
+                                    row.deletion());
+                }
+                else
+                {
+                    if (indexedColumn.isComplex())
+                        indexCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+                    else
+                        indexCell(row.clustering(), row.getCell(indexedColumn));
+                }
+            }
+
+            public void removeRow(Row row)
+            {
+                // Primary key indexes have no cells to remove, and running indexPrimaryKey here would write
+                // the removed row's primary key liveness back into the index as an insert.
+                if (isPrimaryKeyIndex())
+                    return;
+
+                if (indexedColumn.isComplex())
+                    removeCells(row.clustering(), row.getComplexColumnData(indexedColumn));
+                else
+                    removeCell(row.clustering(), row.getCell(indexedColumn));
+            }
+
+            public void updateRow(Row oldRow, Row newRow)
+            {
+                if (isPrimaryKeyIndex())
+                    indexPrimaryKey(newRow.clustering(),
+                                    newRow.primaryKeyLivenessInfo(),
+                                    newRow.deletion());
+
+                // index the new value(s) and remove entries for the values being replaced
+                if (indexedColumn.isComplex())
+                {
+                    indexCells(newRow.clustering(), newRow.getComplexColumnData(indexedColumn));
+                    removeCells(oldRow.clustering(), oldRow.getComplexColumnData(indexedColumn));
+                }
+                else
+                {
+                    indexCell(newRow.clustering(), newRow.getCell(indexedColumn));
+                    removeCell(oldRow.clustering(), oldRow.getCell(indexedColumn));
+                }
+            }
+
+            public void finish()
+            {
+            }
+
+            private void indexCells(Clustering clustering, Iterable<Cell> cells)
+            {
+                if (cells == null)
+                    return;
+
+                for (Cell cell : cells)
+                    indexCell(clustering, cell);
+            }
+
+            private void indexCell(Clustering clustering, Cell cell)
+            {
+                // only live cells produce index entries
+                if (cell == null || !cell.isLive(nowInSec))
+                    return;
+
+                insert(key.getKey(),
+                       clustering,
+                       cell,
+                       LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()),
+                       opGroup);
+            }
+
+            private void removeCells(Clustering clustering, Iterable<Cell> cells)
+            {
+                if (cells == null)
+                    return;
+
+                for (Cell cell : cells)
+                    removeCell(clustering, cell);
+            }
+
+            private void removeCell(Clustering clustering, Cell cell)
+            {
+                if (cell == null || !cell.isLive(nowInSec))
+                    return;
+
+                delete(key.getKey(), clustering, cell, opGroup, nowInSec);
+            }
+
+            private void indexPrimaryKey(final Clustering clustering,
+                                         final LivenessInfo liveness,
+                                         final DeletionTime deletion)
+            {
+                if (liveness.timestamp() != LivenessInfo.NO_TIMESTAMP)
+                    insert(key.getKey(), clustering, null, liveness, opGroup);
+
+                if (!deletion.isLive())
+                    delete(key.getKey(), clustering, deletion, opGroup);
+            }
+
+            /**
+             * The liveness of a primary key index entry takes the timestamp/ttl of the row's primary key liveness,
+             * or of its most recent live cell if that is newer.
+             */
+            private LivenessInfo getPrimaryKeyIndexLiveness(Row row)
+            {
+                long timestamp = row.primaryKeyLivenessInfo().timestamp();
+                int ttl = row.primaryKeyLivenessInfo().ttl();
+                for (Cell cell : row.cells())
+                {
+                    long cellTimestamp = cell.timestamp();
+                    if (cell.isLive(nowInSec) && cellTimestamp > timestamp)
+                    {
+                        timestamp = cellTimestamp;
+                        ttl = cell.ttl();
+                    }
+                }
+                return LivenessInfo.create(baseCfs.metadata, timestamp, ttl, nowInSec);
+            }
+        };
+    }
+
+    /**
+     * Specific to internal indexes, this is called by a
+     * searcher when it encounters a stale entry in the index
+     * @param indexKey the partition key in the index table
+     * @param indexClustering the clustering in the index table
+     * @param deletion deletion timestamp etc
+     * @param opGroup the operation under which to perform the deletion
+     */
+    public void deleteStaleEntry(DecoratedKey indexKey,
+                                 Clustering indexClustering,
+                                 DeletionTime deletion,
+                                 OpOrder.Group opGroup)
+    {
+        doDelete(indexKey, indexClustering, deletion, opGroup);
+        logger.debug("Removed index entry for stale value {}", indexKey);
+    }
+
+    /**
+     * Called when adding a new entry to the index
+     */
+    private void insert(ByteBuffer rowKey,
+                        Clustering clustering,
+                        Cell cell,
+                        LivenessInfo info,
+                        OpOrder.Group opGroup)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+                                                               clustering,
+                                                               cell));
+        Row row = BTreeRow.noCellLiveRow(buildIndexClustering(rowKey, clustering, cell), info);
+        PartitionUpdate upd = partitionUpdate(valueKey, row);
+        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        logger.debug("Inserted entry into index for value {}", valueKey);
+    }
+
+    /**
+     * Called when deleting entries on non-primary key columns
+     */
+    private void delete(ByteBuffer rowKey,
+                        Clustering clustering,
+                        Cell cell,
+                        OpOrder.Group opGroup,
+                        int nowInSec)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+                                                               clustering,
+                                                               cell));
+        doDelete(valueKey,
+                 buildIndexClustering(rowKey, clustering, cell),
+                 new DeletionTime(cell.timestamp(), nowInSec),
+                 opGroup);
+    }
+
+    /**
+     * Called when deleting entries from indexes on primary key columns
+     */
+    private void delete(ByteBuffer rowKey,
+                        Clustering clustering,
+                        DeletionTime deletion,
+                        OpOrder.Group opGroup)
+    {
+        DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey,
+                                                               clustering,
+                                                               null));
+        doDelete(valueKey,
+                 buildIndexClustering(rowKey, clustering, null),
+                 deletion,
+                 opGroup);
+    }
+
+    /**
+     * Writes a row deletion for the given index entry into the backing table.
+     */
+    private void doDelete(DecoratedKey indexKey,
+                          Clustering indexClustering,
+                          DeletionTime deletion,
+                          OpOrder.Group opGroup)
+    {
+        Row row = BTreeRow.emptyDeletedRow(indexClustering, deletion);
+        PartitionUpdate upd = partitionUpdate(indexKey, row);
+        indexCfs.apply(upd, UpdateTransaction.NO_OP, opGroup, null);
+        logger.debug("Removed index entry for value {}", indexKey);
+    }
+
+    private void validatePartitionKey(DecoratedKey partitionKey) throws InvalidRequestException
+    {
+        assert indexedColumn.isPartitionKey();
+        validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null));
+    }
+
+    private void validateClusterings(PartitionUpdate update) throws InvalidRequestException
+    {
+        assert indexedColumn.isClusteringColumn();
+        for (Row row : update)
+            validateIndexedValue(getIndexedValue(null, row.clustering(), null));
+    }
+
+    private void validateRows(Iterable<Row> rows)
+    {
+        assert !indexedColumn.isPrimaryKeyColumn();
+        for (Row row : rows)
+        {
+            if (indexedColumn.isComplex())
+            {
+                ComplexColumnData data = row.getComplexColumnData(indexedColumn);
+                if (data != null)
+                {
+                    for (Cell cell : data)
+                        validateIndexedValue(getIndexedValue(null, null, cell.path(), cell.value()));
+                }
+            }
+            else
+            {
+                validateIndexedValue(getIndexedValue(null, null, row.getCell(indexedColumn)));
+            }
+        }
+    }
+
+    /**
+     * @throws InvalidRequestException if the value is at least FBUtilities.MAX_UNSIGNED_SHORT bytes long
+     */
+    private void validateIndexedValue(ByteBuffer value)
+    {
+        if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
+            throw new InvalidRequestException(String.format(
+                                                           "Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+                                                           value.remaining(),
+                                                           getIndexName(),
+                                                           baseCfs.metadata.ksName,
+                                                           baseCfs.metadata.cfName,
+                                                           indexedColumn.name.toString(),
+                                                           FBUtilities.MAX_UNSIGNED_SHORT));
+    }
+
+    private ByteBuffer getIndexedValue(ByteBuffer rowKey,
+                                      Clustering clustering,
+                                      Cell cell)
+    {
+        return getIndexedValue(rowKey,
+                               clustering,
+                               cell == null ? null : cell.path(),
+                               cell == null ? null : cell.value());
+    }
+
+    private Clustering buildIndexClustering(ByteBuffer rowKey,
+                                            Clustering clustering,
+                                            Cell cell)
+    {
+        return buildIndexClusteringPrefix(rowKey,
+                                          clustering,
+                                          cell == null ? null : cell.path()).build();
+    }
+
+    private DecoratedKey getIndexKeyFor(ByteBuffer value)
+    {
+        return indexCfs.decorateKey(value);
+    }
+
+    private PartitionUpdate partitionUpdate(DecoratedKey valueKey, Row row)
+    {
+        return PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row);
+    }
+
+    private void invalidate()
+    {
+        // interrupt in-progress compactions
+        Collection<ColumnFamilyStore> cfss = Collections.singleton(indexCfs);
+        CompactionManager.instance.interruptCompactionForCFs(cfss, true);
+        CompactionManager.instance.waitForCessation(cfss);
+        // barrier on in-flight writes, flush, then barrier on in-flight reads before invalidating
+        indexCfs.keyspace.writeOrder.awaitNewBarrier();
+        indexCfs.forceBlockingFlush();
+        indexCfs.readOrdering.awaitNewBarrier();
+        indexCfs.invalidate();
+    }
+
+    private boolean isBuilt()
+    {
+        return SystemKeyspace.isIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    private void markBuilt()
+    {
+        SystemKeyspace.setIndexBuilt(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    private void markRemoved()
+    {
+        SystemKeyspace.setIndexRemoved(baseCfs.keyspace.getName(), getIndexName());
+    }
+
+    private boolean isPrimaryKeyIndex()
+    {
+        return indexedColumn.isPrimaryKeyColumn();
+    }
+
+    private Callable<?> getBuildIndexTask()
+    {
+        return () -> {
+            buildBlocking();
+            return null;
+        };
+    }
+
+    /**
+     * Flushes the base table, then builds the index from its canonical sstables, waiting for the build
+     * to finish before flushing the backing table and marking the index as built.
+     */
+    private void buildBlocking()
+    {
+        baseCfs.forceBlockingFlush();
+
+        try (ColumnFamilyStore.RefViewFragment viewFragment = baseCfs.selectAndReference(View.select(SSTableSet.CANONICAL));
+             Refs<SSTableReader> sstables = viewFragment.refs)
+        {
+            if (sstables.isEmpty())
+            {
+                logger.info("No SSTable data for {}.{} to build index {} from, marking empty index as built",
+                            baseCfs.metadata.ksName,
+                            baseCfs.metadata.cfName,
+                            getIndexName());
+                markBuilt();
+                return;
+            }
+
+            logger.info("Submitting index build of {} for data in {}",
+                        getIndexName(),
+                        getSSTableNames(sstables));
+
+            SecondaryIndexBuilder builder = new SecondaryIndexBuilder(baseCfs,
+                                                                      Collections.singleton(this),
+                                                                      new ReducingKeyIterator(sstables));
+            Future<?> future = CompactionManager.instance.submitIndexBuild(builder);
+            FBUtilities.waitOnFuture(future);
+            indexCfs.forceBlockingFlush();
+            markBuilt();
+        }
+        logger.info("Index build of {} complete", getIndexName());
+    }
+
+    private static String getSSTableNames(Collection<SSTableReader> sstables)
+    {
+        return StreamSupport.stream(sstables.spliterator(), false)
+                            .map(SSTableReader::toString)
+                            .collect(Collectors.joining(", "));
+    }
+
+    /**
+     * Construct the CFMetadata for an index table. The clustering columns in the index table
+     * vary depending on the kind of the indexed value.
+     * @param baseCfsMetadata metadata of the base (indexed) table
+     * @param indexMetadata definition of the index
+     * @return the metadata of the backing index table
+     */
+    public static final CFMetaData indexCfsMetadata(CFMetaData baseCfsMetadata, IndexMetadata indexMetadata)
+    {
+        CassandraIndexFunctions utils = getFunctions(baseCfsMetadata, indexMetadata);
+        ColumnDefinition indexedColumn = indexMetadata.indexedColumn(baseCfsMetadata);
+        AbstractType<?> indexedValueType = utils.getIndexedValueType(indexedColumn);
+        CFMetaData.Builder builder = CFMetaData.Builder.create(baseCfsMetadata.ksName,
+                                                               baseCfsMetadata.indexColumnFamilyName(indexMetadata))
+                                                       .withId(baseCfsMetadata.cfId)
+                                                       .withPartitioner(new LocalPartitioner(indexedValueType))
+                                                       .addPartitionKey(indexedColumn.name, indexedColumn.type);
+
+        // the first clustering column always holds the base table's partition key
+        builder.addClusteringColumn("partition_key", baseCfsMetadata.partitioner.partitionOrdering());
+        builder = utils.addIndexClusteringColumns(builder, baseCfsMetadata, indexedColumn);
+        return builder.build().reloadIndexMetadataProperties(baseCfsMetadata);
+    }
+
+    /**
+     * Factory method for new CassandraIndex instances
+     * @param baseCfs the base table to index
+     * @param indexMetadata definition of the index
+     * @return the implementation matching the kind of the indexed column and the index options
+     */
+    public static final CassandraIndex newIndex(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+    {
+        return getFunctions(baseCfs.metadata, indexMetadata).newIndexInstance(baseCfs, indexMetadata);
+    }
+
+    /**
+     * Selects the functions for the index: KEYS indexes first, then non-frozen collections by collection
+     * kind (and map options), otherwise by the kind of the indexed column.
+     */
+    private static CassandraIndexFunctions getFunctions(CFMetaData baseCfMetadata, IndexMetadata indexDef)
+    {
+        if (indexDef.isKeys())
+            return CassandraIndexFunctions.KEYS_INDEX_FUNCTIONS;
+
+        ColumnDefinition indexedColumn = indexDef.indexedColumn(baseCfMetadata);
+        if (indexedColumn.type.isCollection() && indexedColumn.type.isMultiCell())
+        {
+            switch (((CollectionType)indexedColumn.type).kind)
+            {
+                case LIST:
+                    return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+                case SET:
+                    return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                case MAP:
+                    if (indexDef.options.containsKey(IndexTarget.INDEX_KEYS_OPTION_NAME))
+                        return CassandraIndexFunctions.COLLECTION_KEY_INDEX_FUNCTIONS;
+                    else if (indexDef.options.containsKey(IndexTarget.INDEX_ENTRIES_OPTION_NAME))
+                        return CassandraIndexFunctions.COLLECTION_ENTRY_INDEX_FUNCTIONS;
+                    else
+                        return CassandraIndexFunctions.COLLECTION_VALUE_INDEX_FUNCTIONS;
+            }
+        }
+
+        switch (indexedColumn.kind)
+        {
+            case CLUSTERING:
+                return CassandraIndexFunctions.CLUSTERING_COLUMN_INDEX_FUNCTIONS;
+            case REGULAR:
+                return CassandraIndexFunctions.REGULAR_COLUMN_INDEX_FUNCTIONS;
+            case PARTITION_KEY:
+                return CassandraIndexFunctions.PARTITION_KEY_INDEX_FUNCTIONS;
+            //case COMPACT_VALUE:
+            //    return new CompositesIndexOnCompactValue();
+        }
+        throw new AssertionError(String.format("Unsupported kind %s of indexed column for index %s",
+                                               indexedColumn.kind,
+                                               indexDef.name));
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
new file mode 100644
index 0000000..b7cb3a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexFunctions.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.index.internal;
+
+import java.util.List;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.index.internal.composites.*;
+import org.apache.cassandra.index.internal.keys.KeysIndex;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Describes how each kind of built in index is instantiated and how the schema of its backing
+ * table is derived from the base table.
+ */
+public interface CassandraIndexFunctions
+{
+    /**
+     * Create the CassandraIndex implementation corresponding to these functions.
+     * @param baseCfs the base table being indexed
+     * @param indexMetadata the definition of the index
+     * @return a new index instance
+     */
+    public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata);
+
+    /**
+     * Returns the type of the values in the index. For most columns this is simply its type, but for collections
+     * it depends on whether the index is on the collection name/value element or on a frozen collection.
+     * @param indexedColumn the column being indexed
+     * @return the type of the indexed values, i.e. of the index table's partition key
+     */
+    default AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+    {
+        return indexedColumn.type;
+    }
+
+    /**
+     * Add the clustering columns for a specific type of index table to a CFMetaData.Builder (which is being
+     * used to construct the index table's CFMetadata). In the default implementation, the clustering columns of the
+     * index table hold the partition key & clustering columns of the base table. This is overridden in several cases:
+     * * When the indexed value is itself a clustering column, in which case, we only need store the base table's
+     *   *other* clustering values in the index - the indexed value being the index table's partition key
+     * * When the indexed value is a collection value, in which case we also need to capture the cell path from the base
+     *   table
+     * * In a KEYS index (for thrift/compact storage/static column indexes), where only the base partition key is
+     *   held in the index table.
+     *
+     * Called from indexCfsMetadata
+     * @param builder the builder for the index table's metadata
+     * @param baseMetadata metadata of the base table
+     * @param columnDef the column being indexed
+     * @return the builder, with the clustering columns added
+     */
+    default CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+                                                         CFMetaData baseMetadata,
+                                                         ColumnDefinition columnDef)
+    {
+        for (ColumnDefinition def : baseMetadata.clusteringColumns())
+            builder.addClusteringColumn(def.name, def.type);
+        return builder;
+    }
+
+    /*
+     * implementations providing specializations for the built in index types
+     */
+
+    static final CassandraIndexFunctions KEYS_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        @Override
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new KeysIndex(baseCfs, indexMetadata);
+        }
+    };
+
+    static final CassandraIndexFunctions REGULAR_COLUMN_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        @Override
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new RegularColumnIndex(baseCfs, indexMetadata);
+        }
+    };
+
+    static final CassandraIndexFunctions CLUSTERING_COLUMN_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        @Override
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new ClusteringColumnIndex(baseCfs, indexMetadata);
+        }
+
+        @Override
+        public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+                                                           CFMetaData baseMetadata,
+                                                           ColumnDefinition columnDef)
+        {
+            // every base clustering column except the indexed one, which is the index table's partition key
+            List<ColumnDefinition> cks = baseMetadata.clusteringColumns();
+            for (int i = 0; i < cks.size(); i++)
+            {
+                if (i == columnDef.position())
+                    continue;
+                ColumnDefinition def = cks.get(i);
+                builder.addClusteringColumn(def.name, def.type);
+            }
+            return builder;
+        }
+    };
+
+    static final CassandraIndexFunctions PARTITION_KEY_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        @Override
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new PartitionKeyIndex(baseCfs, indexMetadata);
+        }
+    };
+
+    static final CassandraIndexFunctions COLLECTION_KEY_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        @Override
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new CollectionKeyIndex(baseCfs, indexMetadata);
+        }
+
+        @Override
+        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        {
+            return ((CollectionType) indexedColumn.type).nameComparator();
+        }
+    };
+
+    static final CassandraIndexFunctions COLLECTION_VALUE_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        @Override
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new CollectionValueIndex(baseCfs, indexMetadata);
+        }
+
+        @Override
+        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        {
+            return ((CollectionType) indexedColumn.type).valueComparator();
+        }
+
+        @Override
+        public CFMetaData.Builder addIndexClusteringColumns(CFMetaData.Builder builder,
+                                                           CFMetaData baseMetadata,
+                                                           ColumnDefinition columnDef)
+        {
+            for (ColumnDefinition def : baseMetadata.clusteringColumns())
+                builder.addClusteringColumn(def.name, def.type);
+
+            // collection key
+            builder.addClusteringColumn("cell_path", ((CollectionType) columnDef.type).nameComparator());
+            return builder;
+        }
+    };
+
+    static final CassandraIndexFunctions COLLECTION_ENTRY_INDEX_FUNCTIONS = new CassandraIndexFunctions()
+    {
+        @Override
+        public CassandraIndex newIndexInstance(ColumnFamilyStore baseCfs, IndexMetadata indexMetadata)
+        {
+            return new CollectionEntryIndex(baseCfs, indexMetadata);
+        }
+
+        @Override
+        public AbstractType<?> getIndexedValueType(ColumnDefinition indexedColumn)
+        {
+            // entries are indexed as (key, value) composites
+            CollectionType colType = (CollectionType) indexedColumn.type;
+            return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator());
+        }
+    };
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
new file mode 100644
index 0000000..72d2528
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/CassandraIndexSearcher.java
@@ -0,0 +1,172 @@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+import java.util.NavigableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+public abstract class CassandraIndexSearcher implements Index.Searcher
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraIndexSearcher.class);
+
+    private final RowFilter.Expression expression;
+    protected final CassandraIndex index;
+    protected final ReadCommand command;
+
+    /**
+     * @param command    the read command being answered through the index
+     * @param expression the indexed expression; its index value is the partition key read in the index table
+     * @param index      the index whose backing table is read
+     */
+    public CassandraIndexSearcher(ReadCommand command,
+                                  RowFilter.Expression expression,
+                                  CassandraIndex index)
+    {
+        this.index = index;
+        this.expression = expression;
+        this.command = command;
+    }
+
+    /**
+     * Reads the index partition for the expression's value and hands its live hits to
+     * {@link #queryDataFromIndex} to fetch the corresponding base table data.
+     */
+    @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
+                                  // of this method.
+    public UnfilteredPartitionIterator search(ReadOrderGroup orderGroup)
+    {
+        // The value of the index expression is the partition key in the index table.
+        ColumnFamilyStore indexCfs = index.getBackingTable().get();
+        DecoratedKey indexKey = indexCfs.decorateKey(expression.getIndexValue());
+
+        UnfilteredRowIterator indexIter = queryIndex(indexKey, command, orderGroup);
+        try
+        {
+            RowIterator indexHits = UnfilteredRowIterators.filter(indexIter, command.nowInSec());
+            return queryDataFromIndex(indexKey, indexHits, command, orderGroup);
+        }
+        catch (RuntimeException | Error e)
+        {
+            indexIter.close();
+            throw e;
+        }
+    }
+
+    /**
+     * Reads partition {@code indexKey} of the index table, restricted by the filter built from {@code command}.
+     */
+    private UnfilteredRowIterator queryIndex(DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
+    {
+        ClusteringIndexFilter indexFilter = makeIndexFilter(command);
+        ColumnFamilyStore indexCfs = index.getBackingTable().get();
+        CFMetaData indexMetadata = indexCfs.metadata;
+        return SinglePartitionReadCommand.create(indexMetadata,
+                                                 command.nowInSec(),
+                                                 indexKey,
+                                                 ColumnFilter.all(indexMetadata),
+                                                 indexFilter)
+                                         .queryMemtableAndDisk(indexCfs, orderGroup.indexReadOpOrderGroup());
+    }
+
+    /**
+     * Translates the clustering restrictions of {@code command} into a filter over the index table,
+     * whose clustering is built by the index starting from the base partition key.
+     */
+    private ClusteringIndexFilter makeIndexFilter(ReadCommand command)
+    {
+        if (command instanceof SinglePartitionReadCommand)
+            return makeSinglePartitionIndexFilter((SinglePartitionReadCommand) command);
+
+        return makeRangeIndexFilter(((PartitionRangeReadCommand) command).dataRange());
+    }
+
+    /**
+     * Index filter for a single partition read: each requested row/slice is re-expressed in terms
+     * of the index clustering.
+     *
+     * Note: as yet there's no route to get here - a 2i query *always* uses a
+     * PartitionRangeReadCommand. This is here in preparation for coming changes
+     * in SelectStatement.
+     */
+    private ClusteringIndexFilter makeSinglePartitionIndexFilter(SinglePartitionReadCommand command)
+    {
+        ByteBuffer partitionKey = command.partitionKey().getKey();
+        ClusteringIndexFilter baseFilter = command.clusteringIndexFilter();
+
+        if (!(baseFilter instanceof ClusteringIndexNamesFilter))
+        {
+            Slices.Builder indexSlices = new Slices.Builder(index.getIndexComparator());
+            for (Slice slice : ((ClusteringIndexSliceFilter) baseFilter).requestedSlices())
+                indexSlices.add(makeIndexBound(partitionKey, slice.start()), makeIndexBound(partitionKey, slice.end()));
+            return new ClusteringIndexSliceFilter(indexSlices.build(), baseFilter.isReversed());
+        }
+
+        NavigableSet<Clustering> requestedRows = ((ClusteringIndexNamesFilter) baseFilter).requestedRows();
+        BTreeSet.Builder<Clustering> indexClusterings = BTreeSet.builder(index.getIndexComparator());
+        for (Clustering row : requestedRows)
+            indexClusterings.add(makeIndexClustering(partitionKey, row));
+        return new ClusteringIndexNamesFilter(indexClusterings.build(), baseFilter.isReversed());
+    }
+
+    /**
+     * Index filter for a partition range read: a single, non-reversed slice of the index partition.
+     *
+     * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+     * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
+     * key having a given token. A potential fix would be to actually store the token along the key in the indexed row.
+     */
+    private ClusteringIndexFilter makeRangeIndexFilter(DataRange dataRange)
+    {
+        AbstractBounds<PartitionPosition> range = dataRange.keyRange();
+
+        Slice slice;
+        if (!(range.left instanceof DecoratedKey))
+        {
+            slice = Slice.ALL;
+        }
+        else if (!(range.right instanceof DecoratedKey))
+        {
+            // The right hand side of the range may not be a DecoratedKey (for instance if we're paging);
+            // in that case just start the index slice from the key we do have.
+            slice = Slice.make(makeIndexBound(((DecoratedKey) range.left).getKey(), Slice.Bound.BOTTOM),
+                               Slice.Bound.TOP);
+        }
+        else
+        {
+            // Both ends are keys, so we can optimise slightly by restricting the slice.
+            slice = makeBoundedRangeSlice(dataRange, (DecoratedKey) range.left, (DecoratedKey) range.right);
+        }
+
+        return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
+    }
+
+    /**
+     * Index slice going from {@code startKey} to {@code endKey}. For index queries over a range, we can't do a
+     * whole lot better than querying everything for the key range, though for slice queries we can slightly
+     * restrict the beginning and end.
+     */
+    private Slice makeBoundedRangeSlice(DataRange dataRange, DecoratedKey startKey, DecoratedKey endKey)
+    {
+        Slice.Bound start = Slice.Bound.BOTTOM;
+        Slice.Bound end = Slice.Bound.TOP;
+
+        if (!dataRange.isNamesQuery())
+        {
+            ClusteringIndexSliceFilter startFilter = (ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(startKey);
+            ClusteringIndexSliceFilter endFilter = (ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(endKey);
+
+            // We can't effectively support reversed queries when we have a range, so we don't support it
+            // (or through post-query reordering) and shouldn't get there.
+            assert !startFilter.isReversed() && !endFilter.isReversed();
+
+            Slices startSlices = startFilter.requestedSlices();
+            if (startSlices.size() > 0)
+                start = startSlices.get(0).start();
+
+            Slices endSlices = endFilter.requestedSlices();
+            if (endSlices.size() > 0)
+                end = endSlices.get(endSlices.size() - 1).end();
+        }
+
+        return Slice.make(makeIndexBound(startKey.getKey(), start),
+                          makeIndexBound(endKey.getKey(), end));
+    }
+
+    /** Converts a base table bound into the matching index bound for base partition {@code rowKey}. */
+    private Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
+    {
+        CBuilder indexPrefix = index.buildIndexClusteringPrefix(rowKey, bound, null);
+        return indexPrefix.buildBound(bound.isStart(), bound.isInclusive());
+    }
+
+    /** Converts a base table clustering into the matching index clustering for base partition {@code rowKey}. */
+    protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering)
+    {
+        CBuilder indexPrefix = index.buildIndexClusteringPrefix(rowKey, clustering, null);
+        return indexPrefix.build();
+    }
+
+    /**
+     * Fetches the base table data referenced by the index hits read from partition {@code indexKey}.
+     */
+    protected abstract UnfilteredPartitionIterator queryDataFromIndex(DecoratedKey indexKey,
+                                                                      RowIterator indexHits,
+                                                                      ReadCommand command,
+                                                                      ReadOrderGroup orderGroup);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/IndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/IndexEntry.java b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
new file mode 100644
index 0000000..6f94ace
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
@@ -0,0 +1,34 @@
+package org.apache.cassandra.index.internal;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+
+/**
+ * Entries in indexes on non-compact tables (tables with composite comparators)
+ * can be encapsulated as IndexEntry instances. These are not used when dealing
+ * with indexes on static/compact/thrift tables (i.e. KEYS indexes).
+ *
+ * An entry links one row of the index table (identified by {@link #indexValue} and
+ * {@link #indexClustering}) to the base table row it references (identified by
+ * {@link #indexedKey} and {@link #indexedEntryClustering}).
+ */
+public final class IndexEntry
+{
+    /** Partition key of the index table row, i.e. the decorated indexed value. */
+    public final DecoratedKey indexValue;
+    /** Clustering of the index table row. */
+    public final Clustering indexClustering;
+    /** Timestamp of the index row's primary key liveness info. */
+    public final long timestamp;
+
+    /** Partition key of the referenced base table row. */
+    public final ByteBuffer indexedKey;
+    /** Clustering of the referenced base table row. */
+    public final Clustering indexedEntryClustering;
+
+    public IndexEntry(DecoratedKey indexValue,
+                      Clustering indexClustering,
+                      long timestamp,
+                      ByteBuffer indexedKey,
+                      Clustering indexedEntryClustering)
+    {
+        this.indexValue = indexValue;
+        this.indexClustering = indexClustering;
+        this.timestamp = timestamp;
+        this.indexedKey = indexedKey;
+        this.indexedEntryClustering = indexedEntryClustering;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
new file mode 100644
index 0000000..b932602
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on a CLUSTERING_COLUMN column definition.
+ *
+ * A cell indexed by this index will have the general form:
+ *   ck_0 ... ck_n c_name : v
+ * where ck_i are the clustering keys, c_name the last component of the cell
+ * composite name (or second to last if collections are in use, but this
+ * has no impact) and v the cell value.
+ *
+ * Such a cell is always indexed by this index (or rather, it is indexed if
+ * n >= indexedColumn.position(), which will always be the case in practice)
+ * and it will generate (buildIndexClusteringPrefix()) an index entry whose:
+ *   - row key will be ck_i (getIndexedValue()) where i == indexedColumn.position().
+ *   - cell name will be
+ *       rk ck_0 ... ck_{i-1} ck_{i+1} ck_n
+ *     where rk is the row key of the initial cell and i == indexedColumn.position().
+ */
+public class ClusteringColumnIndex extends CassandraIndex
+{
+    public ClusteringColumnIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    /** The indexed value is the clustering component at the indexed column's position. */
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        return clustering.get(indexedColumn.position());
+    }
+
+    /**
+     * Builds the index clustering: the base partition key followed by every component of
+     * {@code prefix} except the one at the indexed column's position (that value is the
+     * partition key of the index table instead). {@code path} is ignored.
+     */
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        int indexedPosition = indexedColumn.position();
+        CBuilder builder = CBuilder.create(getIndexComparator());
+        builder.add(partitionKey);
+        for (int i = 0; i < prefix.size(); i++)
+        {
+            if (i != indexedPosition)
+                builder.add(prefix.get(i));
+        }
+        return builder;
+    }
+
+    /**
+     * Rebuilds the base row reference from an index row whose clustering is
+     * [base partition key, ck_0 ... ck_{i-1}, ck_{i+1} ... ck_n], re-inserting the indexed
+     * value (the index partition key) at position i.
+     */
+    public IndexEntry decodeEntry(DecoratedKey indexedValue,
+                                  Row indexEntry)
+    {
+        int ckCount = baseCfs.metadata.clusteringColumns().size();
+        int indexedPosition = indexedColumn.position();
+
+        Clustering clustering = indexEntry.clustering();
+        CBuilder builder = CBuilder.create(baseCfs.getComparator());
+        // Before the indexed position, components are shifted by one for the leading partition key.
+        for (int i = 0; i < indexedPosition; i++)
+            builder.add(clustering.get(i + 1));
+
+        builder.add(indexedValue.getKey());
+
+        // After the indexed position, the +1 shift and the omitted component cancel out.
+        for (int i = indexedPosition + 1; i < ckCount; i++)
+            builder.add(clustering.get(i));
+
+        return new IndexEntry(indexedValue,
+                              clustering,
+                              indexEntry.primaryKeyLivenessInfo().timestamp(),
+                              clustering.get(0),
+                              builder.build());
+    }
+
+    /** An entry is stale once the referenced base row has no live data left. */
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        return !data.hasLiveData(nowInSec);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
new file mode 100644
index 0000000..1113600
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionEntryIndex.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on the element and value of cells participating in a collection.
+ *
+ * The row keys of this index are composites of (collection element, cell value)
+ * for the indexed column.
+ */
+public class CollectionEntryIndex extends CollectionKeyIndexBase
+{
+    public CollectionEntryIndex(ColumnFamilyStore baseCfs,
+                                IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    /** Combines the cell's collection element and its value into one composite indexed value. */
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path, ByteBuffer cellValue)
+    {
+        ByteBuffer element = path.get(0);
+        return CompositeType.build(element, cellValue);
+    }
+
+    /**
+     * An entry is stale when the base row has no live cell for the entry's collection element,
+     * or when that cell no longer holds the entry's value.
+     */
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        CompositeType entryType = (CompositeType) functions.getIndexedValueType(indexedColumn);
+        ByteBuffer[] parts = entryType.split(indexValue);
+
+        Cell cell = data.getCell(indexedColumn, CellPath.create(parts[0]));
+        boolean liveCell = cell != null && cell.isLive(nowInSec);
+        if (!liveCell)
+            return true;
+
+        AbstractType<?> valueType = ((CollectionType) indexedColumn.type).valueComparator();
+        return valueType.compare(parts[1], cell.value()) != 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
new file mode 100644
index 0000000..42c45e5
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndex.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index on the collection element of the cell name of a collection.
+ *
+ * The row keys for this index are given by the collection element for
+ * indexed columns.
+ */
+public class CollectionKeyIndex extends CollectionKeyIndexBase
+{
+ public CollectionKeyIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef);
+ }
+
+ public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+ Clustering clustering,
+ CellPath path,
+ ByteBuffer cellValue)
+ {
+ return path.get(0);
+ }
+
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+ {
+ Cell cell = data.getCell(indexedColumn, CellPath.create(indexValue));
+ return cell == null || !cell.isLive(nowInSec);
+ }
+
+ public boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+ {
+ return operator == Operator.CONTAINS_KEY ||
+ operator == Operator.CONTAINS && indexedColumn.type instanceof SetType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
new file mode 100644
index 0000000..fe77c96
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionKeyIndexBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Common superclass for indexes that capture collection keys, including
+ * indexes on such keys themselves.
+ *
+ * A cell indexed by this index will have the general form:
+ *   ck_0 ... ck_n c_name [col_elt] : v
+ * where ck_i are the clustering keys, c_name the CQL3 column name, col_elt the
+ * collection element that we want to index (which may or may not be there depending
+ * on whether c_name is the collection we're indexing), and v the cell value.
+ *
+ * Such a cell is indexed if c_name is the indexed collection (in which case we are guaranteed to have
+ * col_elt). The index entry can be viewed in the following way:
+ *   - the row key is determined by subclasses of this type.
+ *   - the cell name will be 'rk ck_0 ... ck_n' where rk is the row key of the initial cell.
+ */
+public abstract class CollectionKeyIndexBase extends CassandraIndex
+{
+    public CollectionKeyIndexBase(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    /** Index clustering is the base partition key followed by all of {@code prefix}; {@code path} is ignored. */
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder indexClustering = CBuilder.create(getIndexComparator());
+        indexClustering.add(partitionKey);
+        int size = prefix.size();
+        for (int component = 0; component < size; component++)
+            indexClustering.add(prefix.get(component));
+
+        return indexClustering;
+    }
+
+    /**
+     * Decodes an index row: component 0 of its clustering is the base partition key and the
+     * following components are the base clustering.
+     */
+    public IndexEntry decodeEntry(DecoratedKey indexedValue,
+                                  Row indexEntry)
+    {
+        int baseClusteringSize = baseCfs.metadata.clusteringColumns().size();
+        Clustering clustering = indexEntry.clustering();
+        CBuilder baseClustering = CBuilder.create(baseCfs.getComparator());
+        for (int component = 1; component <= baseClusteringSize; component++)
+            baseClustering.add(clustering.get(component));
+
+        return new IndexEntry(indexedValue,
+                              clustering,
+                              indexEntry.primaryKeyLivenessInfo().timestamp(),
+                              clustering.get(0),
+                              baseClustering.build());
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java b/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
new file mode 100644
index 0000000..95bd7e1
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CollectionValueIndex.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Index the value of a collection cell.
+ *
+ * This is a lot like an index on REGULAR, except that we also need to make
+ * the collection key part of the index entry so that:
+ * 1) we don't have to scan the whole collection at query time to know the
+ * entry is stale and if it still satisfies the query.
+ * 2) if a collection has multiple time the same value, we need one entry
+ * for each so that if we delete one of the value only we only delete the
+ * entry corresponding to that value.
+ */
+public class CollectionValueIndex extends CassandraIndex
+{
+ public CollectionValueIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+ {
+ super(baseCfs, indexDef);
+ }
+
+ public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+ Clustering clustering,
+ CellPath path, ByteBuffer cellValue)
+ {
+ return cellValue;
+ }
+
+ public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+ ClusteringPrefix prefix,
+ CellPath path)
+ {
+ CBuilder builder = CBuilder.create(getIndexComparator());
+ builder.add(partitionKey);
+ for (int i = 0; i < prefix.size(); i++)
+ builder.add(prefix.get(i));
+
+ // When indexing, cell will be present, but when searching, it won't (CASSANDRA-7525)
+ if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null)
+ builder.add(path.get(0));
+
+ return builder;
+ }
+
+ public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+ {
+ Clustering clustering = indexEntry.clustering();
+ CBuilder builder = CBuilder.create(baseCfs.getComparator());
+ for (int i = 0; i < baseCfs.getComparator().size(); i++)
+ builder.add(clustering.get(i + 1));
+
+ return new IndexEntry(indexedValue,
+ clustering,
+ indexEntry.primaryKeyLivenessInfo().timestamp(),
+ clustering.get(0),
+ builder.build());
+ }
+
+ public boolean supportsOperator(ColumnDefinition indexedColumn, Operator operator)
+ {
+ return operator == Operator.CONTAINS && !(indexedColumn.type instanceof SetType);
+ }
+
+ public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+ {
+ ColumnDefinition columnDef = indexedColumn;
+ ComplexColumnData complexData = data.getComplexColumnData(columnDef);
+ if (complexData == null)
+ return true;
+
+ for (Cell cell : complexData)
+ {
+ if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator()
+ .compare(indexValue, cell.value()) == 0)
+ return false;
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
new file mode 100644
index 0000000..77867fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.CassandraIndexSearcher;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.utils.btree.BTreeSet;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+
+/**
+ * Searcher used by the composites index implementations. It reads the hits of the index partition,
+ * fetches the corresponding base table rows one base partition at a time, and filters out index
+ * entries found to be stale, deleting them when the per-partition iterator is closed.
+ */
+public class CompositesSearcher extends CassandraIndexSearcher
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class);
+
+    public CompositesSearcher(ReadCommand command,
+                              RowFilter.Expression expression,
+                              CassandraIndex index)
+    {
+        super(command, expression, index);
+    }
+
+    /** Whether the base row referenced by {@code entry} is selected by {@code command}. */
+    private boolean isMatchingEntry(DecoratedKey partitionKey, IndexEntry entry, ReadCommand command)
+    {
+        return command.selects(partitionKey, entry.indexedEntryClustering);
+    }
+
+    protected UnfilteredPartitionIterator queryDataFromIndex(final DecoratedKey indexKey,
+                                                             final RowIterator indexHits,
+                                                             final ReadCommand command,
+                                                             final ReadOrderGroup orderGroup)
+    {
+        assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW;
+
+        return new UnfilteredPartitionIterator()
+        {
+            // First decoded index hit not consumed yet; it belongs to the next base partition to query.
+            private IndexEntry nextEntry;
+
+            // Result prepared for the next call to next(), or null if none is prepared.
+            private UnfilteredRowIterator next;
+
+            public boolean isForThrift()
+            {
+                return command.isForThrift();
+            }
+
+            public CFMetaData metadata()
+            {
+                return command.metadata();
+            }
+
+            public boolean hasNext()
+            {
+                return prepareNext();
+            }
+
+            public UnfilteredRowIterator next()
+            {
+                if (next == null)
+                    prepareNext();
+
+                UnfilteredRowIterator toReturn = next;
+                next = null;
+                return toReturn;
+            }
+
+            private boolean prepareNext()
+            {
+                if (next != null)
+                    return true;
+
+                // Loop rather than recurse: base partitions whose hits are all filtered out or stale are
+                // skipped, and a long run of them would otherwise add a stack frame per skipped partition.
+                while (true)
+                {
+                    if (nextEntry == null)
+                    {
+                        if (!indexHits.hasNext())
+                            return false;
+
+                        nextEntry = index.decodeEntry(indexKey, indexHits.next());
+                    }
+
+                    // Gather all index hits belonging to the same partition and query the data for those hits.
+                    // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
+                    // 1 read per index hit. However, this basically mean materializing all hits for a partition
+                    // in memory so we should consider adding some paging mechanism. However, index hits should
+                    // be relatively small so it's much better than the previous code that was materializing all
+                    // *data* for a given partition.
+                    BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
+                    List<IndexEntry> entries = new ArrayList<>();
+                    DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);
+
+                    while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
+                    {
+                        // We've queried a slice of the index, but some hits may not match some of the clustering column constraints
+                        if (isMatchingEntry(partitionKey, nextEntry, command))
+                        {
+                            clusterings.add(nextEntry.indexedEntryClustering);
+                            entries.add(nextEntry);
+                        }
+
+                        nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
+                    }
+
+                    // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
+                    if (clusterings.isEmpty())
+                        continue;
+
+                    // Query the gathered index hits. We still need to filter stale hits from the resulting query.
+                    ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
+                    SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(index.baseCfs.metadata,
+                                                                                         command.nowInSec(),
+                                                                                         command.columnFilter(),
+                                                                                         command.rowFilter(),
+                                                                                         DataLimits.NONE,
+                                                                                         partitionKey,
+                                                                                         filter);
+                    @SuppressWarnings("resource") // We close right away if empty; otherwise it is assigned to 'next' and is
+                                                  // closed either by the caller of next() or when this iterator is closed.
+                    UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs,
+                                                                                                     orderGroup.baseReadOpOrderGroup()),
+                                                                        indexKey.getKey(),
+                                                                        entries,
+                                                                        orderGroup.writeOpOrderGroup(),
+                                                                        command.nowInSec());
+
+                    if (dataIter.isEmpty())
+                    {
+                        dataIter.close();
+                        continue;
+                    }
+
+                    next = dataIter;
+                    return true;
+                }
+            }
+
+            public void remove()
+            {
+                throw new UnsupportedOperationException();
+            }
+
+            public void close()
+            {
+                // Release a prepared-but-unconsumed partition even if closing the index hits fails.
+                try
+                {
+                    indexHits.close();
+                }
+                finally
+                {
+                    if (next != null)
+                        next.close();
+                }
+            }
+        };
+    }
+
+    /** Writes deletions for every entry in {@code entries} using the given write op group. */
+    private void deleteAllEntries(final List<IndexEntry> entries, final OpOrder.Group writeOp, final int nowInSec)
+    {
+        entries.forEach(entry ->
+            index.deleteStaleEntry(entry.indexValue,
+                                   entry.indexClustering,
+                                   new DeletionTime(entry.timestamp, nowInSec),
+                                   writeOp));
+    }
+
+    /**
+     * Wraps {@code dataIter} so that rows for which the index reports a stale entry are dropped.
+     * Entries shadowed by a partition-level deletion, entries without a matching row, and entries
+     * of dropped rows are collected and deleted when the returned iterator is closed.
+     */
+    private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter,
+                                                     final ByteBuffer indexValue,
+                                                     final List<IndexEntry> entries,
+                                                     final OpOrder.Group writeOp,
+                                                     final int nowInSec)
+    {
+        // collect stale index entries and delete them when we close this iterator
+        final List<IndexEntry> staleEntries = new ArrayList<>();
+
+        // if there is a partition level delete in the base table, we need to filter
+        // any index entries which would be shadowed by it
+        if (!dataIter.partitionLevelDeletion().isLive())
+        {
+            DeletionTime deletion = dataIter.partitionLevelDeletion();
+            entries.forEach(e -> {
+                if (deletion.deletes(e.timestamp))
+                    staleEntries.add(e);
+            });
+        }
+
+        return new AlteringUnfilteredRowIterator(dataIter)
+        {
+            private int entriesIdx;
+
+            @Override
+            public void close()
+            {
+                // Always close the wrapped iterator, even if writing the stale-entry deletions fails.
+                try
+                {
+                    deleteAllEntries(staleEntries, writeOp, nowInSec);
+                }
+                finally
+                {
+                    super.close();
+                }
+            }
+
+            @Override
+            protected Row computeNext(Row row)
+            {
+                List<IndexEntry> rowEntries = findEntries(row.clustering());
+                if (!index.isStale(row, indexValue, nowInSec))
+                    return row;
+
+                staleEntries.addAll(rowEntries);
+                return null;
+            }
+
+            /**
+             * Returns the entries pointing at the row with the given clustering, advancing past them.
+             * Several consecutive entries may share one clustering (e.g. a collection value index where
+             * the same value appears under several collection keys); they all refer to the same row, so
+             * they are returned together instead of being mistaken for stale entries when the next row
+             * is looked up.
+             */
+            private List<IndexEntry> findEntries(Clustering clustering)
+            {
+                assert entriesIdx < entries.size();
+                ClusteringComparator comparator = metadata().comparator;
+                while (entriesIdx < entries.size())
+                {
+                    IndexEntry entry = entries.get(entriesIdx++);
+                    // The entries are in clustering order. So that the requested entry should be the
+                    // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries
+                    // that have no corresponding row in the base table typically because of a range
+                    // tombstone or partition level deletion. Delete such stale entries.
+                    int cmp = comparator.compare(entry.indexedEntryClustering, clustering);
+                    assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen
+                    if (cmp < 0)
+                    {
+                        staleEntries.add(entry);
+                        continue;
+                    }
+
+                    List<IndexEntry> rowEntries = new ArrayList<>(1);
+                    rowEntries.add(entry);
+                    while (entriesIdx < entries.size()
+                           && comparator.compare(entries.get(entriesIdx).indexedEntryClustering, clustering) == 0)
+                        rowEntries.add(entries.get(entriesIdx++));
+                    return rowEntries;
+                }
+                // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry.
+                throw new AssertionError();
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0626be86/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
new file mode 100644
index 0000000..2c0b5aa
--- /dev/null
+++ b/src/java/org/apache/cassandra/index/internal/composites/PartitionKeyIndex.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.index.internal.composites;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.index.internal.CassandraIndex;
+import org.apache.cassandra.index.internal.IndexEntry;
+import org.apache.cassandra.schema.IndexMetadata;
+
+/**
+ * Secondary index on a PARTITION_KEY column of a table whose partition key is composite.
+ *
+ * Given a base-table partition key made of several components:
+ *   rk = rk_0 ... rk_n
+ *
+ * each index entry is laid out as follows:
+ *   - the index partition key is rk_i, the component found at the indexed column's
+ *     position ({@code indexedColumn.position()});
+ *   - the index clustering is: rk ck
+ *     where rk is the full base partition key and ck is the base clustering of the
+ *     indexed row. No column name is appended, because an entry refers to the whole
+ *     CQL3 row rather than to a single cell.
+ *
+ * Unlike other index types, the indexed value is repeated inside the index clustering
+ * (as part of the full partition key). This is deliberate: index entries must be ordered
+ * by partitioner first, and dropping one component of the partition key would change
+ * that order.
+ */
+public class PartitionKeyIndex extends CassandraIndex
+{
+    public PartitionKeyIndex(ColumnFamilyStore baseCfs, IndexMetadata indexDef)
+    {
+        super(baseCfs, indexDef);
+    }
+
+    /**
+     * Extracts the indexed value from a base-table partition key.
+     *
+     * @param partitionKey the serialized base partition key, split with the table's key validator
+     * @param clustering   not read by this index type
+     * @param path         not read by this index type
+     * @param cellValue    not read by this index type
+     * @return the partition key component at the indexed column's position
+     */
+    public ByteBuffer getIndexedValue(ByteBuffer partitionKey,
+                                      Clustering clustering,
+                                      CellPath path,
+                                      ByteBuffer cellValue)
+    {
+        // NOTE(review): the cast assumes the key validator is a CompositeType; presumably
+        // index creation rejects non-composite partition keys elsewhere -- confirm.
+        ByteBuffer[] keyParts = ((CompositeType) baseCfs.metadata.getKeyValidator()).split(partitionKey);
+        return keyParts[indexedColumn.position()];
+    }
+
+    /**
+     * Builds the index clustering prefix: the full base partition key, followed by every
+     * component of the supplied base clustering prefix, in order.
+     *
+     * @param partitionKey the serialized base partition key
+     * @param prefix       the base clustering prefix to append
+     * @param path         not read by this index type
+     * @return a builder seeded with the index clustering components
+     */
+    public CBuilder buildIndexClusteringPrefix(ByteBuffer partitionKey,
+                                               ClusteringPrefix prefix,
+                                               CellPath path)
+    {
+        CBuilder indexPrefix = CBuilder.create(getIndexComparator());
+        indexPrefix.add(partitionKey);
+        int prefixSize = prefix.size();
+        for (int pos = 0; pos < prefixSize; pos++)
+            indexPrefix.add(prefix.get(pos));
+        return indexPrefix;
+    }
+
+    /**
+     * Decodes an index row back into an {@link IndexEntry}.
+     *
+     * The index clustering is laid out as [base partition key, ck_0, ..., ck_m]. Position 0
+     * is handed over as the indexed partition key, and the positions after it are copied
+     * into a new clustering built with the base table's comparator.
+     *
+     * @param indexedValue the index partition key (the indexed value)
+     * @param indexEntry   the index row to decode
+     * @return the decoded entry, carrying the index row's primary key liveness timestamp
+     */
+    public IndexEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
+    {
+        Clustering indexClustering = indexEntry.clustering();
+        int baseClusteringSize = baseCfs.metadata.clusteringColumns().size();
+        CBuilder baseClustering = CBuilder.create(baseCfs.getComparator());
+        // Skip position 0 (the base partition key); the base clustering follows it.
+        for (int pos = 1; pos <= baseClusteringSize; pos++)
+            baseClustering.add(indexClustering.get(pos));
+
+        return new IndexEntry(indexedValue,
+                              indexClustering,
+                              indexEntry.primaryKeyLivenessInfo().timestamp(),
+                              indexClustering.get(0),
+                              baseClustering.build());
+    }
+
+    /**
+     * Reports whether an index entry is stale. That is the case when the base row it
+     * points to has no live data as of {@code nowInSec}.
+     *
+     * @param data       the base-table row referenced by the entry
+     * @param indexValue not read by this index type
+     * @param nowInSec   the reference time, in seconds, used for liveness checks
+     * @return {@code true} if the row has no live data
+     */
+    public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec)
+    {
+        boolean rowIsLive = data.hasLiveData(nowInSec);
+        return !rowIsLive;
+    }
+}