You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/09 09:56:27 UTC
[1/3] cassandra git commit: Cache selected index in ReadCommand to
avoid multiple lookups
Repository: cassandra
Updated Branches:
refs/heads/cassandra-3.0 f961e84aa -> e097efc5f
refs/heads/trunk b81942267 -> c5408a3a1
Cache selected index in ReadCommand to avoid multiple lookups
Patch by Sam Tunnicliffe; reviewed by Jake Luciani for CASSANDRA-10215
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e097efc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e097efc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e097efc5
Branch: refs/heads/cassandra-3.0
Commit: e097efc5f6f76a0da8d15b307301dffff79e4a35
Parents: f961e84
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Aug 26 16:29:58 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 9 08:44:06 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/SelectStatement.java | 4 +-
.../db/AbstractReadCommandBuilder.java | 8 +-
.../cassandra/db/PartitionRangeReadCommand.java | 26 ++++--
.../org/apache/cassandra/db/ReadCommand.java | 96 ++++++++++++++++++--
.../org/apache/cassandra/db/ReadOrderGroup.java | 2 +-
.../db/SinglePartitionReadCommand.java | 14 ++-
.../cassandra/index/SecondaryIndexManager.java | 23 ++---
.../apache/cassandra/schema/IndexMetadata.java | 39 +++++++-
.../org/apache/cassandra/schema/Indexes.java | 51 ++++++++---
.../cassandra/schema/UnknownIndexException.java | 39 ++++++++
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../service/pager/RangeSliceQueryPager.java | 16 ++--
.../cassandra/thrift/CassandraServer.java | 14 +--
14 files changed, 258 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1c66a2..ab1b4ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-rc1
+ * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
* Small optimizations of sstable index serialization (CASSANDRA-10232)
* Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
Merged from 2.2:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2aac6ab..7ad6c09 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.MaterializedView;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
@@ -458,12 +459,13 @@ public class SelectStatement implements CQLStatement
return ReadQuery.EMPTY;
RowFilter rowFilter = getRowFilter(options);
+
// The LIMIT provided by the user is the number of CQL row he wants returned.
// We want to have getRangeSlice to count the number of columns, not the number of keys.
AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
return keyBounds == null
? ReadQuery.EMPTY
- : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+ : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter), Optional.empty());
}
private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 5e3b726..9bb89a6 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -23,9 +23,11 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -327,7 +329,7 @@ public abstract class AbstractReadCommandBuilder
else
bounds = new ExcludingBounds<>(start, end);
- return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+ return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
}
static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index da62557..965e9af 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import com.google.common.collect.Iterables;
@@ -39,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.pager.*;
@@ -64,10 +66,12 @@ public class PartitionRangeReadCommand extends ReadCommand
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
- DataRange dataRange)
+ DataRange dataRange,
+ Optional<IndexMetadata> index)
{
super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
this.dataRange = dataRange;
+ this.index = index;
}
public PartitionRangeReadCommand(CFMetaData metadata,
@@ -75,9 +79,10 @@ public class PartitionRangeReadCommand extends ReadCommand
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
- DataRange dataRange)
+ DataRange dataRange,
+ Optional<IndexMetadata> index)
{
- this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
+ this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
}
/**
@@ -95,7 +100,8 @@ public class PartitionRangeReadCommand extends ReadCommand
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
- DataRange.allData(metadata.partitioner));
+ DataRange.allData(metadata.partitioner),
+ Optional.empty());
}
public DataRange dataRange()
@@ -115,17 +121,17 @@ public class PartitionRangeReadCommand extends ReadCommand
public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
{
- return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
}
public PartitionRangeReadCommand copy()
{
- return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
}
public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
{
- return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange());
+ return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
}
public long getTimeout()
@@ -275,7 +281,7 @@ public class PartitionRangeReadCommand extends ReadCommand
public PartitionIterator postReconciliationProcessing(PartitionIterator result)
{
ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
- Index index = getIndex(cfs, false);
+ Index index = getIndex(cfs);
return index == null ? result : index.postProcessorFor(this).apply(result, this);
}
@@ -303,11 +309,11 @@ public class PartitionRangeReadCommand extends ReadCommand
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
- return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+ return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5a10716..e183963 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.UnknownIndexException;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -70,6 +72,16 @@ public abstract class ReadCommand implements ReadQuery
private final RowFilter rowFilter;
private final DataLimits limits;
+ // SecondaryIndexManager will attempt to provide the most selective of any available indexes
+ // during execution. Here we also store the results of that lookup to avoid repeating it over
+ // the lifetime of the command.
+ protected Optional<IndexMetadata> index = Optional.empty();
+
+ // Flag to indicate whether the index manager has been queried to select an index for this
+ // command. This is necessary as the result of that lookup may be null, in which case we
+ // still don't want to repeat it.
+ private boolean indexManagerQueried = false;
+
private boolean isDigestQuery;
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
private int digestVersion;
@@ -77,7 +89,7 @@ public abstract class ReadCommand implements ReadQuery
protected static abstract class SelectionDeserializer
{
- public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+ public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
}
protected enum Kind
@@ -287,9 +299,35 @@ public abstract class ReadCommand implements ReadQuery
: ReadResponse.createDataResponse(iterator, selection);
}
- protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace)
+ public long indexSerializedSize(int version)
+ {
+ if (index.isPresent())
+ return IndexMetadata.serializer.serializedSize(index.get(), version);
+ else
+ return 0;
+ }
+
+ public Index getIndex(ColumnFamilyStore cfs)
{
- return cfs.indexManager.getBestIndexFor(this, includeInTrace);
+ // if we've already consulted the index manager, and it returned a valid index
+ // the result should be cached here.
+ if(index.isPresent())
+ return cfs.indexManager.getIndex(index.get());
+
+ // if no cached index is present, but we've already consulted the index manager
+ // then no registered index is suitable for this command, so just return null.
+ if (indexManagerQueried)
+ return null;
+
+ // do the lookup, set the flag to indicate so and cache the result if not null
+ Index selected = cfs.indexManager.getBestIndexFor(this);
+ indexManagerQueried = true;
+
+ if (selected == null)
+ return null;
+
+ index = Optional.of(selected.getIndexMetadata());
+ return selected;
}
/**
@@ -306,9 +344,12 @@ public abstract class ReadCommand implements ReadQuery
long startTimeNanos = System.nanoTime();
ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
- Index index = getIndex(cfs, true);
+ Index index = getIndex(cfs);
Index.Searcher searcher = index == null ? null : index.searcherFor(this);
+ if (index != null)
+ Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexName());
+
UnfilteredPartitionIterator resultIterator = searcher == null
? queryStorage(cfs, orderGroup)
: searcher.search(orderGroup);
@@ -505,13 +546,23 @@ public abstract class ReadCommand implements ReadQuery
return (flags & 0x02) != 0;
}
+ private static int indexFlag(boolean hasIndex)
+ {
+ return hasIndex ? 0x04 : 0;
+ }
+
+ private static boolean hasIndex(int flags)
+ {
+ return (flags & 0x04) != 0;
+ }
+
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
// for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
assert version >= MessagingService.VERSION_30;
out.writeByte(command.kind.ordinal());
- out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
+ out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
if (command.isDigestQuery())
out.writeVInt(command.digestVersion());
CFMetaData.serializer.serialize(command.metadata(), out, version);
@@ -519,6 +570,8 @@ public abstract class ReadCommand implements ReadQuery
ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
RowFilter.serializer.serialize(command.rowFilter(), out, version);
DataLimits.serializer.serialize(command.limits(), out, version);
+ if (command.index.isPresent())
+ IndexMetadata.serializer.serialize(command.index.get(), out, version);
command.serializeSelection(out, version);
}
@@ -532,14 +585,36 @@ public abstract class ReadCommand implements ReadQuery
int flags = in.readByte();
boolean isDigest = isDigest(flags);
boolean isForThrift = isForThrift(flags);
+ boolean hasIndex = hasIndex(flags);
int digestVersion = isDigest ? (int)in.readVInt() : 0;
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
int nowInSec = in.readInt();
ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
DataLimits limits = DataLimits.serializer.deserialize(in, version);
+ Optional<IndexMetadata> index = hasIndex
+ ? deserializeIndexMetadata(in, version, metadata)
+ : Optional.empty();
+
+ return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ }
- return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+ {
+ try
+ {
+ return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+ }
+ catch (UnknownIndexException e)
+ {
+ String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
+ "If an index was just created, this is likely due to the schema not " +
+ "being fully propagated. Local read will proceed without using the " +
+ "index. Please wait for schema agreement after index creation.",
+ cfm.ksName, cfm.cfName, e.indexId.toString());
+ logger.info(message);
+ return Optional.empty();
+ }
}
public long serializedSize(ReadCommand command, int version)
@@ -554,7 +629,8 @@ public abstract class ReadCommand implements ReadQuery
+ ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+ RowFilter.serializer.serializedSize(command.rowFilter(), version)
+ DataLimits.serializer.serializedSize(command.limits(), version)
- + command.selectionSerializedSize(version);
+ + command.selectionSerializedSize(version)
+ + command.indexSerializedSize(version);
}
}
@@ -739,7 +815,7 @@ public abstract class ReadCommand implements ReadQuery
else
limits = DataLimits.cqlLimits(maxResults);
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
}
static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
@@ -850,7 +926,7 @@ public abstract class ReadCommand implements ReadQuery
DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
return new PartitionRangeReadCommand(
command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
- command.columnFilter(), command.rowFilter(), command.limits(), newRange);
+ command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
}
static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata)
@@ -1000,7 +1076,7 @@ public abstract class ReadCommand implements ReadQuery
// missing without any problems, so we can safely always set "inclusive" to false in the data range
dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
}
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
}
public long serializedSize(ReadCommand command, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
index 44befa2..0720d79 100644
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -98,7 +98,7 @@ public class ReadOrderGroup implements AutoCloseable
private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
{
- Index index = baseCfs.indexManager.getBestIndexFor(command);
+ Index index = command.getIndex(baseCfs);
return index == null ? null : index.getBackingTable().orElse(null);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7b62f5a..c08ef6a 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,20 +21,26 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -507,7 +513,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
throws IOException
{
DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index fabfebc..bd3202d 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -536,19 +536,15 @@ public class SecondaryIndexManager implements IndexRegistry
* index should be performed in the searcherFor method to ensure that we pick the right
* index regardless of the validity of the expression.
*
- * This method is called at various points during the lifecycle of a ReadCommand (to obtain a Searcher,
- * get the index's underlying CFS for ReadOrderGroup, or an estimate of the result size from an average index
- * query).
- *
- * Ideally, we would do this relatively expensive operation only once, and attach the index to the
- * ReadCommand for future reference. This requires the index be passed onto additional commands generated
- * to process subranges etc.
+ * This method is only called once during the lifecycle of a ReadCommand and the result is
+ * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
+ * ReadOrderGroup, or an estimate of the result size from an average index query.
*
* @param command ReadCommand to be executed
* @return an Index instance, ready to use during execution of the command, or null if none
* of the registered indexes can support the command.
*/
- public Index getBestIndexFor(ReadCommand command, boolean includeInTrace)
+ public Index getBestIndexFor(ReadCommand command)
{
if (indexes.isEmpty() || command.rowFilter().isEmpty())
return null;
@@ -564,8 +560,7 @@ public class SecondaryIndexManager implements IndexRegistry
if (searchableIndexes.isEmpty())
{
logger.debug("No applicable indexes found");
- if (includeInTrace)
- Tracing.trace("No applicable indexes found");
+ Tracing.trace("No applicable indexes found");
return null;
}
@@ -575,7 +570,7 @@ public class SecondaryIndexManager implements IndexRegistry
.orElseThrow(() -> new AssertionError("Could not select most selective index"));
// pay for an additional threadlocal get() rather than build the strings unnecessarily
- if (includeInTrace && Tracing.isTracing())
+ if (Tracing.isTracing())
{
Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows())
@@ -585,12 +580,6 @@ public class SecondaryIndexManager implements IndexRegistry
return selected;
}
- // convenience method which doesn't emit tracing messages
- public Index getBestIndexFor(ReadCommand command)
- {
- return getBestIndexFor(command, false);
- }
-
/**
* Called at write time to ensure that values present in the update
* are valid according to the rules of all registered indexes which
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 40a75c6..6846a14 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -18,10 +18,9 @@
package org.apache.cassandra.schema;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
@@ -37,7 +36,10 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.statements.IndexTarget;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
/**
* An immutable representation of secondary index metadata.
@@ -46,6 +48,8 @@ public final class IndexMetadata
{
private static final Logger logger = LoggerFactory.getLogger(IndexMetadata.class);
+ public static final Serializer serializer = new Serializer();
+
public enum IndexType
{
KEYS, CUSTOM, COMPOSITES
@@ -56,6 +60,9 @@ public final class IndexMetadata
COLUMN, ROW
}
+ // UUID for serialization. This is a deterministic UUID generated from the index name
+ // Both the id and name are guaranteed unique per keyspace.
+ public final UUID id;
public final String name;
public final IndexType indexType;
public final TargetType targetType;
@@ -68,6 +75,7 @@ public final class IndexMetadata
TargetType targetType,
Set<ColumnIdentifier> columns)
{
+ this.id = UUID.nameUUIDFromBytes(name.getBytes());
this.name = name;
this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options);
this.indexType = indexType;
@@ -194,7 +202,7 @@ public final class IndexMetadata
public int hashCode()
{
- return Objects.hashCode(name, indexType, targetType, options, columns);
+ return Objects.hashCode(id, name, indexType, targetType, options, columns);
}
public boolean equalsWithoutName(IndexMetadata other)
@@ -215,12 +223,13 @@ public final class IndexMetadata
IndexMetadata other = (IndexMetadata)obj;
- return Objects.equal(name, other.name) && equalsWithoutName(other);
+ return Objects.equal(id, other.id) && Objects.equal(name, other.name) && equalsWithoutName(other);
}
public String toString()
{
return new ToStringBuilder(this)
+ .append("id", id.toString())
.append("name", name)
.append("indexType", indexType)
.append("targetType", targetType)
@@ -228,4 +237,24 @@ public final class IndexMetadata
.append("options", options)
.build();
}
+
+ public static class Serializer
+ {
+ public void serialize(IndexMetadata metadata, DataOutputPlus out, int version) throws IOException
+ {
+ UUIDSerializer.serializer.serialize(metadata.id, out, version);
+ }
+
+ public IndexMetadata deserialize(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+ {
+ UUID id = UUIDSerializer.serializer.deserialize(in, version);
+ return cfm.getIndexes().get(id).orElseThrow(() -> new UnknownIndexException(cfm, id));
+ }
+
+ public long serializedSize(IndexMetadata metadata, int version)
+ {
+ return UUIDSerializer.serializer.serializedSize(metadata.id, version);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 6227e0b..9114f63 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -40,12 +40,14 @@ import static com.google.common.collect.Iterables.filter;
*/
public class Indexes implements Iterable<IndexMetadata>
{
- private final ImmutableMap<String, IndexMetadata> indexes;
+ private final ImmutableMap<String, IndexMetadata> indexesByName;
+ private final ImmutableMap<UUID, IndexMetadata> indexesById;
private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
private Indexes(Builder builder)
{
- indexes = builder.indexes.build();
+ indexesByName = builder.indexesByName.build();
+ indexesById = builder.indexesById.build();
indexesByColumn = builder.indexesByColumn.build();
}
@@ -61,17 +63,17 @@ public class Indexes implements Iterable<IndexMetadata>
public Iterator<IndexMetadata> iterator()
{
- return indexes.values().iterator();
+ return indexesByName.values().iterator();
}
public int size()
{
- return indexes.size();
+ return indexesByName.size();
}
public boolean isEmpty()
{
- return indexes.isEmpty();
+ return indexesByName.isEmpty();
}
/**
@@ -82,7 +84,7 @@ public class Indexes implements Iterable<IndexMetadata>
*/
public Optional<IndexMetadata> get(String name)
{
- return indexes.values().stream().filter(def -> def.name.equals(name)).findFirst();
+ return Optional.ofNullable(indexesByName.get(name));
}
/**
@@ -92,7 +94,30 @@ public class Indexes implements Iterable<IndexMetadata>
*/
public boolean has(String name)
{
- return get(name).isPresent();
+ return indexesByName.containsKey(name);
+ }
+
+ /**
+ * Get the index with the specified id
+ *
+ * @param id a UUID which identifies an index
+ * @return an empty {@link Optional} if no index with the specified id is found; a non-empty optional of
+ * {@link IndexMetadata} otherwise
+ */
+
+ public Optional<IndexMetadata> get(UUID id)
+ {
+ return Optional.ofNullable(indexesById.get(id));
+ }
+
+ /**
+ * Answer true if contains an index with the specified id.
+ * @param id a UUID which identifies an index.
+ * @return true if an index with the specified id is found; false otherwise
+ */
+ public boolean has(UUID id)
+ {
+ return indexesById.containsKey(id);
}
/**
@@ -148,19 +173,19 @@ public class Indexes implements Iterable<IndexMetadata>
@Override
public boolean equals(Object o)
{
- return this == o || (o instanceof Indexes && indexes.equals(((Indexes) o).indexes));
+ return this == o || (o instanceof Indexes && indexesByName.equals(((Indexes) o).indexesByName));
}
@Override
public int hashCode()
{
- return indexes.hashCode();
+ return indexesByName.hashCode();
}
@Override
public String toString()
{
- return indexes.values().toString();
+ return indexesByName.values().toString();
}
public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName)
@@ -179,7 +204,8 @@ public class Indexes implements Iterable<IndexMetadata>
public static final class Builder
{
- final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>();
+ final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>();
+ final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new ImmutableMap.Builder<>();
final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
private Builder()
@@ -193,7 +219,8 @@ public class Indexes implements Iterable<IndexMetadata>
public Builder add(IndexMetadata index)
{
- indexes.put(index.name, index);
+ indexesByName.put(index.name, index);
+ indexesById.put(index.id, index);
// All indexes are column indexes at the moment
if (index.isColumnIndex())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/UnknownIndexException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/UnknownIndexException.java b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
new file mode 100644
index 0000000..5daf631
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+
+/**
+ * Exception thrown when we read an index id from a serialized ReadCommand and no corresponding IndexMetadata
+ * can be found in the CFMetaData#indexes collection. Note that this is an internal exception and is not meant
+ * to be user facing, the node reading the ReadCommand should proceed as if no index id were present.
+ */
+public class UnknownIndexException extends IOException
+{
+ public final UUID indexId;
+ public UnknownIndexException(CFMetaData metadata, UUID id)
+ {
+ super(String.format("Unknown index %s for table %s.%s", id.toString(), metadata.ksName, metadata.cfName));
+ indexId = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 59f1c1c..e3b884e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1717,7 +1717,7 @@ public class StorageProxy implements StorageProxyMBean
private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
{
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
- Index index = cfs.indexManager.getBestIndexFor(command);
+ Index index = command.getIndex(cfs);
float maxExpectedResults = index == null
? command.limits().estimateTotalResults(cfs)
: index.getEstimatedResultRows();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 2e57a8b..87eb018 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,15 +17,17 @@
*/
package org.apache.cassandra.service.pager;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
/**
* Pages a RangeSliceCommand whose predicate is a slice query.
*
@@ -89,7 +91,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager
}
}
- return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange);
+ // it won't hurt for the next page command to query the index manager
+ // again to check for an applicable index, so don't supply one here
+ return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty());
}
protected void recordLast(DecoratedKey key, Row last)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 038384e..9cd1653 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,12 +30,9 @@ import java.util.zip.Inflater;
import com.google.common.base.Joiner;
import com.google.common.collect.*;
import com.google.common.primitives.Longs;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.ColumnFilter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.QueryOptions;
@@ -1520,7 +1517,8 @@ public class CassandraServer implements Cassandra.Iface
columns,
ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
limits,
- new DataRange(bounds, filter));
+ new DataRange(bounds, filter),
+ Optional.empty());
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
assert results != null;
@@ -1613,7 +1611,8 @@ public class CassandraServer implements Cassandra.Iface
ColumnFilter.all(metadata),
RowFilter.NONE,
limits,
- new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+ new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
+ Optional.empty());
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
@@ -1704,7 +1703,8 @@ public class CassandraServer implements Cassandra.Iface
columns,
ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
limits,
- new DataRange(bounds, filter));
+ new DataRange(bounds, filter),
+ Optional.empty());
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk
Posted by sa...@apache.org.
Merge branch 'cassandra-3.0' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c5408a3a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c5408a3a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c5408a3a
Branch: refs/heads/trunk
Commit: c5408a3a163af593cb826bea38a46f3c86da8b1c
Parents: b819422 e097efc
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Sep 9 08:52:11 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 9 08:52:11 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/SelectStatement.java | 4 +-
.../db/AbstractReadCommandBuilder.java | 8 +-
.../cassandra/db/PartitionRangeReadCommand.java | 26 ++++--
.../org/apache/cassandra/db/ReadCommand.java | 96 ++++++++++++++++++--
.../org/apache/cassandra/db/ReadOrderGroup.java | 2 +-
.../db/SinglePartitionReadCommand.java | 14 ++-
.../cassandra/index/SecondaryIndexManager.java | 23 ++---
.../apache/cassandra/schema/IndexMetadata.java | 39 +++++++-
.../org/apache/cassandra/schema/Indexes.java | 51 ++++++++---
.../cassandra/schema/UnknownIndexException.java | 39 ++++++++
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../service/pager/RangeSliceQueryPager.java | 16 ++--
.../cassandra/thrift/CassandraServer.java | 14 +--
14 files changed, 258 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5408a3a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7cf3c47,ab1b4ed..3109833
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
+3.2
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.0.0-rc1
+ * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
* Small optimizations of sstable index serialization (CASSANDRA-10232)
* Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
Merged from 2.2:
[2/3] cassandra git commit: Cache selected index in ReadCommand to
avoid multiple lookups
Posted by sa...@apache.org.
Cache selected index in ReadCommand to avoid multiple lookups
Patch by Sam Tunnicliffe; reviewed by Jake Luciani for CASSANDRA-10215
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e097efc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e097efc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e097efc5
Branch: refs/heads/trunk
Commit: e097efc5f6f76a0da8d15b307301dffff79e4a35
Parents: f961e84
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Aug 26 16:29:58 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 9 08:44:06 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cql3/statements/SelectStatement.java | 4 +-
.../db/AbstractReadCommandBuilder.java | 8 +-
.../cassandra/db/PartitionRangeReadCommand.java | 26 ++++--
.../org/apache/cassandra/db/ReadCommand.java | 96 ++++++++++++++++++--
.../org/apache/cassandra/db/ReadOrderGroup.java | 2 +-
.../db/SinglePartitionReadCommand.java | 14 ++-
.../cassandra/index/SecondaryIndexManager.java | 23 ++---
.../apache/cassandra/schema/IndexMetadata.java | 39 +++++++-
.../org/apache/cassandra/schema/Indexes.java | 51 ++++++++---
.../cassandra/schema/UnknownIndexException.java | 39 ++++++++
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../service/pager/RangeSliceQueryPager.java | 16 ++--
.../cassandra/thrift/CassandraServer.java | 14 +--
14 files changed, 258 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1c66a2..ab1b4ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.0-rc1
+ * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
* Small optimizations of sstable index serialization (CASSANDRA-10232)
* Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
Merged from 2.2:
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2aac6ab..7ad6c09 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.MaterializedView;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
@@ -458,12 +459,13 @@ public class SelectStatement implements CQLStatement
return ReadQuery.EMPTY;
RowFilter rowFilter = getRowFilter(options);
+
// The LIMIT provided by the user is the number of CQL row he wants returned.
// We want to have getRangeSlice to count the number of columns, not the number of keys.
AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
return keyBounds == null
? ReadQuery.EMPTY
- : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+ : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter), Optional.empty());
}
private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 5e3b726..9bb89a6 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -23,9 +23,11 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.utils.FBUtilities;
@@ -327,7 +329,7 @@ public abstract class AbstractReadCommandBuilder
else
bounds = new ExcludingBounds<>(start, end);
- return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+ return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
}
static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index da62557..965e9af 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import com.google.common.collect.Iterables;
@@ -39,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.pager.*;
@@ -64,10 +66,12 @@ public class PartitionRangeReadCommand extends ReadCommand
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
- DataRange dataRange)
+ DataRange dataRange,
+ Optional<IndexMetadata> index)
{
super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
this.dataRange = dataRange;
+ this.index = index;
}
public PartitionRangeReadCommand(CFMetaData metadata,
@@ -75,9 +79,10 @@ public class PartitionRangeReadCommand extends ReadCommand
ColumnFilter columnFilter,
RowFilter rowFilter,
DataLimits limits,
- DataRange dataRange)
+ DataRange dataRange,
+ Optional<IndexMetadata> index)
{
- this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
+ this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
}
/**
@@ -95,7 +100,8 @@ public class PartitionRangeReadCommand extends ReadCommand
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
- DataRange.allData(metadata.partitioner));
+ DataRange.allData(metadata.partitioner),
+ Optional.empty());
}
public DataRange dataRange()
@@ -115,17 +121,17 @@ public class PartitionRangeReadCommand extends ReadCommand
public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
{
- return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
}
public PartitionRangeReadCommand copy()
{
- return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
+ return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
}
public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
{
- return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange());
+ return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
}
public long getTimeout()
@@ -275,7 +281,7 @@ public class PartitionRangeReadCommand extends ReadCommand
public PartitionIterator postReconciliationProcessing(PartitionIterator result)
{
ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
- Index index = getIndex(cfs, false);
+ Index index = getIndex(cfs);
return index == null ? result : index.postProcessorFor(this).apply(result, this);
}
@@ -303,11 +309,11 @@ public class PartitionRangeReadCommand extends ReadCommand
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
throws IOException
{
DataRange range = DataRange.serializer.deserialize(in, version, metadata);
- return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+ return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5a10716..e183963 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.UnknownIndexException;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -70,6 +72,16 @@ public abstract class ReadCommand implements ReadQuery
private final RowFilter rowFilter;
private final DataLimits limits;
+ // SecondaryIndexManager will attempt to provide the most selective of any available indexes
+ // during execution. Here we also store the result of that lookup to avoid repeating it over
+ // the lifetime of the command.
+ protected Optional<IndexMetadata> index = Optional.empty();
+
+ // Flag to indicate whether the index manager has been queried to select an index for this
+ // command. This is necessary as the result of that lookup may be null, in which case we
+ // still don't want to repeat it.
+ private boolean indexManagerQueried = false;
+
private boolean isDigestQuery;
// if a digest query, the version for which the digest is expected. Ignored if not a digest.
private int digestVersion;
@@ -77,7 +89,7 @@ public abstract class ReadCommand implements ReadQuery
protected static abstract class SelectionDeserializer
{
- public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+ public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
}
protected enum Kind
@@ -287,9 +299,35 @@ public abstract class ReadCommand implements ReadQuery
: ReadResponse.createDataResponse(iterator, selection);
}
- protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace)
+ public long indexSerializedSize(int version)
+ {
+ if (index.isPresent())
+ return IndexMetadata.serializer.serializedSize(index.get(), version);
+ else
+ return 0;
+ }
+
+ public Index getIndex(ColumnFamilyStore cfs)
{
- return cfs.indexManager.getBestIndexFor(this, includeInTrace);
+ // if we've already consulted the index manager, and it returned a valid index
+ // the result should be cached here.
+ if(index.isPresent())
+ return cfs.indexManager.getIndex(index.get());
+
+ // if no cached index is present, but we've already consulted the index manager
+ // then no registered index is suitable for this command, so just return null.
+ if (indexManagerQueried)
+ return null;
+
+ // do the lookup, set the flag to indicate so and cache the result if not null
+ Index selected = cfs.indexManager.getBestIndexFor(this);
+ indexManagerQueried = true;
+
+ if (selected == null)
+ return null;
+
+ index = Optional.of(selected.getIndexMetadata());
+ return selected;
}
/**
@@ -306,9 +344,12 @@ public abstract class ReadCommand implements ReadQuery
long startTimeNanos = System.nanoTime();
ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
- Index index = getIndex(cfs, true);
+ Index index = getIndex(cfs);
Index.Searcher searcher = index == null ? null : index.searcherFor(this);
+ if (index != null)
+ Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexName());
+
UnfilteredPartitionIterator resultIterator = searcher == null
? queryStorage(cfs, orderGroup)
: searcher.search(orderGroup);
@@ -505,13 +546,23 @@ public abstract class ReadCommand implements ReadQuery
return (flags & 0x02) != 0;
}
+ private static int indexFlag(boolean hasIndex)
+ {
+ return hasIndex ? 0x04 : 0;
+ }
+
+ private static boolean hasIndex(int flags)
+ {
+ return (flags & 0x04) != 0;
+ }
+
public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
{
// for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
assert version >= MessagingService.VERSION_30;
out.writeByte(command.kind.ordinal());
- out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
+ out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
if (command.isDigestQuery())
out.writeVInt(command.digestVersion());
CFMetaData.serializer.serialize(command.metadata(), out, version);
@@ -519,6 +570,8 @@ public abstract class ReadCommand implements ReadQuery
ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
RowFilter.serializer.serialize(command.rowFilter(), out, version);
DataLimits.serializer.serialize(command.limits(), out, version);
+ if (command.index.isPresent())
+ IndexMetadata.serializer.serialize(command.index.get(), out, version);
command.serializeSelection(out, version);
}
@@ -532,14 +585,36 @@ public abstract class ReadCommand implements ReadQuery
int flags = in.readByte();
boolean isDigest = isDigest(flags);
boolean isForThrift = isForThrift(flags);
+ boolean hasIndex = hasIndex(flags);
int digestVersion = isDigest ? (int)in.readVInt() : 0;
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
int nowInSec = in.readInt();
ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
DataLimits limits = DataLimits.serializer.deserialize(in, version);
+ Optional<IndexMetadata> index = hasIndex
+ ? deserializeIndexMetadata(in, version, metadata)
+ : Optional.empty();
+
+ return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+ }
- return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+ private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+ {
+ try
+ {
+ return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+ }
+ catch (UnknownIndexException e)
+ {
+ String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
+ "If an index was just created, this is likely due to the schema not " +
+ "being fully propagated. Local read will proceed without using the " +
+ "index. Please wait for schema agreement after index creation.",
+ cfm.ksName, cfm.cfName, e.indexId.toString());
+ logger.info(message);
+ return Optional.empty();
+ }
}
public long serializedSize(ReadCommand command, int version)
@@ -554,7 +629,8 @@ public abstract class ReadCommand implements ReadQuery
+ ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
+ RowFilter.serializer.serializedSize(command.rowFilter(), version)
+ DataLimits.serializer.serializedSize(command.limits(), version)
- + command.selectionSerializedSize(version);
+ + command.selectionSerializedSize(version)
+ + command.indexSerializedSize(version);
}
}
@@ -739,7 +815,7 @@ public abstract class ReadCommand implements ReadQuery
else
limits = DataLimits.cqlLimits(maxResults);
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
}
static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
@@ -850,7 +926,7 @@ public abstract class ReadCommand implements ReadQuery
DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
return new PartitionRangeReadCommand(
command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
- command.columnFilter(), command.rowFilter(), command.limits(), newRange);
+ command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
}
static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata)
@@ -1000,7 +1076,7 @@ public abstract class ReadCommand implements ReadQuery
// missing without any problems, so we can safely always set "inclusive" to false in the data range
dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
}
- return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
+ return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
}
public long serializedSize(ReadCommand command, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
index 44befa2..0720d79 100644
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -98,7 +98,7 @@ public class ReadOrderGroup implements AutoCloseable
private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
{
- Index index = baseCfs.indexManager.getBestIndexFor(command);
+ Index index = command.getIndex(baseCfs);
return index == null ? null : index.getBackingTable().orElse(null);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7b62f5a..c08ef6a 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,20 +21,26 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.pager.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -507,7 +513,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
private static class Deserializer extends SelectionDeserializer
{
- public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+ public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
throws IOException
{
DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index fabfebc..bd3202d 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -536,19 +536,15 @@ public class SecondaryIndexManager implements IndexRegistry
* index should be performed in the searcherFor method to ensure that we pick the right
* index regardless of the validity of the expression.
*
- * This method is called at various points during the lifecycle of a ReadCommand (to obtain a Searcher,
- * get the index's underlying CFS for ReadOrderGroup, or an estimate of the result size from an average index
- * query).
- *
- * Ideally, we would do this relatively expensive operation only once, and attach the index to the
- * ReadCommand for future reference. This requires the index be passed onto additional commands generated
- * to process subranges etc.
+ * This method is only called once during the lifecycle of a ReadCommand and the result is
+ * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
+ * ReadOrderGroup, or an estimate of the result size from an average index query.
*
* @param command ReadCommand to be executed
* @return an Index instance, ready to use during execution of the command, or null if none
* of the registered indexes can support the command.
*/
- public Index getBestIndexFor(ReadCommand command, boolean includeInTrace)
+ public Index getBestIndexFor(ReadCommand command)
{
if (indexes.isEmpty() || command.rowFilter().isEmpty())
return null;
@@ -564,8 +560,7 @@ public class SecondaryIndexManager implements IndexRegistry
if (searchableIndexes.isEmpty())
{
logger.debug("No applicable indexes found");
- if (includeInTrace)
- Tracing.trace("No applicable indexes found");
+ Tracing.trace("No applicable indexes found");
return null;
}
@@ -575,7 +570,7 @@ public class SecondaryIndexManager implements IndexRegistry
.orElseThrow(() -> new AssertionError("Could not select most selective index"));
// pay for an additional threadlocal get() rather than build the strings unnecessarily
- if (includeInTrace && Tracing.isTracing())
+ if (Tracing.isTracing())
{
Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows())
@@ -585,12 +580,6 @@ public class SecondaryIndexManager implements IndexRegistry
return selected;
}
- // convenience method which doesn't emit tracing messages
- public Index getBestIndexFor(ReadCommand command)
- {
- return getBestIndexFor(command, false);
- }
-
/**
* Called at write time to ensure that values present in the update
* are valid according to the rules of all registered indexes which
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 40a75c6..6846a14 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -18,10 +18,9 @@
package org.apache.cassandra.schema;
+import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
@@ -37,7 +36,10 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.statements.IndexTarget;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
/**
* An immutable representation of secondary index metadata.
@@ -46,6 +48,8 @@ public final class IndexMetadata
{
private static final Logger logger = LoggerFactory.getLogger(IndexMetadata.class);
+ public static final Serializer serializer = new Serializer();
+
public enum IndexType
{
KEYS, CUSTOM, COMPOSITES
@@ -56,6 +60,9 @@ public final class IndexMetadata
COLUMN, ROW
}
+ // UUID for serialization. This is a deterministic UUID generated from the index name.
+ // Both the id and name are guaranteed unique per keyspace.
+ public final UUID id;
public final String name;
public final IndexType indexType;
public final TargetType targetType;
@@ -68,6 +75,7 @@ public final class IndexMetadata
TargetType targetType,
Set<ColumnIdentifier> columns)
{
+ this.id = UUID.nameUUIDFromBytes(name.getBytes());
this.name = name;
this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options);
this.indexType = indexType;
@@ -194,7 +202,7 @@ public final class IndexMetadata
public int hashCode()
{
- return Objects.hashCode(name, indexType, targetType, options, columns);
+ return Objects.hashCode(id, name, indexType, targetType, options, columns);
}
public boolean equalsWithoutName(IndexMetadata other)
@@ -215,12 +223,13 @@ public final class IndexMetadata
IndexMetadata other = (IndexMetadata)obj;
- return Objects.equal(name, other.name) && equalsWithoutName(other);
+ return Objects.equal(id, other.id) && Objects.equal(name, other.name) && equalsWithoutName(other);
}
public String toString()
{
return new ToStringBuilder(this)
+ .append("id", id.toString())
.append("name", name)
.append("indexType", indexType)
.append("targetType", targetType)
@@ -228,4 +237,24 @@ public final class IndexMetadata
.append("options", options)
.build();
}
+
+ public static class Serializer
+ {
+ public void serialize(IndexMetadata metadata, DataOutputPlus out, int version) throws IOException
+ {
+ UUIDSerializer.serializer.serialize(metadata.id, out, version);
+ }
+
+ public IndexMetadata deserialize(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+ {
+ UUID id = UUIDSerializer.serializer.deserialize(in, version);
+ return cfm.getIndexes().get(id).orElseThrow(() -> new UnknownIndexException(cfm, id));
+ }
+
+ public long serializedSize(IndexMetadata metadata, int version)
+ {
+ return UUIDSerializer.serializer.serializedSize(metadata.id, version);
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 6227e0b..9114f63 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -40,12 +40,14 @@ import static com.google.common.collect.Iterables.filter;
*/
public class Indexes implements Iterable<IndexMetadata>
{
- private final ImmutableMap<String, IndexMetadata> indexes;
+ private final ImmutableMap<String, IndexMetadata> indexesByName;
+ private final ImmutableMap<UUID, IndexMetadata> indexesById;
private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
private Indexes(Builder builder)
{
- indexes = builder.indexes.build();
+ indexesByName = builder.indexesByName.build();
+ indexesById = builder.indexesById.build();
indexesByColumn = builder.indexesByColumn.build();
}
@@ -61,17 +63,17 @@ public class Indexes implements Iterable<IndexMetadata>
public Iterator<IndexMetadata> iterator()
{
- return indexes.values().iterator();
+ return indexesByName.values().iterator();
}
public int size()
{
- return indexes.size();
+ return indexesByName.size();
}
public boolean isEmpty()
{
- return indexes.isEmpty();
+ return indexesByName.isEmpty();
}
/**
@@ -82,7 +84,7 @@ public class Indexes implements Iterable<IndexMetadata>
*/
public Optional<IndexMetadata> get(String name)
{
- return indexes.values().stream().filter(def -> def.name.equals(name)).findFirst();
+ return Optional.ofNullable(indexesByName.get(name));
}
/**
@@ -92,7 +94,30 @@ public class Indexes implements Iterable<IndexMetadata>
*/
public boolean has(String name)
{
- return get(name).isPresent();
+ return indexesByName.containsKey(name);
+ }
+
+ /**
+ * Get the index with the specified id
+ *
+ * @param id a UUID which identifies an index
+ * @return an empty {@link Optional} if no index with the specified id is found; a non-empty optional of
+ * {@link IndexMetadata} otherwise
+ */
+
+ public Optional<IndexMetadata> get(UUID id)
+ {
+ return Optional.ofNullable(indexesById.get(id));
+ }
+
+ /**
+ * Answer true if contains an index with the specified id.
+ * @param id a UUID which identifies an index.
+ * @return true if an index with the specified id is found; false otherwise
+ */
+ public boolean has(UUID id)
+ {
+ return indexesById.containsKey(id);
}
/**
@@ -148,19 +173,19 @@ public class Indexes implements Iterable<IndexMetadata>
@Override
public boolean equals(Object o)
{
- return this == o || (o instanceof Indexes && indexes.equals(((Indexes) o).indexes));
+ return this == o || (o instanceof Indexes && indexesByName.equals(((Indexes) o).indexesByName));
}
@Override
public int hashCode()
{
- return indexes.hashCode();
+ return indexesByName.hashCode();
}
@Override
public String toString()
{
- return indexes.values().toString();
+ return indexesByName.values().toString();
}
public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName)
@@ -179,7 +204,8 @@ public class Indexes implements Iterable<IndexMetadata>
public static final class Builder
{
- final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>();
+ final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>();
+ final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new ImmutableMap.Builder<>();
final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
private Builder()
@@ -193,7 +219,8 @@ public class Indexes implements Iterable<IndexMetadata>
public Builder add(IndexMetadata index)
{
- indexes.put(index.name, index);
+ indexesByName.put(index.name, index);
+ indexesById.put(index.id, index);
// All indexes are column indexes at the moment
if (index.isColumnIndex())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/UnknownIndexException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/UnknownIndexException.java b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
new file mode 100644
index 0000000..5daf631
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+
+/**
+ * Exception thrown when we read an index id from a serialized ReadCommand and no corresponding IndexMetadata
+ * can be found in the CFMetaData#indexes collection. Note that this is an internal exception and is not meant
+ * to be user facing; the node reading the ReadCommand should proceed as if no index id were present.
+ */
+public class UnknownIndexException extends IOException
+{
+ public final UUID indexId;
+ public UnknownIndexException(CFMetaData metadata, UUID id)
+ {
+ super(String.format("Unknown index %s for table %s.%s", id.toString(), metadata.ksName, metadata.cfName));
+ indexId = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 59f1c1c..e3b884e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1717,7 +1717,7 @@ public class StorageProxy implements StorageProxyMBean
private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
{
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
- Index index = cfs.indexManager.getBestIndexFor(command);
+ Index index = command.getIndex(cfs);
float maxExpectedResults = index == null
? command.limits().estimateTotalResults(cfs)
: index.getEstimatedResultRows();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 2e57a8b..87eb018 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,15 +17,17 @@
*/
package org.apache.cassandra.service.pager;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
+import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
/**
* Pages a RangeSliceCommand whose predicate is a slice query.
*
@@ -89,7 +91,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager
}
}
- return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange);
+ // it won't hurt for the next page command to query the index manager
+ // again to check for an applicable index, so don't supply one here
+ return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty());
}
protected void recordLast(DecoratedKey key, Row last)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 038384e..9cd1653 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,12 +30,9 @@ import java.util.zip.Inflater;
import com.google.common.base.Joiner;
import com.google.common.collect.*;
import com.google.common.primitives.Longs;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.ColumnFilter;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.QueryOptions;
@@ -1520,7 +1517,8 @@ public class CassandraServer implements Cassandra.Iface
columns,
ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
limits,
- new DataRange(bounds, filter));
+ new DataRange(bounds, filter),
+ Optional.empty());
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
assert results != null;
@@ -1613,7 +1611,8 @@ public class CassandraServer implements Cassandra.Iface
ColumnFilter.all(metadata),
RowFilter.NONE,
limits,
- new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+ new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
+ Optional.empty());
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
@@ -1704,7 +1703,8 @@ public class CassandraServer implements Cassandra.Iface
columns,
ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
limits,
- new DataRange(bounds, filter));
+ new DataRange(bounds, filter),
+ Optional.empty());
try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
{
return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());