You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:50 UTC
[26/51] [partial] cassandra git commit: Storage engine refactor,
a.k.a CASSANDRA-8099
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
index ba48350..0b6577e 100644
--- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java
@@ -23,11 +23,11 @@ import java.util.concurrent.Future;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.LocalPartitioner;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -51,8 +51,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
columnDef = columnDefs.iterator().next();
- CellNameType indexComparator = SecondaryIndex.getIndexComparator(baseCfs.metadata, columnDef);
- CFMetaData indexedCfMetadata = CFMetaData.newIndexMetadata(baseCfs.metadata, columnDef, indexComparator);
+ CFMetaData indexedCfMetadata = SecondaryIndex.newIndexMetadata(baseCfs.metadata, columnDef);
indexCfs = ColumnFamilyStore.createColumnFamilyStore(baseCfs.keyspace,
indexedCfMetadata.cfName,
new LocalPartitioner(getIndexKeyComparator()),
@@ -65,73 +64,98 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
return columnDef.type;
}
+ public ColumnDefinition indexedColumn()
+ {
+ return columnDef;
+ }
+
@Override
String indexTypeForGrouping()
{
return "_internal_";
}
- protected abstract CellName makeIndexColumnName(ByteBuffer rowKey, Cell cell);
+ protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, Cell cell)
+ {
+ return makeIndexClustering(rowKey, clustering, cell == null ? null : cell.path());
+ }
- protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell);
+ protected Clustering makeIndexClustering(ByteBuffer rowKey, Clustering clustering, CellPath path)
+ {
+ return buildIndexClusteringPrefix(rowKey, clustering, path).build();
+ }
- protected abstract AbstractType getExpressionComparator();
+ protected Slice.Bound makeIndexBound(ByteBuffer rowKey, Slice.Bound bound)
+ {
+ return buildIndexClusteringPrefix(rowKey, bound, null).buildBound(bound.isStart(), bound.isInclusive());
+ }
- public String expressionString(IndexExpression expr)
+ protected abstract CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path);
+
+ protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, Cell cell)
{
- return String.format("'%s.%s %s %s'",
- baseCfs.name,
- getExpressionComparator().getString(expr.column),
- expr.operator,
- baseCfs.metadata.getColumnDefinition(expr.column).type.getString(expr.value));
+ return cell == null
+ ? getIndexedValue(rowKey, clustering, null, null)
+ : getIndexedValue(rowKey, clustering, cell.value(), cell.path());
}
- public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+ protected abstract ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath cellPath);
+
+ public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
{
- deleteForCleanup(rowKey, cell, opGroup);
+ deleteForCleanup(rowKey, clustering, cell, opGroup, nowInSec);
}
- public void deleteForCleanup(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+ public void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec)
{
- if (!cell.isLive())
- return;
+ delete(rowKey, clustering, cell.value(), cell.path(), new SimpleDeletionTime(cell.livenessInfo().timestamp(), nowInSec), opGroup);
+ }
- DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, cell));
- int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
- ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata, false, 1);
- cfi.addTombstone(makeIndexColumnName(rowKey, cell), localDeletionTime, cell.timestamp());
- indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater, opGroup, null);
+ public void delete(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path, DeletionTime deletion, OpOrder.Group opGroup)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path));
+ PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
+ Row.Writer writer = upd.writer();
+ Rows.writeClustering(makeIndexClustering(rowKey, clustering, path), writer);
+ writer.writeRowDeletion(deletion);
+ writer.endOfRow();
+ indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
if (logger.isDebugEnabled())
- logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
+ logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, upd);
}
- public void insert(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup)
+ public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup)
{
- DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, cell));
- ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata, false, 1);
- CellName name = makeIndexColumnName(rowKey, cell);
- if (cell instanceof ExpiringCell)
- {
- ExpiringCell ec = (ExpiringCell) cell;
- cfi.addColumn(new BufferExpiringCell(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, ec.timestamp(), ec.getTimeToLive(), ec.getLocalDeletionTime()));
- }
- else
- {
- cfi.addColumn(new BufferCell(name, ByteBufferUtil.EMPTY_BYTE_BUFFER, cell.timestamp()));
- }
+ insert(rowKey, clustering, cell, cell.livenessInfo(), opGroup);
+ }
+
+ public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, LivenessInfo info, OpOrder.Group opGroup)
+ {
+ DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell));
+
+ PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1);
+ Row.Writer writer = upd.writer();
+ Rows.writeClustering(makeIndexClustering(rowKey, clustering, cell), writer);
+ writer.writePartitionKeyLivenessInfo(info);
+ writer.endOfRow();
if (logger.isDebugEnabled())
- logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), cfi);
+ logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), upd);
- indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater, opGroup, null);
+ indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
}
- public void update(ByteBuffer rowKey, Cell oldCol, Cell col, OpOrder.Group opGroup)
+ public void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec)
{
// insert the new value before removing the old one, so we never have a period
- // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
- insert(rowKey, col, opGroup);
- if (SecondaryIndexManager.shouldCleanupOldValue(oldCol, col))
- delete(rowKey, oldCol, opGroup);
+ // where the row is invisible to both queries (the opposite seems preferable); see CASSANDRA-5540
+ insert(rowKey, clustering, cell, opGroup);
+ if (SecondaryIndexManager.shouldCleanupOldValue(oldCell, cell))
+ delete(rowKey, clustering, oldCell, opGroup, nowInSec);
+ }
+
+ public boolean indexes(ColumnDefinition column)
+ {
+ return column.name.equals(columnDef.name);
}
public void removeIndex(ByteBuffer columnName)
@@ -165,6 +189,12 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
return indexCfs;
}
+ protected ClusteringComparator getIndexComparator()
+ {
+ assert indexCfs != null;
+ return indexCfs.metadata.comparator;
+ }
+
public String getIndexName()
{
return indexCfs.name;
@@ -172,18 +202,43 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec
public void reload()
{
- indexCfs.metadata.reloadSecondaryIndexMetadata(baseCfs.metadata);
+ indexCfs.metadata.reloadIndexMetadataProperties(baseCfs.metadata);
indexCfs.reload();
}
-
+
public long estimateResultRows()
{
return getIndexCfs().getMeanColumns();
}
- public boolean validate(ByteBuffer rowKey, Cell cell)
+ public void validate(DecoratedKey partitionKey) throws InvalidRequestException
+ {
+ if (columnDef.kind == ColumnDefinition.Kind.PARTITION_KEY)
+ validateIndexedValue(getIndexedValue(partitionKey.getKey(), null, null, null));
+ }
+
+ public void validate(Clustering clustering) throws InvalidRequestException
+ {
+ if (columnDef.kind == ColumnDefinition.Kind.CLUSTERING_COLUMN)
+ validateIndexedValue(getIndexedValue(null, clustering, null, null));
+ }
+
+ public void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException
+ {
+ if (!columnDef.isPrimaryKeyColumn())
+ validateIndexedValue(getIndexedValue(null, null, cellValue, path));
+ }
+
+ private void validateIndexedValue(ByteBuffer value)
+ {
+ if (value != null && value.remaining() >= FBUtilities.MAX_UNSIGNED_SHORT)
+ throw new InvalidRequestException(String.format("Cannot index value of size %d for index %s on %s.%s(%s) (maximum allowed size=%d)",
+ value.remaining(), getIndexName(), baseKeyspace(), baseTable(), columnDef.name, FBUtilities.MAX_UNSIGNED_SHORT));
+ }
+
+ @Override
+ public String toString()
{
- return getIndexedValue(rowKey, cell).remaining() < FBUtilities.MAX_UNSIGNED_SHORT
- && makeIndexColumnName(rowKey, cell).toByteBuffer().remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
+ return String.format("%s(%s)", baseTable(), columnDef.name);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
index ba902ec..ab8e688 100644
--- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.utils.FBUtilities;
/**
* Base class for Secondary indexes that implement a unique index per column
@@ -35,12 +35,26 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void delete(ByteBuffer rowKey, Cell col, OpOrder.Group opGroup);
+ public abstract void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
/**
* Called when a column has been removed due to a cleanup operation.
*/
- public abstract void deleteForCleanup(ByteBuffer rowKey, Cell col, OpOrder.Group opGroup);
+ public abstract void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec);
+
+ /**
+ * For indexes on the primary key, index the given PK.
+ */
+ public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec)
+ {
+ }
+
+ /**
+ * For indexes on the primary key, delete the given PK.
+ */
+ public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup)
+ {
+ }
/**
* insert a column to the index
@@ -48,7 +62,7 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param rowKey the underlying row key which is indexed
* @param col all the column info
*/
- public abstract void insert(ByteBuffer rowKey, Cell col, OpOrder.Group opGroup);
+ public abstract void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup);
/**
* update a column from the index
@@ -57,20 +71,44 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex
* @param oldCol the previous column info
* @param col all the column info
*/
- public abstract void update(ByteBuffer rowKey, Cell oldCol, Cell col, OpOrder.Group opGroup);
+ public abstract void update(ByteBuffer rowKey, Clustering clustering, Cell oldCell, Cell cell, OpOrder.Group opGroup, int nowInSec);
- public String getNameForSystemKeyspace(ByteBuffer column)
+ protected boolean indexPrimaryKeyColumn()
{
- return getIndexName();
+ return false;
}
- public boolean validate(ByteBuffer rowKey, Cell cell)
+ public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec)
{
- return validate(cell);
+ Clustering clustering = row.clustering();
+ if (indexPrimaryKeyColumn())
+ {
+ // Same as in AtomicBTreePartition.maybeIndexPrimaryKeyColumn
+ long timestamp = row.primaryKeyLivenessInfo().timestamp();
+ int ttl = row.primaryKeyLivenessInfo().ttl();
+
+ for (Cell cell : row)
+ {
+ if (cell.isLive(nowInSec) && cell.livenessInfo().timestamp() > timestamp)
+ {
+ timestamp = cell.livenessInfo().timestamp();
+ ttl = cell.livenessInfo().ttl();
+ }
+ }
+ maybeIndex(key.getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
+ }
+ for (Cell cell : row)
+ {
+ if (!indexes(cell.column()))
+ continue;
+
+ if (cell.isLive(nowInSec))
+ insert(key.getKey(), clustering, cell, opGroup);
+ }
}
- public boolean validate(Cell cell)
+ public String getNameForSystemKeyspace(ByteBuffer column)
{
- return cell.value().remaining() < FBUtilities.MAX_UNSIGNED_SHORT;
+ return getIndexName();
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
index f6f0e8d..502b213 100644
--- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java
@@ -20,11 +20,9 @@ package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
@@ -33,19 +31,16 @@ import org.apache.cassandra.utils.ByteBufferUtil;
public abstract class PerRowSecondaryIndex extends SecondaryIndex
{
/**
- * Index the given row.
- *
- * @param rowKey the row key
- * @param cf the cf data to be indexed
+ * Index the given partition.
*/
- public abstract void index(ByteBuffer rowKey, ColumnFamily cf);
+ public abstract void index(ByteBuffer key, UnfilteredRowIterator atoms);
/**
* cleans up deleted columns from cassandra cleanup compaction
*
* @param key
*/
- public abstract void delete(DecoratedKey key, OpOrder.Group opGroup);
+ public abstract void delete(ByteBuffer key, OpOrder.Group opGroup);
public String getNameForSystemKeyspace(ByteBuffer columnName)
{
@@ -58,15 +53,4 @@ public abstract class PerRowSecondaryIndex extends SecondaryIndex
throw new RuntimeException(e);
}
}
-
-
- public boolean validate(ByteBuffer rowKey, Cell cell)
- {
- return validate(cell);
- }
-
- public boolean validate(Cell cell)
- {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
index 11626d6..7552fd5 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndex.java
@@ -32,15 +32,11 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.IndexType;
import org.apache.cassandra.cql3.Operator;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
import org.apache.cassandra.db.index.composites.CompositesIndex;
import org.apache.cassandra.db.index.keys.KeysIndex;
import org.apache.cassandra.db.marshal.AbstractType;
@@ -48,6 +44,7 @@ import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.LocalByPartionerType;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
@@ -168,7 +165,7 @@ public abstract class SecondaryIndex
* @param columns the list of columns which belong to this index type
* @return the secondary index search impl
*/
- protected abstract SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns);
+ protected abstract SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns);
/**
* Forces this index's in-memory data to disk
@@ -308,22 +305,12 @@ public abstract class SecondaryIndex
}
/**
- * Returns true if the provided cell name is indexed by this secondary index.
+ * Returns true if the provided column is indexed by this secondary index.
*
* The default implementation checks whether the name is one of the columnDef names,
* but this should be overridden by subclasses if needed.
*/
- public abstract boolean indexes(CellName name);
-
- /**
- * Returns true if the provided column definition is indexed by this secondary index.
- *
- * The default implementation checks whether it is contained in this index column definitions set.
- */
- public boolean indexes(ColumnDefinition cdef)
- {
- return columnDefs.contains(cdef);
- }
+ public abstract boolean indexes(ColumnDefinition column);
/**
* This is the primary way to create a secondary index instance for a CF column.
@@ -371,28 +358,45 @@ public abstract class SecondaryIndex
return index;
}
- public abstract boolean validate(ByteBuffer rowKey, Cell cell);
+ public abstract void validate(DecoratedKey partitionKey) throws InvalidRequestException;
+ public abstract void validate(Clustering clustering) throws InvalidRequestException;
+ public abstract void validate(ByteBuffer cellValue, CellPath path) throws InvalidRequestException;
public abstract long estimateResultRows();
+ protected String baseKeyspace()
+ {
+ return baseCfs.metadata.ksName;
+ }
+
+ protected String baseTable()
+ {
+ return baseCfs.metadata.cfName;
+ }
+
/**
- * Returns the index comparator for index backed by CFS, or null.
- *
- * Note: it would be cleaner to have this be a member method. However we need this when opening indexes
- * sstables, but by then the CFS won't be fully initiated, so the SecondaryIndex object won't be accessible.
+ * Create the index metadata for the index on a given column of a given table.
*/
- public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cdef)
+ public static CFMetaData newIndexMetadata(CFMetaData baseMetadata, ColumnDefinition def)
{
- switch (cdef.getIndexType())
+ if (def.getIndexType() == IndexType.CUSTOM)
+ return null;
+
+ CFMetaData.Builder builder = CFMetaData.Builder.create(baseMetadata.ksName, baseMetadata.indexColumnFamilyName(def))
+ .withId(baseMetadata.cfId)
+ .addPartitionKey(def.name, def.type);
+
+ if (def.getIndexType() == IndexType.COMPOSITES)
{
- case KEYS:
- return new SimpleDenseCellNameType(keyComparator);
- case COMPOSITES:
- return CompositesIndex.getIndexComparator(baseMetadata, cdef);
- case CUSTOM:
- return null;
+ CompositesIndex.addIndexClusteringColumns(builder, baseMetadata, def);
}
- throw new AssertionError();
+ else
+ {
+ assert def.getIndexType() == IndexType.KEYS;
+ KeysIndex.addIndexClusteringColumns(builder, baseMetadata, def);
+ }
+
+ return builder.build().reloadIndexMetadataProperties(baseMetadata);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index 916c286..a117f6d 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.compaction.CompactionInfo;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
/**
@@ -45,7 +46,7 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
this.cfs = cfs;
this.idxNames = idxNames;
this.iter = iter;
- compactionId = UUIDGen.getTimeUUID();
+ this.compactionId = UUIDGen.getTimeUUID();
}
public CompactionInfo getCompactionInfo()
@@ -59,21 +60,26 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
public void build()
{
- while (iter.hasNext())
- {
- if (isStopRequested())
- throw new CompactionInterruptedException(getCompactionInfo());
- DecoratedKey key = iter.next();
- Keyspace.indexRow(key, cfs, idxNames);
- }
-
try
{
- iter.close();
+ while (iter.hasNext())
+ {
+ if (isStopRequested())
+ throw new CompactionInterruptedException(getCompactionInfo());
+ DecoratedKey key = iter.next();
+ Keyspace.indexPartition(key, cfs, idxNames);
+ }
}
- catch (IOException e)
+ finally
{
- throw new RuntimeException(e);
+ try
+ {
+ iter.close();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 4c1bf45..1bd5452 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -40,20 +39,14 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.IndexType;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.IndexExpression;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.ExtendedFilter;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.ReducingKeyIterator;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -67,11 +60,10 @@ public class SecondaryIndexManager
public static final Updater nullUpdater = new Updater()
{
- public void insert(Cell cell) { }
-
- public void update(Cell oldCell, Cell cell) { }
-
- public void remove(Cell current) { }
+ public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion) {}
+ public void insert(Clustering clustering, Cell cell) {}
+ public void update(Clustering clustering, Cell oldCell, Cell cell) {}
+ public void remove(Clustering clustering, Cell current) {}
public void updateRowLevelIndexes() {}
};
@@ -144,6 +136,24 @@ public class SecondaryIndexManager
return names;
}
+ public Set<PerColumnSecondaryIndex> perColumnIndexes()
+ {
+ Set<PerColumnSecondaryIndex> s = new HashSet<>();
+ for (SecondaryIndex index : allIndexes)
+ if (index instanceof PerColumnSecondaryIndex)
+ s.add((PerColumnSecondaryIndex)index);
+ return s;
+ }
+
+ public Set<PerRowSecondaryIndex> perRowIndexes()
+ {
+ Set<PerRowSecondaryIndex> s = new HashSet<>();
+ for (SecondaryIndex index : allIndexes)
+ if (index instanceof PerRowSecondaryIndex)
+ s.add((PerRowSecondaryIndex)index);
+ return s;
+ }
+
/**
* Does a full, blocking rebuild of the indexes specified by columns from the sstables.
* Does nothing if columns is empty.
@@ -171,26 +181,20 @@ public class SecondaryIndexManager
logger.info("Index build of {} complete", idxNames);
}
- public boolean indexes(CellName name, Set<SecondaryIndex> indexes)
+ public boolean indexes(ColumnDefinition column)
{
- boolean matching = false;
- for (SecondaryIndex index : indexes)
- {
- if (index.indexes(name))
- {
- matching = true;
- break;
- }
- }
- return matching;
+ for (SecondaryIndex index : allIndexes)
+ if (index.indexes(column))
+ return true;
+ return false;
}
- public Set<SecondaryIndex> indexFor(CellName name, Set<SecondaryIndex> indexes)
+ private Set<SecondaryIndex> indexFor(ColumnDefinition column)
{
Set<SecondaryIndex> matching = null;
- for (SecondaryIndex index : indexes)
+ for (SecondaryIndex index : allIndexes)
{
- if (index.indexes(name))
+ if (index.indexes(column))
{
if (matching == null)
matching = new HashSet<>();
@@ -200,36 +204,6 @@ public class SecondaryIndexManager
return matching == null ? Collections.<SecondaryIndex>emptySet() : matching;
}
- public boolean indexes(Cell cell)
- {
- return indexes(cell.name());
- }
-
- public boolean indexes(CellName name)
- {
- return indexes(name, allIndexes);
- }
-
- public Set<SecondaryIndex> indexFor(CellName name)
- {
- return indexFor(name, allIndexes);
- }
-
- /**
- * @return true if at least one of the indexes can handle the clause.
- */
- public boolean hasIndexFor(List<IndexExpression> clause)
- {
- if (clause == null || clause.isEmpty())
- return false;
-
- for (SecondaryIndexSearcher searcher : getIndexSearchersForQuery(clause))
- if (searcher.canHandleIndexClause(clause))
- return true;
-
- return false;
- }
-
/**
* Removes an existing index
* @param column the indexed column to remove
@@ -325,9 +299,9 @@ public class SecondaryIndexManager
* @param column the name of the indexed column
* @return the index
*/
- public SecondaryIndex getIndexForColumn(ByteBuffer column)
+ public SecondaryIndex getIndexForColumn(ColumnDefinition column)
{
- return indexesByColumn.get(column);
+ return indexesByColumn.get(column.name.bytes);
}
/**
@@ -427,105 +401,125 @@ public class SecondaryIndexManager
}
/**
- * When building an index against existing data, add the given row to the index
- *
- * @param key the row key
- * @param cf the current rows data
+ * When building an index against existing data, add the given partition to the index
*/
- public void indexRow(ByteBuffer key, ColumnFamily cf, OpOrder.Group opGroup)
+ public void indexPartition(UnfilteredRowIterator partition, OpOrder.Group opGroup, Set<SecondaryIndex> allIndexes, int nowInSec)
{
- // Update entire row only once per row level index
- Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = null;
+ Set<PerRowSecondaryIndex> perRowIndexes = perRowIndexes();
+ Set<PerColumnSecondaryIndex> perColumnIndexes = perColumnIndexes();
- for (SecondaryIndex index : allIndexes)
+ if (!perRowIndexes.isEmpty())
{
- if (index instanceof PerRowSecondaryIndex)
+ // TODO: This is passing the same partition iterator to all perRow indexes, which means this only
+ // works if there is only one of them. We should change the API so it doesn't work directly on the
+ // partition, but rather on individual rows, so we can do a single iteration on the partition in this
+ // method and pass the rows to index to all indexes.
+
+ // Update entire partition only once per row level index
+ Set<Class<? extends SecondaryIndex>> appliedRowLevelIndexes = new HashSet<>();
+ for (PerRowSecondaryIndex index : perRowIndexes)
{
- if (appliedRowLevelIndexes == null)
- appliedRowLevelIndexes = new HashSet<>();
-
if (appliedRowLevelIndexes.add(index.getClass()))
- ((PerRowSecondaryIndex)index).index(key, cf);
+ ((PerRowSecondaryIndex)index).index(partition.partitionKey().getKey(), partition);
}
- else
+ }
+
+ if (!perColumnIndexes.isEmpty())
+ {
+ DecoratedKey key = partition.partitionKey();
+
+ if (!partition.staticRow().isEmpty())
{
- for (Cell cell : cf)
- if (cell.isLive() && index.indexes(cell.name()))
- ((PerColumnSecondaryIndex) index).insert(key, cell, opGroup);
+ for (PerColumnSecondaryIndex index : perColumnIndexes)
+ index.indexRow(key, partition.staticRow(), opGroup, nowInSec);
+ }
+
+ try (RowIterator filtered = UnfilteredRowIterators.filter(partition, nowInSec))
+ {
+ while (filtered.hasNext())
+ {
+ Row row = filtered.next();
+ for (PerColumnSecondaryIndex index : perColumnIndexes)
+ index.indexRow(key, row, opGroup, nowInSec);
+ }
}
}
}
/**
- * Delete all columns from all indexes for this row. For when cleanup rips a row out entirely.
- *
- * @param key the row key
- * @param indexedColumnsInRow all column names in row
+ * Delete all data from all indexes for this partition. For when cleanup rips a partition out entirely.
*/
- public void deleteFromIndexes(DecoratedKey key, List<Cell> indexedColumnsInRow, OpOrder.Group opGroup)
+ public void deleteFromIndexes(UnfilteredRowIterator partition, OpOrder.Group opGroup, int nowInSec)
{
- // Update entire row only once per row level index
- Set<Class<? extends SecondaryIndex>> cleanedRowLevelIndexes = null;
+ ByteBuffer key = partition.partitionKey().getKey();
- for (Cell cell : indexedColumnsInRow)
+ for (PerRowSecondaryIndex index : perRowIndexes())
+ index.delete(key, opGroup);
+
+ Set<PerColumnSecondaryIndex> indexes = perColumnIndexes();
+
+ while (partition.hasNext())
{
- for (SecondaryIndex index : indexFor(cell.name()))
+ Unfiltered unfiltered = partition.next();
+ if (unfiltered.kind() != Unfiltered.Kind.ROW)
+ continue;
+
+ Row row = (Row) unfiltered;
+ Clustering clustering = row.clustering();
+ if (!row.deletion().isLive())
+ for (PerColumnSecondaryIndex index : indexes)
+ index.maybeDelete(key, clustering, row.deletion(), opGroup);
+ for (Cell cell : row)
{
- if (index instanceof PerRowSecondaryIndex)
+ for (PerColumnSecondaryIndex index : indexes)
{
- if (cleanedRowLevelIndexes == null)
- cleanedRowLevelIndexes = new HashSet<>();
- if (cleanedRowLevelIndexes.add(index.getClass()))
- ((PerRowSecondaryIndex) index).delete(key, opGroup);
- }
- else
- {
- ((PerColumnSecondaryIndex) index).deleteForCleanup(key.getKey(), cell, opGroup);
+ if (!index.indexes(cell.column()))
+ continue;
+
+ ((PerColumnSecondaryIndex) index).deleteForCleanup(key, clustering, cell, opGroup, nowInSec);
}
}
}
}
/**
- * This helper acts as a closure around the indexManager
- * and updated cf data to ensure that down in
- * Memtable's ColumnFamily implementation, the index
- * can get updated. Note: only a CF backed by AtomicSortedColumns implements
- * this behaviour fully, other types simply ignore the index updater.
+ * This helper acts as a closure around the indexManager and updated data
+ * to ensure that down in Memtable's ColumnFamily implementation, the index
+ * can get updated.
*/
- public Updater updaterFor(DecoratedKey key, ColumnFamily cf, OpOrder.Group opGroup)
+ public Updater updaterFor(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
{
return (indexesByColumn.isEmpty() && rowLevelIndexMap.isEmpty())
? nullUpdater
- : new StandardUpdater(key, cf, opGroup);
+ : new StandardUpdater(update, opGroup, nowInSec);
}
/**
* Updated closure with only the modified row key.
*/
- public Updater gcUpdaterFor(DecoratedKey key)
+ public Updater gcUpdaterFor(DecoratedKey key, int nowInSec)
{
- return new GCUpdater(key);
+ return new GCUpdater(key, nowInSec);
}
/**
* Get a list of IndexSearchers from the union of expression index types
- * @param clause the query clause
+ * @param command the query
* @return the searchers needed to query the index
*/
- public List<SecondaryIndexSearcher> getIndexSearchersForQuery(List<IndexExpression> clause)
+ public List<SecondaryIndexSearcher> getIndexSearchersFor(ReadCommand command)
{
- Map<String, Set<ByteBuffer>> groupByIndexType = new HashMap<>();
+ Map<String, Set<ColumnDefinition>> groupByIndexType = new HashMap<>();
//Group columns by type
- for (IndexExpression ix : clause)
+ for (RowFilter.Expression e : command.rowFilter())
{
- SecondaryIndex index = getIndexForColumn(ix.column);
+ SecondaryIndex index = getIndexForColumn(e.column());
- if (index == null || !index.supportsOperator(ix.operator))
+ if (index == null || !index.supportsOperator(e.operator()))
continue;
- Set<ByteBuffer> columns = groupByIndexType.get(index.indexTypeForGrouping());
+ Set<ColumnDefinition> columns = groupByIndexType.get(index.indexTypeForGrouping());
if (columns == null)
{
@@ -533,107 +527,54 @@ public class SecondaryIndexManager
groupByIndexType.put(index.indexTypeForGrouping(), columns);
}
- columns.add(ix.column);
+ columns.add(e.column());
}
List<SecondaryIndexSearcher> indexSearchers = new ArrayList<>(groupByIndexType.size());
//create searcher per type
- for (Set<ByteBuffer> column : groupByIndexType.values())
+ for (Set<ColumnDefinition> column : groupByIndexType.values())
indexSearchers.add(getIndexForColumn(column.iterator().next()).createSecondaryIndexSearcher(column));
return indexSearchers;
}
- /**
- * Validates an union of expression index types. It will throw a {@link RuntimeException} if
- * any of the expressions in the provided clause is not valid for its index implementation.
- * @param clause the query clause
- * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
- */
- public void validateIndexSearchersForQuery(List<IndexExpression> clause) throws InvalidRequestException
+ public SecondaryIndexSearcher getBestIndexSearcherFor(ReadCommand command)
{
- // Group by index type
- Map<String, Set<IndexExpression>> expressionsByIndexType = new HashMap<>();
- Map<String, Set<ByteBuffer>> columnsByIndexType = new HashMap<>();
- for (IndexExpression indexExpression : clause)
- {
- SecondaryIndex index = getIndexForColumn(indexExpression.column);
-
- if (index == null)
- continue;
-
- String canonicalIndexName = index.getClass().getCanonicalName();
- Set<IndexExpression> expressions = expressionsByIndexType.get(canonicalIndexName);
- Set<ByteBuffer> columns = columnsByIndexType.get(canonicalIndexName);
- if (expressions == null)
- {
- expressions = new HashSet<>();
- columns = new HashSet<>();
- expressionsByIndexType.put(canonicalIndexName, expressions);
- columnsByIndexType.put(canonicalIndexName, columns);
- }
+ List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersFor(command);
- expressions.add(indexExpression);
- columns.add(indexExpression.column);
- }
-
- // Validate
- boolean haveSupportedIndexLookup = false;
- for (Map.Entry<String, Set<IndexExpression>> expressions : expressionsByIndexType.entrySet())
- {
- Set<ByteBuffer> columns = columnsByIndexType.get(expressions.getKey());
- SecondaryIndex secondaryIndex = getIndexForColumn(columns.iterator().next());
- SecondaryIndexSearcher searcher = secondaryIndex.createSecondaryIndexSearcher(columns);
- for (IndexExpression expression : expressions.getValue())
- {
- searcher.validate(expression);
- haveSupportedIndexLookup |= secondaryIndex.supportsOperator(expression.operator);
- }
- }
+ if (indexSearchers.isEmpty())
+ return null;
- if (!haveSupportedIndexLookup)
+ SecondaryIndexSearcher mostSelective = null;
+ long bestEstimate = Long.MAX_VALUE;
+ for (SecondaryIndexSearcher searcher : indexSearchers)
{
- // build the error message
- int i = 0;
- StringBuilder sb = new StringBuilder("No secondary indexes on the restricted columns support the provided operators: ");
- for (Map.Entry<String, Set<IndexExpression>> expressions : expressionsByIndexType.entrySet())
+ SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter());
+ long estimate = highestSelectivityIndex.estimateResultRows();
+ if (estimate <= bestEstimate)
{
- for (IndexExpression expression : expressions.getValue())
- {
- if (i++ > 0)
- sb.append(", ");
- sb.append("'");
- String columnName;
- try
- {
- columnName = ByteBufferUtil.string(expression.column);
- }
- catch (CharacterCodingException ex)
- {
- columnName = "<unprintable>";
- }
- sb.append(columnName).append(" ").append(expression.operator).append(" <value>").append("'");
- }
+ bestEstimate = estimate;
+ mostSelective = searcher;
}
-
- throw new InvalidRequestException(sb.toString());
}
+ return mostSelective;
}
/**
- * Performs a search across a number of column indexes
- *
- * @param filter the column range to restrict to
- * @return found indexed rows
+ * Validates a union of expression index types. It will throw an {@link InvalidRequestException} if
+ * any of the expressions in the provided filter is not valid for its index implementation.
+ * @param filter the filter to check
+ * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
*/
- public List<Row> search(ExtendedFilter filter)
+ public void validateFilter(RowFilter filter) throws InvalidRequestException
{
- SecondaryIndexSearcher mostSelective = getHighestSelectivityIndexSearcher(filter.getClause());
- if (mostSelective == null)
- return Collections.emptyList();
- else
- return mostSelective.search(filter);
+ for (RowFilter.Expression expression : filter)
+ {
+ SecondaryIndex index = getIndexForColumn(expression.column());
+ if (index != null && index.supportsOperator(expression.operator()))
+ expression.validateForIndexing();
+ }
}
public Set<SecondaryIndex> getIndexesByNames(Set<String> idxNames)
@@ -666,19 +607,27 @@ public class SecondaryIndexManager
index.setIndexRemoved();
}
- public SecondaryIndex validate(ByteBuffer rowKey, Cell cell)
+ public void validate(DecoratedKey partitionKey) throws InvalidRequestException
{
- for (SecondaryIndex index : indexFor(cell.name()))
- {
- if (!index.validate(rowKey, cell))
- return index;
- }
- return null;
+ for (SecondaryIndex index : perColumnIndexes())
+ index.validate(partitionKey);
+ }
+
+ public void validate(Clustering clustering) throws InvalidRequestException
+ {
+ for (SecondaryIndex index : perColumnIndexes())
+ index.validate(clustering);
+ }
+
+ public void validate(ColumnDefinition column, ByteBuffer value, CellPath path) throws InvalidRequestException
+ {
+ for (SecondaryIndex index : indexFor(column))
+ index.validate(value, path);
}
static boolean shouldCleanupOldValue(Cell oldCell, Cell newCell)
{
- // If any one of name/value/timestamp are different, then we
+ // If either the value or timestamp is different, then we
// should delete from the index. If not, then we can infer that
// at least one of the cells is an ExpiringColumn and that the
// difference is in the expiry time. In this case, we don't want to
@@ -686,10 +635,9 @@ public class SecondaryIndexManager
// will just hide the inserted value.
// Completely identical cells (including expiring columns with
// identical ttl & localExpirationTime) will not get this far due
- // to the oldCell.equals(newColumn) in StandardUpdater.update
- return !oldCell.name().equals(newCell.name())
- || !oldCell.value().equals(newCell.value())
- || oldCell.timestamp() != newCell.timestamp();
+ // to the oldCell.equals(newCell) in StandardUpdater.update
+ return !oldCell.value().equals(newCell.value())
+ || oldCell.livenessInfo().timestamp() != newCell.livenessInfo().timestamp();
}
private Set<String> filterByColumn(Set<String> idxNames)
@@ -712,14 +660,17 @@ public class SecondaryIndexManager
public static interface Updater
{
+ /** Called when a row with the provided clustering and row info is inserted */
+ public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion);
+
/** called when constructing the index against pre-existing data */
- public void insert(Cell cell);
+ public void insert(Clustering clustering, Cell cell);
/** called when updating the index from a memtable */
- public void update(Cell oldCell, Cell cell);
+ public void update(Clustering clustering, Cell oldCell, Cell cell);
/** called when lazy-updating the index during compaction (CASSANDRA-2897) */
- public void remove(Cell current);
+ public void remove(Clustering clustering, Cell current);
/** called after memtable updates are complete (CASSANDRA-5397) */
public void updateRowLevelIndexes();
@@ -728,34 +679,38 @@ public class SecondaryIndexManager
private final class GCUpdater implements Updater
{
private final DecoratedKey key;
+ private final int nowInSec;
- public GCUpdater(DecoratedKey key)
+ public GCUpdater(DecoratedKey key, int nowInSec)
{
this.key = key;
+ this.nowInSec = nowInSec;
}
- public void insert(Cell cell)
+ public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
{
throw new UnsupportedOperationException();
}
- public void update(Cell oldCell, Cell newCell)
+ public void insert(Clustering clustering, Cell cell)
{
throw new UnsupportedOperationException();
}
- public void remove(Cell cell)
+ public void update(Clustering clustering, Cell oldCell, Cell newCell)
{
- if (!cell.isLive())
- return;
+ throw new UnsupportedOperationException();
+ }
- for (SecondaryIndex index : indexFor(cell.name()))
+ public void remove(Clustering clustering, Cell cell)
+ {
+ for (SecondaryIndex index : indexFor(cell.column()))
{
if (index instanceof PerColumnSecondaryIndex)
{
try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
{
- ((PerColumnSecondaryIndex) index).delete(key.getKey(), cell, opGroup);
+ ((PerColumnSecondaryIndex) index).delete(key.getKey(), clustering, cell, opGroup, nowInSec);
}
}
}
@@ -770,39 +725,50 @@ public class SecondaryIndexManager
private final class StandardUpdater implements Updater
{
- private final DecoratedKey key;
- private final ColumnFamily cf;
+ private final PartitionUpdate update;
private final OpOrder.Group opGroup;
+ private final int nowInSec;
- public StandardUpdater(DecoratedKey key, ColumnFamily cf, OpOrder.Group opGroup)
+ public StandardUpdater(PartitionUpdate update, OpOrder.Group opGroup, int nowInSec)
{
- this.key = key;
- this.cf = cf;
+ this.update = update;
this.opGroup = opGroup;
+ this.nowInSec = nowInSec;
+ }
+
+ public void maybeIndex(Clustering clustering, long timestamp, int ttl, DeletionTime deletion)
+ {
+ for (PerColumnSecondaryIndex index : perColumnIndexes())
+ {
+ if (timestamp != LivenessInfo.NO_TIMESTAMP)
+ index.maybeIndex(update.partitionKey().getKey(), clustering, timestamp, ttl, opGroup, nowInSec);
+ if (!deletion.isLive())
+ index.maybeDelete(update.partitionKey().getKey(), clustering, deletion, opGroup);
+ }
}
- public void insert(Cell cell)
+ public void insert(Clustering clustering, Cell cell)
{
- if (!cell.isLive())
+ if (!cell.isLive(nowInSec))
return;
- for (SecondaryIndex index : indexFor(cell.name()))
+ for (SecondaryIndex index : indexFor(cell.column()))
if (index instanceof PerColumnSecondaryIndex)
- ((PerColumnSecondaryIndex) index).insert(key.getKey(), cell, opGroup);
+ ((PerColumnSecondaryIndex) index).insert(update.partitionKey().getKey(), clustering, cell, opGroup);
}
- public void update(Cell oldCell, Cell cell)
+ public void update(Clustering clustering, Cell oldCell, Cell cell)
{
if (oldCell.equals(cell))
return;
- for (SecondaryIndex index : indexFor(cell.name()))
+ for (SecondaryIndex index : indexFor(cell.column()))
{
if (index instanceof PerColumnSecondaryIndex)
{
- if (cell.isLive())
+ if (cell.isLive(nowInSec))
{
- ((PerColumnSecondaryIndex) index).update(key.getKey(), oldCell, cell, opGroup);
+ ((PerColumnSecondaryIndex) index).update(update.partitionKey().getKey(), clustering, oldCell, cell, opGroup, nowInSec);
}
else
{
@@ -812,53 +778,22 @@ public class SecondaryIndexManager
// identical values and ttl) Then, we don't want to delete as the
// tombstone will hide the new value we just inserted; see CASSANDRA-7268
if (shouldCleanupOldValue(oldCell, cell))
- ((PerColumnSecondaryIndex) index).delete(key.getKey(), oldCell, opGroup);
+ ((PerColumnSecondaryIndex) index).delete(update.partitionKey().getKey(), clustering, oldCell, opGroup, nowInSec);
}
}
}
}
- public void remove(Cell cell)
+ public void remove(Clustering clustering, Cell cell)
{
- if (!cell.isLive())
- return;
-
- for (SecondaryIndex index : indexFor(cell.name()))
- if (index instanceof PerColumnSecondaryIndex)
- ((PerColumnSecondaryIndex) index).delete(key.getKey(), cell, opGroup);
+ throw new UnsupportedOperationException();
}
public void updateRowLevelIndexes()
{
for (SecondaryIndex index : rowLevelIndexMap.values())
- ((PerRowSecondaryIndex) index).index(key.getKey(), cf);
- }
-
- }
-
- public SecondaryIndexSearcher getHighestSelectivityIndexSearcher(List<IndexExpression> clause)
- {
- if (clause == null)
- return null;
-
- List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause);
-
- if (indexSearchers.isEmpty())
- return null;
-
- SecondaryIndexSearcher mostSelective = null;
- long bestEstimate = Long.MAX_VALUE;
- for (SecondaryIndexSearcher searcher : indexSearchers)
- {
- SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(clause);
- long estimate = highestSelectivityIndex.estimateResultRows();
- if (estimate <= bestEstimate)
- {
- bestEstimate = estimate;
- mostSelective = searcher;
- }
+ ((PerRowSecondaryIndex) index).index(update.partitionKey().getKey(), update.unfilteredIterator());
}
- return mostSelective;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
index 5812e9d..4f63ae8 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java
@@ -20,75 +20,192 @@ package org.apache.cassandra.db.index;
import java.nio.ByteBuffer;
import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ExtendedFilter;
-import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
public abstract class SecondaryIndexSearcher
{
+ private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexSearcher.class);
+
protected final SecondaryIndexManager indexManager;
- protected final Set<ByteBuffer> columns;
+ protected final Set<ColumnDefinition> columns;
protected final ColumnFamilyStore baseCfs;
- public SecondaryIndexSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns)
+ public SecondaryIndexSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns)
{
this.indexManager = indexManager;
this.columns = columns;
this.baseCfs = indexManager.baseCfs;
}
- public SecondaryIndex highestSelectivityIndex(List<IndexExpression> clause)
+ public SecondaryIndex highestSelectivityIndex(RowFilter filter)
{
- IndexExpression expr = highestSelectivityPredicate(clause, false);
- return expr == null ? null : indexManager.getIndexForColumn(expr.column);
+ RowFilter.Expression expr = highestSelectivityPredicate(filter, false);
+ return expr == null ? null : indexManager.getIndexForColumn(expr.column());
}
- public abstract List<Row> search(ExtendedFilter filter);
+ public RowFilter.Expression primaryClause(ReadCommand command)
+ {
+ return highestSelectivityPredicate(command.rowFilter(), false);
+ }
- /**
- * @return true this index is able to handle the given index expressions.
- */
- public boolean canHandleIndexClause(List<IndexExpression> clause)
+ @SuppressWarnings("resource") // Both the OpOrder and 'indexIter' are closed on exception, or through the closing of the result
+ // of this method.
+ public UnfilteredPartitionIterator search(ReadCommand command, ReadOrderGroup orderGroup)
{
- for (IndexExpression expression : clause)
- {
- if (!columns.contains(expression.column))
- continue;
+ RowFilter.Expression primary = highestSelectivityPredicate(command.rowFilter(), true);
+ assert primary != null;
+
+ AbstractSimplePerColumnSecondaryIndex index = (AbstractSimplePerColumnSecondaryIndex)indexManager.getIndexForColumn(primary.column());
+ assert index != null && index.getIndexCfs() != null;
+
+ if (logger.isDebugEnabled())
+ logger.debug("Most-selective indexed predicate is {}", primary);
- SecondaryIndex index = indexManager.getIndexForColumn(expression.column);
- if (index != null && index.getIndexCfs() != null && index.supportsOperator(expression.operator))
- return true;
+ DecoratedKey indexKey = index.getIndexKeyFor(primary.getIndexValue());
+
+ UnfilteredRowIterator indexIter = queryIndex(index, indexKey, command, orderGroup);
+ try
+ {
+ return queryDataFromIndex(index, indexKey, UnfilteredRowIterators.filter(indexIter, command.nowInSec()), command, orderGroup);
+ }
+ catch (RuntimeException | Error e)
+ {
+ indexIter.close();
+ throw e;
}
- return false;
}
-
- /**
- * Validates the specified {@link IndexExpression}. It will throw an {@link org.apache.cassandra.exceptions.InvalidRequestException}
- * if the provided clause is not valid for the index implementation.
- *
- * @param indexExpression An {@link IndexExpression} to be validated
- * @throws org.apache.cassandra.exceptions.InvalidRequestException in case of validation errors
- */
- public void validate(IndexExpression indexExpression) throws InvalidRequestException
+
+ private UnfilteredRowIterator queryIndex(AbstractSimplePerColumnSecondaryIndex index, DecoratedKey indexKey, ReadCommand command, ReadOrderGroup orderGroup)
+ {
+ ClusteringIndexFilter filter = makeIndexFilter(index, command);
+ CFMetaData indexMetadata = index.getIndexCfs().metadata;
+ return SinglePartitionReadCommand.create(indexMetadata, command.nowInSec(), indexKey, ColumnFilter.all(indexMetadata), filter)
+ .queryMemtableAndDisk(index.getIndexCfs(), orderGroup.indexReadOpOrderGroup());
+ }
+
+ private ClusteringIndexFilter makeIndexFilter(AbstractSimplePerColumnSecondaryIndex index, ReadCommand command)
{
+ if (command instanceof SinglePartitionReadCommand)
+ {
+ // Note: as yet there's no route to get here - a 2i query *always* uses a
+ // PartitionRangeReadCommand. This is here in preparation for coming changes
+ // in SelectStatement.
+ SinglePartitionReadCommand sprc = (SinglePartitionReadCommand)command;
+ ByteBuffer pk = sprc.partitionKey().getKey();
+ ClusteringIndexFilter filter = sprc.clusteringIndexFilter();
+
+ if (filter instanceof ClusteringIndexNamesFilter)
+ {
+ NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows();
+ NavigableSet<Clustering> clusterings = new TreeSet<>(index.getIndexComparator());
+ for (Clustering c : requested)
+ clusterings.add(index.makeIndexClustering(pk, c, (Cell)null).takeAlias());
+ return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
+ }
+ else
+ {
+ Slices requested = ((ClusteringIndexSliceFilter)filter).requestedSlices();
+ Slices.Builder builder = new Slices.Builder(index.getIndexComparator());
+ for (Slice slice : requested)
+ builder.add(index.makeIndexBound(pk, slice.start()), index.makeIndexBound(pk, slice.end()));
+ return new ClusteringIndexSliceFilter(builder.build(), filter.isReversed());
+ }
+ }
+ else
+ {
+
+ DataRange dataRange = ((PartitionRangeReadCommand)command).dataRange();
+ AbstractBounds<PartitionPosition> range = dataRange.keyRange();
+
+ Slice slice = Slice.ALL;
+
+ /*
+ * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of
+ * the indexed row unfortunately (which will be inefficient), because we have no way to intuit the smallest possible
+ * key having a given token. A potential fix would be to actually store the token along the key in the indexed row.
+ */
+ if (range.left instanceof DecoratedKey)
+ {
+ // the right hand side of the range may not be a DecoratedKey (for instance if we're paging),
+ // but if it is, we can optimise slightly by restricting the slice
+ if (range.right instanceof DecoratedKey)
+ {
+
+ DecoratedKey startKey = (DecoratedKey) range.left;
+ DecoratedKey endKey = (DecoratedKey) range.right;
+
+ Slice.Bound start = Slice.Bound.BOTTOM;
+ Slice.Bound end = Slice.Bound.TOP;
+
+ /*
+ * For index queries over a range, we can't do a whole lot better than querying everything for the key range, though for
+ * slice queries we can slightly restrict the beginning and end.
+ */
+ if (!dataRange.isNamesQuery())
+ {
+ ClusteringIndexSliceFilter startSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(startKey));
+ ClusteringIndexSliceFilter endSliceFilter = ((ClusteringIndexSliceFilter) dataRange.clusteringIndexFilter(endKey));
+
+ // We can't effectively support reversed queries when we have a range, so we don't support them
+ // (or only through post-query reordering) and shouldn't get here.
+ assert !startSliceFilter.isReversed() && !endSliceFilter.isReversed();
+
+ Slices startSlices = startSliceFilter.requestedSlices();
+ Slices endSlices = endSliceFilter.requestedSlices();
+
+ if (startSlices.size() > 0)
+ start = startSlices.get(0).start();
+
+ if (endSlices.size() > 0)
+ end = endSlices.get(endSlices.size() - 1).end();
+ }
+
+ slice = Slice.make(index.makeIndexBound(startKey.getKey(), start),
+ index.makeIndexBound(endKey.getKey(), end));
+ }
+ else
+ {
+ // otherwise, just start the index slice from the key we do have
+ slice = Slice.make(index.makeIndexBound(((DecoratedKey)range.left).getKey(), Slice.Bound.BOTTOM),
+ Slice.Bound.TOP);
+ }
+ }
+ return new ClusteringIndexSliceFilter(Slices.with(index.getIndexComparator(), slice), false);
+ }
}
- protected IndexExpression highestSelectivityPredicate(List<IndexExpression> clause, boolean includeInTrace)
+ protected abstract UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex index,
+ DecoratedKey indexKey,
+ RowIterator indexHits,
+ ReadCommand command,
+ ReadOrderGroup orderGroup);
+
+ protected RowFilter.Expression highestSelectivityPredicate(RowFilter filter, boolean includeInTrace)
{
- IndexExpression best = null;
+ RowFilter.Expression best = null;
int bestMeanCount = Integer.MAX_VALUE;
Map<SecondaryIndex, Integer> candidates = new HashMap<>();
- for (IndexExpression expression : clause)
+ for (RowFilter.Expression expression : filter)
{
// skip columns belonging to a different index type
- if (!columns.contains(expression.column))
+ if (!columns.contains(expression.column()))
continue;
- SecondaryIndex index = indexManager.getIndexForColumn(expression.column);
- if (index == null || index.getIndexCfs() == null || !index.supportsOperator(expression.operator))
+ SecondaryIndex index = indexManager.getIndexForColumn(expression.column());
+ if (index == null || index.getIndexCfs() == null || !index.supportsOperator(expression.operator()))
continue;
int columns = index.getIndexCfs().getMeanColumns();
@@ -106,34 +223,21 @@ public abstract class SecondaryIndexSearcher
Tracing.trace("No applicable indexes found");
else if (Tracing.isTracing())
// pay for an additional threadlocal get() rather than build the strings unnecessarily
- Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.",
- FBUtilities.toString(candidates),
- indexManager.getIndexForColumn(best.column).getIndexName());
+ Tracing.trace("Candidate index mean cardinalities are {}. Scanning with {}.", FBUtilities.toString(candidates), indexManager.getIndexForColumn(best.column()).getIndexName());
}
return best;
}
/**
- * Returns {@code true} if the specified list of {@link IndexExpression}s require a full scan of all the nodes.
- *
- * @param clause A list of {@link IndexExpression}s
- * @return {@code true} if the {@code IndexExpression}s require a full scan, {@code false} otherwise
- */
- public boolean requiresScanningAllRanges(List<IndexExpression> clause)
- {
- return false;
- }
-
- /**
- * Combines index query results from multiple nodes. This is done by the coordinator node after it has reconciled
+ * Post-process the result of an index query. This is done by the coordinator node after it has reconciled
* the replica responses.
*
- * @param clause A list of {@link IndexExpression}s
- * @param rows The index query results to be combined
- * @return The combination of the index query results
+ * @param filter The {@code RowFilter} used for the query.
+ * @param result The index query results to be post-processed
+ * @return The post-processed results
*/
- public List<Row> postReconciliationProcessing(List<IndexExpression> clause, List<Row> rows)
+ public PartitionIterator postReconciliationProcessing(RowFilter filter, PartitionIterator result)
{
- return rows;
+ return result;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index e88d456..9333bcf 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -26,36 +26,20 @@ import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.index.SecondaryIndexManager;
import org.apache.cassandra.db.index.SecondaryIndexSearcher;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.exceptions.ConfigurationException;
/**
- * Base class for secondary indexes where composites are involved.
+ * Base class for internal secondary indexes (this could be merged with AbstractSimplePerColumnSecondaryIndex).
*/
public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIndex
{
- private volatile CellNameType indexComparator;
-
- protected CellNameType getIndexComparator()
- {
- // Yes, this is racy, but doing this more than once is not a big deal, we just want to avoid doing it every time
- // More seriously, we should fix that whole SecondaryIndex API so this can be a final and avoid all that non-sense.
- if (indexComparator == null)
- {
- assert columnDef != null;
- indexComparator = getIndexComparator(baseCfs.metadata, columnDef);
- }
- return indexComparator;
- }
-
public static CompositesIndex create(ColumnDefinition cfDef)
{
if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
@@ -90,68 +74,56 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
throw new AssertionError();
}
- // Check SecondaryIndex.getIndexComparator if you want to know why this is static
- public static CellNameType getIndexComparator(CFMetaData baseMetadata, ColumnDefinition cfDef)
+ public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef)
{
if (cfDef.type.isCollection() && cfDef.type.isMultiCell())
{
- switch (((CollectionType)cfDef.type).kind)
+ CollectionType type = (CollectionType)cfDef.type;
+ if (type.kind == CollectionType.Kind.LIST
+ || (type.kind == CollectionType.Kind.MAP && cfDef.hasIndexOption(SecondaryIndex.INDEX_VALUES_OPTION_NAME)))
{
- case LIST:
- return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
- case SET:
- return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
- case MAP:
- if (cfDef.hasIndexOption(SecondaryIndex.INDEX_KEYS_OPTION_NAME))
- return CompositesIndexOnCollectionKey.buildIndexComparator(baseMetadata, cfDef);
- else if (cfDef.hasIndexOption(SecondaryIndex.INDEX_ENTRIES_OPTION_NAME))
- return CompositesIndexOnCollectionKeyAndValue.buildIndexComparator(baseMetadata, cfDef);
- else
- return CompositesIndexOnCollectionValue.buildIndexComparator(baseMetadata, cfDef);
+ CompositesIndexOnCollectionValue.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
+ }
+ else
+ {
+ addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
}
}
-
- switch (cfDef.kind)
+ else if (cfDef.isClusteringColumn())
{
- case CLUSTERING_COLUMN:
- return CompositesIndexOnClusteringKey.buildIndexComparator(baseMetadata, cfDef);
- case REGULAR:
- return CompositesIndexOnRegular.buildIndexComparator(baseMetadata, cfDef);
- case PARTITION_KEY:
- return CompositesIndexOnPartitionKey.buildIndexComparator(baseMetadata, cfDef);
- //case COMPACT_VALUE:
- // return CompositesIndexOnCompactValue.buildIndexComparator(baseMetadata, cfDef);
+ CompositesIndexOnClusteringKey.addClusteringColumns(indexMetadata, baseMetadata, cfDef);
+ }
+ else
+ {
+ addGenericClusteringColumns(indexMetadata, baseMetadata, cfDef);
}
- throw new AssertionError();
}
- protected CellName makeIndexColumnName(ByteBuffer rowKey, Cell cell)
+ protected static void addGenericClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef)
{
- return getIndexComparator().create(makeIndexColumnPrefix(rowKey, cell.name()), null);
+ indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator);
+ for (ColumnDefinition def : baseMetadata.clusteringColumns())
+ indexMetadata.addClusteringColumn(def.name, def.type);
}
- protected abstract Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName);
+ public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry);
- public abstract IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry);
+ public abstract boolean isStale(Row row, ByteBuffer indexValue, int nowInSec);
- public abstract boolean isStale(IndexedEntry entry, ColumnFamily data, long now);
-
- public void delete(IndexedEntry entry, OpOrder.Group opGroup)
+ public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec)
{
- int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
- ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
- cfi.addTombstone(entry.indexEntry, localDeletionTime, entry.timestamp);
- indexCfs.apply(entry.indexValue, cfi, SecondaryIndexManager.nullUpdater, opGroup, null);
- if (logger.isDebugEnabled())
- logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, cfi);
- }
+ PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, entry.indexValue, PartitionColumns.NONE, 1);
+ Row.Writer writer = upd.writer();
+ Rows.writeClustering(entry.indexClustering, writer);
+ writer.writeRowDeletion(new SimpleDeletionTime(entry.timestamp, nowInSec));
+ writer.endOfRow();
+ indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null);
- protected AbstractType<?> getExpressionComparator()
- {
- return baseCfs.metadata.getColumnDefinitionComparator(columnDef);
+ if (logger.isDebugEnabled())
+ logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, upd);
}
- public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns)
+ public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns)
{
return new CompositesSearcher(baseCfs.indexManager, columns);
}
@@ -178,31 +150,19 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
public static class IndexedEntry
{
public final DecoratedKey indexValue;
- public final CellName indexEntry;
+ public final Clustering indexClustering;
public final long timestamp;
public final ByteBuffer indexedKey;
- public final Composite indexedEntryPrefix;
- public final ByteBuffer indexedEntryCollectionKey; // may be null
-
- public IndexedEntry(DecoratedKey indexValue, CellName indexEntry, long timestamp, ByteBuffer indexedKey, Composite indexedEntryPrefix)
- {
- this(indexValue, indexEntry, timestamp, indexedKey, indexedEntryPrefix, null);
- }
+ public final Clustering indexedEntryClustering;
- public IndexedEntry(DecoratedKey indexValue,
- CellName indexEntry,
- long timestamp,
- ByteBuffer indexedKey,
- Composite indexedEntryPrefix,
- ByteBuffer indexedEntryCollectionKey)
+ public IndexedEntry(DecoratedKey indexValue, Clustering indexClustering, long timestamp, ByteBuffer indexedKey, Clustering indexedEntryClustering)
{
this.indexValue = indexValue;
- this.indexEntry = indexEntry;
+ this.indexClustering = indexClustering.takeAlias();
this.timestamp = timestamp;
this.indexedKey = indexedKey;
- this.indexedEntryPrefix = indexedEntryPrefix;
- this.indexedEntryCollectionKey = indexedEntryCollectionKey;
+ this.indexedEntryClustering = indexedEntryClustering.takeAlias();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
index 402ea05..7624c1f 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexIncludingCollectionKey.java
@@ -18,19 +18,9 @@
package org.apache.cassandra.db.index.composites;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CBuilder;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.CompoundDenseCellNameType;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.rows.*;
/**
* Common superclass for indexes that capture collection keys, including
@@ -49,41 +39,22 @@ import org.apache.cassandra.db.marshal.*;
*/
public abstract class CompositesIndexIncludingCollectionKey extends CompositesIndex
{
- public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef)
+ protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path)
{
- int count = 1 + baseMetadata.clusteringColumns().size(); // row key + clustering prefix
- List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(count);
- types.add(SecondaryIndex.keyComparator);
- for (int i = 0; i < count - 1; i++)
- types.add(baseMetadata.comparator.subtype(i));
- return new CompoundDenseCellNameType(types);
- }
-
- protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName)
- {
- int count = 1 + baseCfs.metadata.clusteringColumns().size();
- CBuilder builder = getIndexComparator().builder();
+ CBuilder builder = CBuilder.create(getIndexComparator());
builder.add(rowKey);
- for (int i = 0; i < Math.min(cellName.size(), count - 1); i++)
- builder.add(cellName.get(i));
- return builder.build();
+ for (int i = 0; i < prefix.size(); i++)
+ builder.add(prefix.get(i));
+ return builder;
}
- public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry)
+ public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry)
{
int count = 1 + baseCfs.metadata.clusteringColumns().size();
- CBuilder builder = baseCfs.getComparator().builder();
+ Clustering clustering = indexEntry.clustering();
+ CBuilder builder = CBuilder.create(baseCfs.getComparator());
for (int i = 0; i < count - 1; i++)
- builder.add(indexEntry.name().get(i + 1));
- return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build());
- }
-
- @Override
- public boolean indexes(CellName name)
- {
- // We index if the CQL3 column name is the one of the collection we index
- AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef);
- return name.size() > columnDef.position()
- && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0;
+ builder.add(clustering.get(i + 1));
+ return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build());
}
}