You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2015/09/09 09:56:27 UTC

[1/3] cassandra git commit: Cache selected index in ReadCommand to avoid multiple lookups

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f961e84aa -> e097efc5f
  refs/heads/trunk b81942267 -> c5408a3a1


Cache selected index in ReadCommand to avoid multiple lookups

Patch by Sam Tunnicliffe; reviewed by Jake Luciani for CASSANDRA-10215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e097efc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e097efc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e097efc5

Branch: refs/heads/cassandra-3.0
Commit: e097efc5f6f76a0da8d15b307301dffff79e4a35
Parents: f961e84
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Aug 26 16:29:58 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 9 08:44:06 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/SelectStatement.java        |  4 +-
 .../db/AbstractReadCommandBuilder.java          |  8 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 26 ++++--
 .../org/apache/cassandra/db/ReadCommand.java    | 96 ++++++++++++++++++--
 .../org/apache/cassandra/db/ReadOrderGroup.java |  2 +-
 .../db/SinglePartitionReadCommand.java          | 14 ++-
 .../cassandra/index/SecondaryIndexManager.java  | 23 ++---
 .../apache/cassandra/schema/IndexMetadata.java  | 39 +++++++-
 .../org/apache/cassandra/schema/Indexes.java    | 51 ++++++++---
 .../cassandra/schema/UnknownIndexException.java | 39 ++++++++
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../service/pager/RangeSliceQueryPager.java     | 16 ++--
 .../cassandra/thrift/CassandraServer.java       | 14 +--
 14 files changed, 258 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1c66a2..ab1b4ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
  * Small optimizations of sstable index serialization (CASSANDRA-10232)
  * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
 Merged from 2.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2aac6ab..7ad6c09 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.MaterializedView;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
@@ -458,12 +459,13 @@ public class SelectStatement implements CQLStatement
             return ReadQuery.EMPTY;
 
         RowFilter rowFilter = getRowFilter(options);
+
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
         AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
         return keyBounds == null
              ? ReadQuery.EMPTY
-             : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+             : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter), Optional.empty());
     }
 
     private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 5e3b726..9bb89a6 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -23,9 +23,11 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -327,7 +329,7 @@ public abstract class AbstractReadCommandBuilder
             else
                 bounds = new ExcludingBounds<>(start, end);
 
-            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
         }
 
         static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index da62557..965e9af 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import com.google.common.collect.Iterables;
 
@@ -39,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.*;
@@ -64,10 +66,12 @@ public class PartitionRangeReadCommand extends ReadCommand
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
                                      DataLimits limits,
-                                     DataRange dataRange)
+                                     DataRange dataRange,
+                                     Optional<IndexMetadata> index)
     {
         super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
         this.dataRange = dataRange;
+        this.index = index;
     }
 
     public PartitionRangeReadCommand(CFMetaData metadata,
@@ -75,9 +79,10 @@ public class PartitionRangeReadCommand extends ReadCommand
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
                                      DataLimits limits,
-                                     DataRange dataRange)
+                                     DataRange dataRange,
+                                     Optional<IndexMetadata> index)
     {
-        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
+        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
     }
 
     /**
@@ -95,7 +100,8 @@ public class PartitionRangeReadCommand extends ReadCommand
                                              ColumnFilter.all(metadata),
                                              RowFilter.NONE,
                                              DataLimits.NONE,
-                                             DataRange.allData(metadata.partitioner));
+                                             DataRange.allData(metadata.partitioner),
+                                             Optional.empty());
     }
 
     public DataRange dataRange()
@@ -115,17 +121,17 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
+        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
     }
 
     public PartitionRangeReadCommand copy()
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
+        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
     }
 
     public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
     {
-        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange());
+        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
     }
 
     public long getTimeout()
@@ -275,7 +281,7 @@ public class PartitionRangeReadCommand extends ReadCommand
     public PartitionIterator postReconciliationProcessing(PartitionIterator result)
     {
         ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
-        Index index = getIndex(cfs, false);
+        Index index = getIndex(cfs);
         return index == null ? result : index.postProcessorFor(this).apply(result, this);
     }
 
@@ -303,11 +309,11 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, metadata);
-            return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+            return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5a10716..e183963 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.UnknownIndexException;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -70,6 +72,16 @@ public abstract class ReadCommand implements ReadQuery
     private final RowFilter rowFilter;
     private final DataLimits limits;
 
+    // SecondaryIndexManager will attempt to provide the most selective of any available indexes
+    // during execution. Here we also store the result of that lookup to avoid repeating it over
+    // the lifetime of the command.
+    protected Optional<IndexMetadata> index = Optional.empty();
+
+    // Flag to indicate whether the index manager has been queried to select an index for this
+    // command. This is necessary as the result of that lookup may be null, in which case we
+    // still don't want to repeat it.
+    private boolean indexManagerQueried = false;
+
     private boolean isDigestQuery;
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
     private int digestVersion;
@@ -77,7 +89,7 @@ public abstract class ReadCommand implements ReadQuery
 
     protected static abstract class SelectionDeserializer
     {
-        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
     }
 
     protected enum Kind
@@ -287,9 +299,35 @@ public abstract class ReadCommand implements ReadQuery
              : ReadResponse.createDataResponse(iterator, selection);
     }
 
-    protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace)
+    public long indexSerializedSize(int version)
+    {
+        if (index.isPresent())
+            return IndexMetadata.serializer.serializedSize(index.get(), version);
+        else
+            return 0;
+    }
+
+    public Index getIndex(ColumnFamilyStore cfs)
     {
-        return cfs.indexManager.getBestIndexFor(this, includeInTrace);
+        // if we've already consulted the index manager, and it returned a valid index
+        // the result should be cached here.
+        if(index.isPresent())
+            return cfs.indexManager.getIndex(index.get());
+
+        // if no cached index is present, but we've already consulted the index manager
+        // then no registered index is suitable for this command, so just return null.
+        if (indexManagerQueried)
+            return null;
+
+        // do the lookup, set the flag to indicate so and cache the result if not null
+        Index selected = cfs.indexManager.getBestIndexFor(this);
+        indexManagerQueried = true;
+
+        if (selected == null)
+            return null;
+
+        index = Optional.of(selected.getIndexMetadata());
+        return selected;
     }
 
     /**
@@ -306,9 +344,12 @@ public abstract class ReadCommand implements ReadQuery
         long startTimeNanos = System.nanoTime();
 
         ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
-        Index index = getIndex(cfs, true);
+        Index index = getIndex(cfs);
         Index.Searcher searcher = index == null ? null : index.searcherFor(this);
 
+        if (index != null)
+            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexName());
+
         UnfilteredPartitionIterator resultIterator = searcher == null
                                          ? queryStorage(cfs, orderGroup)
                                          : searcher.search(orderGroup);
@@ -505,13 +546,23 @@ public abstract class ReadCommand implements ReadQuery
             return (flags & 0x02) != 0;
         }
 
+        private static int indexFlag(boolean hasIndex)
+        {
+            return hasIndex ? 0x04 : 0;
+        }
+
+        private static boolean hasIndex(int flags)
+        {
+            return (flags & 0x04) != 0;
+        }
+
         public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
         {
             // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
             assert version >= MessagingService.VERSION_30;
 
             out.writeByte(command.kind.ordinal());
-            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
+            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
             if (command.isDigestQuery())
                 out.writeVInt(command.digestVersion());
             CFMetaData.serializer.serialize(command.metadata(), out, version);
@@ -519,6 +570,8 @@ public abstract class ReadCommand implements ReadQuery
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
             DataLimits.serializer.serialize(command.limits(), out, version);
+            if (command.index.isPresent())
+                IndexMetadata.serializer.serialize(command.index.get(), out, version);
 
             command.serializeSelection(out, version);
         }
@@ -532,14 +585,36 @@ public abstract class ReadCommand implements ReadQuery
             int flags = in.readByte();
             boolean isDigest = isDigest(flags);
             boolean isForThrift = isForThrift(flags);
+            boolean hasIndex = hasIndex(flags);
             int digestVersion = isDigest ? (int)in.readVInt() : 0;
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
             DataLimits limits = DataLimits.serializer.deserialize(in, version);
+            Optional<IndexMetadata> index = hasIndex
+                                            ? deserializeIndexMetadata(in, version, metadata)
+                                            : Optional.empty();
+
+            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+        }
 
-            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        {
+            try
+            {
+                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+            }
+            catch (UnknownIndexException e)
+            {
+                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
+                                               "If an index was just created, this is likely due to the schema not " +
+                                               "being fully propagated. Local read will proceed without using the " +
+                                               "index. Please wait for schema agreement after index creation.",
+                                               cfm.ksName, cfm.cfName, e.indexId.toString());
+                logger.info(message);
+                return Optional.empty();
+            }
         }
 
         public long serializedSize(ReadCommand command, int version)
@@ -554,7 +629,8 @@ public abstract class ReadCommand implements ReadQuery
                  + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                  + RowFilter.serializer.serializedSize(command.rowFilter(), version)
                  + DataLimits.serializer.serializedSize(command.limits(), version)
-                 + command.selectionSerializedSize(version);
+                 + command.selectionSerializedSize(version)
+                 + command.indexSerializedSize(version);
         }
     }
 
@@ -739,7 +815,7 @@ public abstract class ReadCommand implements ReadQuery
             else
                 limits = DataLimits.cqlLimits(maxResults);
 
-            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
         }
 
         static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
@@ -850,7 +926,7 @@ public abstract class ReadCommand implements ReadQuery
             DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
             return new PartitionRangeReadCommand(
                     command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
-                    command.columnFilter(), command.rowFilter(), command.limits(), newRange);
+                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
         }
 
         static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata)
@@ -1000,7 +1076,7 @@ public abstract class ReadCommand implements ReadQuery
                 // missing without any problems, so we can safely always set "inclusive" to false in the data range
                 dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
             }
-            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
+            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
         }
 
         public long serializedSize(ReadCommand command, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
index 44befa2..0720d79 100644
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -98,7 +98,7 @@ public class ReadOrderGroup implements AutoCloseable
 
     private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
     {
-        Index index = baseCfs.indexManager.getBestIndexFor(command);
+        Index index = command.getIndex(baseCfs);
         return index == null ? null : index.getBackingTable().orElse(null);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7b62f5a..c08ef6a 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,20 +21,26 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -507,7 +513,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
         throws IOException
         {
             DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index fabfebc..bd3202d 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -536,19 +536,15 @@ public class SecondaryIndexManager implements IndexRegistry
      * index should be performed in the searcherFor method to ensure that we pick the right
      * index regardless of the validity of the expression.
      *
-     * This method is called at various points during the lifecycle of a ReadCommand (to obtain a Searcher,
-     * get the index's underlying CFS for ReadOrderGroup, or an estimate of the result size from an average index
-     * query).
-     *
-     * Ideally, we would do this relatively expensive operation only once, and attach the index to the
-     * ReadCommand for future reference. This requires the index be passed onto additional commands generated
-     * to process subranges etc.
+     * This method is only called once during the lifecycle of a ReadCommand and the result is
+     * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
+     * ReadOrderGroup, or an estimate of the result size from an average index query.
      *
      * @param command ReadCommand to be executed
      * @return an Index instance, ready to use during execution of the command, or null if none
      * of the registered indexes can support the command.
      */
-    public Index getBestIndexFor(ReadCommand command, boolean includeInTrace)
+    public Index getBestIndexFor(ReadCommand command)
     {
         if (indexes.isEmpty() || command.rowFilter().isEmpty())
             return null;
@@ -564,8 +560,7 @@ public class SecondaryIndexManager implements IndexRegistry
         if (searchableIndexes.isEmpty())
         {
             logger.debug("No applicable indexes found");
-            if (includeInTrace)
-                Tracing.trace("No applicable indexes found");
+            Tracing.trace("No applicable indexes found");
             return null;
         }
 
@@ -575,7 +570,7 @@ public class SecondaryIndexManager implements IndexRegistry
                                           .orElseThrow(() -> new AssertionError("Could not select most selective index"));
 
         // pay for an additional threadlocal get() rather than build the strings unnecessarily
-        if (includeInTrace && Tracing.isTracing())
+        if (Tracing.isTracing())
         {
             Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
                           searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows())
@@ -585,12 +580,6 @@ public class SecondaryIndexManager implements IndexRegistry
         return selected;
     }
 
-    // convenience method which doesn't emit tracing messages
-    public Index getBestIndexFor(ReadCommand command)
-    {
-        return getBestIndexFor(command, false);
-    }
-
     /**
      * Called at write time to ensure that values present in the update
      * are valid according to the rules of all registered indexes which

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 40a75c6..6846a14 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -18,10 +18,9 @@
 
 package org.apache.cassandra.schema;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
@@ -37,7 +36,10 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * An immutable representation of secondary index metadata.
@@ -46,6 +48,8 @@ public final class IndexMetadata
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexMetadata.class);
 
+    public static final Serializer serializer = new Serializer();
+
     public enum IndexType
     {
         KEYS, CUSTOM, COMPOSITES
@@ -56,6 +60,9 @@ public final class IndexMetadata
         COLUMN, ROW
     }
 
+    // UUID for serialization. This is a deterministic UUID generated from the index name.
+    // Both the id and name are guaranteed unique per keyspace.
+    public final UUID id;
     public final String name;
     public final IndexType indexType;
     public final TargetType targetType;
@@ -68,6 +75,7 @@ public final class IndexMetadata
                           TargetType targetType,
                           Set<ColumnIdentifier> columns)
     {
+        this.id = UUID.nameUUIDFromBytes(name.getBytes());
         this.name = name;
         this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options);
         this.indexType = indexType;
@@ -194,7 +202,7 @@ public final class IndexMetadata
 
     public int hashCode()
     {
-        return Objects.hashCode(name, indexType, targetType, options, columns);
+        return Objects.hashCode(id, name, indexType, targetType, options, columns);
     }
 
     public boolean equalsWithoutName(IndexMetadata other)
@@ -215,12 +223,13 @@ public final class IndexMetadata
 
         IndexMetadata other = (IndexMetadata)obj;
 
-        return Objects.equal(name, other.name) && equalsWithoutName(other);
+        return Objects.equal(id, other.id) && Objects.equal(name, other.name) && equalsWithoutName(other);
     }
 
     public String toString()
     {
         return new ToStringBuilder(this)
+            .append("id", id.toString())
             .append("name", name)
             .append("indexType", indexType)
             .append("targetType", targetType)
@@ -228,4 +237,24 @@ public final class IndexMetadata
             .append("options", options)
             .build();
     }
+
+    public static class Serializer
+    {
+        public void serialize(IndexMetadata metadata, DataOutputPlus out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(metadata.id, out, version);
+        }
+
+        public IndexMetadata deserialize(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        {
+            UUID id = UUIDSerializer.serializer.deserialize(in, version);
+            return cfm.getIndexes().get(id).orElseThrow(() -> new UnknownIndexException(cfm, id));
+        }
+
+        public long serializedSize(IndexMetadata metadata, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(metadata.id, version);
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 6227e0b..9114f63 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -40,12 +40,14 @@ import static com.google.common.collect.Iterables.filter;
  */
 public class Indexes implements Iterable<IndexMetadata>
 {
-    private final ImmutableMap<String, IndexMetadata> indexes;
+    private final ImmutableMap<String, IndexMetadata> indexesByName;
+    private final ImmutableMap<UUID, IndexMetadata> indexesById;
     private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
 
     private Indexes(Builder builder)
     {
-        indexes = builder.indexes.build();
+        indexesByName = builder.indexesByName.build();
+        indexesById = builder.indexesById.build();
         indexesByColumn = builder.indexesByColumn.build();
     }
 
@@ -61,17 +63,17 @@ public class Indexes implements Iterable<IndexMetadata>
 
     public Iterator<IndexMetadata> iterator()
     {
-        return indexes.values().iterator();
+        return indexesByName.values().iterator();
     }
 
     public int size()
     {
-        return indexes.size();
+        return indexesByName.size();
     }
 
     public boolean isEmpty()
     {
-        return indexes.isEmpty();
+        return indexesByName.isEmpty();
     }
 
     /**
@@ -82,7 +84,7 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public Optional<IndexMetadata> get(String name)
     {
-        return indexes.values().stream().filter(def -> def.name.equals(name)).findFirst();
+        return Optional.ofNullable(indexesByName.get(name));
     }
 
     /**
@@ -92,7 +94,30 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public boolean has(String name)
     {
-        return get(name).isPresent();
+        return indexesByName.containsKey(name);
+    }
+
+    /**
+     * Get the index with the specified id
+     *
+     * @param id a UUID which identifies an index
+     * @return an empty {@link Optional} if no index with the specified id is found; a non-empty optional of
+     *         {@link IndexMetadata} otherwise
+     */
+
+    public Optional<IndexMetadata> get(UUID id)
+    {
+        return Optional.ofNullable(indexesById.get(id));
+    }
+
+    /**
+     * Answer true if contains an index with the specified id.
+     * @param id a UUID which identifies an index.
+     * @return true if an index with the specified id is found; false otherwise
+     */
+    public boolean has(UUID id)
+    {
+        return indexesById.containsKey(id);
     }
 
     /**
@@ -148,19 +173,19 @@ public class Indexes implements Iterable<IndexMetadata>
     @Override
     public boolean equals(Object o)
     {
-        return this == o || (o instanceof Indexes && indexes.equals(((Indexes) o).indexes));
+        return this == o || (o instanceof Indexes && indexesByName.equals(((Indexes) o).indexesByName));
     }
 
     @Override
     public int hashCode()
     {
-        return indexes.hashCode();
+        return indexesByName.hashCode();
     }
 
     @Override
     public String toString()
     {
-        return indexes.values().toString();
+        return indexesByName.values().toString();
     }
 
     public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName)
@@ -179,7 +204,8 @@ public class Indexes implements Iterable<IndexMetadata>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new ImmutableMap.Builder<>();
         final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
 
         private Builder()
@@ -193,7 +219,8 @@ public class Indexes implements Iterable<IndexMetadata>
 
         public Builder add(IndexMetadata index)
         {
-            indexes.put(index.name, index);
+            indexesByName.put(index.name, index);
+            indexesById.put(index.id, index);
             // All indexes are column indexes at the moment
             if (index.isColumnIndex())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/UnknownIndexException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/UnknownIndexException.java b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
new file mode 100644
index 0000000..5daf631
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+
+/**
+ * Exception thrown when we read an index id from a serialized ReadCommand and no corresponding IndexMetadata
+ * can be found in the CFMetaData#indexes collection. Note that this is an internal exception and is not meant
+ * to be user facing; the node reading the ReadCommand should proceed as if no index id were present.
+ */
+public class UnknownIndexException extends IOException
+{
+    public final UUID indexId;
+    public UnknownIndexException(CFMetaData metadata, UUID id)
+    {
+        super(String.format("Unknown index %s for table %s.%s", id.toString(), metadata.ksName, metadata.cfName));
+        indexId = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 59f1c1c..e3b884e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1717,7 +1717,7 @@ public class StorageProxy implements StorageProxyMBean
     private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
     {
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
-        Index index = cfs.indexManager.getBestIndexFor(command);
+        Index index = command.getIndex(cfs);
         float maxExpectedResults = index == null
                                  ? command.limits().estimateTotalResults(cfs)
                                  : index.getEstimatedResultRows();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 2e57a8b..87eb018 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.service.pager;
 
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
+import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
 /**
  * Pages a RangeSliceCommand whose predicate is a slice query.
  *
@@ -89,7 +91,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager
             }
         }
 
-        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange);
+        // it won't hurt for the next page command to query the index manager
+        // again to check for an applicable index, so don't supply one here
+        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty());
     }
 
     protected void recordLast(DecoratedKey key, Row last)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 038384e..9cd1653 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,12 +30,9 @@ import java.util.zip.Inflater;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Longs;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.ColumnFilter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -1520,7 +1517,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                               columns,
                                                                               ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
                                                                               limits,
-                                                                              new DataRange(bounds, filter));
+                                                                              new DataRange(bounds, filter),
+                                                                              Optional.empty());
                 try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     assert results != null;
@@ -1613,7 +1611,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                               ColumnFilter.all(metadata),
                                                                               RowFilter.NONE,
                                                                               limits,
-                                                                              new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+                                                                              new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
+                                                                              Optional.empty());
                 try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
@@ -1704,7 +1703,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                           columns,
                                                                           ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
                                                                           limits,
-                                                                          new DataRange(bounds, filter));
+                                                                          new DataRange(bounds, filter),
+                                                                          Optional.empty());
             try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
             {
                 return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());


[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by sa...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c5408a3a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c5408a3a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c5408a3a

Branch: refs/heads/trunk
Commit: c5408a3a163af593cb826bea38a46f3c86da8b1c
Parents: b819422 e097efc
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Sep 9 08:52:11 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 9 08:52:11 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/SelectStatement.java        |  4 +-
 .../db/AbstractReadCommandBuilder.java          |  8 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 26 ++++--
 .../org/apache/cassandra/db/ReadCommand.java    | 96 ++++++++++++++++++--
 .../org/apache/cassandra/db/ReadOrderGroup.java |  2 +-
 .../db/SinglePartitionReadCommand.java          | 14 ++-
 .../cassandra/index/SecondaryIndexManager.java  | 23 ++---
 .../apache/cassandra/schema/IndexMetadata.java  | 39 +++++++-
 .../org/apache/cassandra/schema/Indexes.java    | 51 ++++++++---
 .../cassandra/schema/UnknownIndexException.java | 39 ++++++++
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../service/pager/RangeSliceQueryPager.java     | 16 ++--
 .../cassandra/thrift/CassandraServer.java       | 14 +--
 14 files changed, 258 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c5408a3a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7cf3c47,ab1b4ed..3109833
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 +3.2
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +
 +
  3.0.0-rc1
+  * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
   * Small optimizations of sstable index serialization (CASSANDRA-10232)
   * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
  Merged from 2.2:


[2/3] cassandra git commit: Cache selected index in ReadCommand to avoid multiple lookups

Posted by sa...@apache.org.
Cache selected index in ReadCommand to avoid multiple lookups

Patch by Sam Tunnicliffe; reviewed by Jake Luciani for CASSANDRA-10215


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e097efc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e097efc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e097efc5

Branch: refs/heads/trunk
Commit: e097efc5f6f76a0da8d15b307301dffff79e4a35
Parents: f961e84
Author: Sam Tunnicliffe <sa...@beobal.com>
Authored: Wed Aug 26 16:29:58 2015 +0100
Committer: Sam Tunnicliffe <sa...@beobal.com>
Committed: Wed Sep 9 08:44:06 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/SelectStatement.java        |  4 +-
 .../db/AbstractReadCommandBuilder.java          |  8 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 26 ++++--
 .../org/apache/cassandra/db/ReadCommand.java    | 96 ++++++++++++++++++--
 .../org/apache/cassandra/db/ReadOrderGroup.java |  2 +-
 .../db/SinglePartitionReadCommand.java          | 14 ++-
 .../cassandra/index/SecondaryIndexManager.java  | 23 ++---
 .../apache/cassandra/schema/IndexMetadata.java  | 39 +++++++-
 .../org/apache/cassandra/schema/Indexes.java    | 51 ++++++++---
 .../cassandra/schema/UnknownIndexException.java | 39 ++++++++
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../service/pager/RangeSliceQueryPager.java     | 16 ++--
 .../cassandra/thrift/CassandraServer.java       | 14 +--
 14 files changed, 258 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a1c66a2..ab1b4ed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Cache selected index in read command to reduce lookups (CASSANDRA-10215)
  * Small optimizations of sstable index serialization (CASSANDRA-10232)
  * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590)
 Merged from 2.2:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 2aac6ab..7ad6c09 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.MaterializedView;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.SecondaryIndexManager;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
@@ -458,12 +459,13 @@ public class SelectStatement implements CQLStatement
             return ReadQuery.EMPTY;
 
         RowFilter rowFilter = getRowFilter(options);
+
         // The LIMIT provided by the user is the number of CQL row he wants returned.
         // We want to have getRangeSlice to count the number of columns, not the number of keys.
         AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options);
         return keyBounds == null
              ? ReadQuery.EMPTY
-             : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+             : new PartitionRangeReadCommand(cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter), Optional.empty());
     }
 
     private ClusteringIndexFilter makeClusteringIndexFilter(QueryOptions options)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 5e3b726..9bb89a6 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -23,9 +23,11 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.*;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -327,7 +329,7 @@ public abstract class AbstractReadCommandBuilder
             else
                 bounds = new ExcludingBounds<>(start, end);
 
-            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
+            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
         }
 
         static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index da62557..965e9af 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import com.google.common.collect.Iterables;
 
@@ -39,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.*;
@@ -64,10 +66,12 @@ public class PartitionRangeReadCommand extends ReadCommand
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
                                      DataLimits limits,
-                                     DataRange dataRange)
+                                     DataRange dataRange,
+                                     Optional<IndexMetadata> index)
     {
         super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
         this.dataRange = dataRange;
+        this.index = index;
     }
 
     public PartitionRangeReadCommand(CFMetaData metadata,
@@ -75,9 +79,10 @@ public class PartitionRangeReadCommand extends ReadCommand
                                      ColumnFilter columnFilter,
                                      RowFilter rowFilter,
                                      DataLimits limits,
-                                     DataRange dataRange)
+                                     DataRange dataRange,
+                                     Optional<IndexMetadata> index)
     {
-        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
+        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
     }
 
     /**
@@ -95,7 +100,8 @@ public class PartitionRangeReadCommand extends ReadCommand
                                              ColumnFilter.all(metadata),
                                              RowFilter.NONE,
                                              DataLimits.NONE,
-                                             DataRange.allData(metadata.partitioner));
+                                             DataRange.allData(metadata.partitioner),
+                                             Optional.empty());
     }
 
     public DataRange dataRange()
@@ -115,17 +121,17 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
+        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
     }
 
     public PartitionRangeReadCommand copy()
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
+        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
     }
 
     public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
     {
-        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange());
+        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
     }
 
     public long getTimeout()
@@ -275,7 +281,7 @@ public class PartitionRangeReadCommand extends ReadCommand
     public PartitionIterator postReconciliationProcessing(PartitionIterator result)
     {
         ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
-        Index index = getIndex(cfs, false);
+        Index index = getIndex(cfs);
         return index == null ? result : index.postProcessorFor(this).apply(result, this);
     }
 
@@ -303,11 +309,11 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, metadata);
-            return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+            return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5a10716..e183963 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -38,6 +38,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.UnknownIndexException;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -70,6 +72,16 @@ public abstract class ReadCommand implements ReadQuery
     private final RowFilter rowFilter;
     private final DataLimits limits;
 
+    // SecondaryIndexManager will attempt to provide the most selective of any available indexes
+    // during execution. Here we also store the result of that lookup to avoid repeating it over
+    // the lifetime of the command.
+    protected Optional<IndexMetadata> index = Optional.empty();
+
+    // Flag to indicate whether the index manager has been queried to select an index for this
+    // command. This is necessary as the result of that lookup may be null, in which case we
+    // still don't want to repeat it.
+    private boolean indexManagerQueried = false;
+
     private boolean isDigestQuery;
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
     private int digestVersion;
@@ -77,7 +89,7 @@ public abstract class ReadCommand implements ReadQuery
 
     protected static abstract class SelectionDeserializer
     {
-        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException;
+        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
     }
 
     protected enum Kind
@@ -287,9 +299,35 @@ public abstract class ReadCommand implements ReadQuery
              : ReadResponse.createDataResponse(iterator, selection);
     }
 
-    protected Index getIndex(ColumnFamilyStore cfs, boolean includeInTrace)
+    public long indexSerializedSize(int version)
+    {
+        if (index.isPresent())
+            return IndexMetadata.serializer.serializedSize(index.get(), version);
+        else
+            return 0;
+    }
+
+    public Index getIndex(ColumnFamilyStore cfs)
     {
-        return cfs.indexManager.getBestIndexFor(this, includeInTrace);
+        // if we've already consulted the index manager, and it returned a valid index
+        // the result should be cached here.
+        if(index.isPresent())
+            return cfs.indexManager.getIndex(index.get());
+
+        // if no cached index is present, but we've already consulted the index manager
+        // then no registered index is suitable for this command, so just return null.
+        if (indexManagerQueried)
+            return null;
+
+        // do the lookup, set the flag to indicate so and cache the result if not null
+        Index selected = cfs.indexManager.getBestIndexFor(this);
+        indexManagerQueried = true;
+
+        if (selected == null)
+            return null;
+
+        index = Optional.of(selected.getIndexMetadata());
+        return selected;
     }
 
     /**
@@ -306,9 +344,12 @@ public abstract class ReadCommand implements ReadQuery
         long startTimeNanos = System.nanoTime();
 
         ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
-        Index index = getIndex(cfs, true);
+        Index index = getIndex(cfs);
         Index.Searcher searcher = index == null ? null : index.searcherFor(this);
 
+        if (index != null)
+            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexName());
+
         UnfilteredPartitionIterator resultIterator = searcher == null
                                          ? queryStorage(cfs, orderGroup)
                                          : searcher.search(orderGroup);
@@ -505,13 +546,23 @@ public abstract class ReadCommand implements ReadQuery
             return (flags & 0x02) != 0;
         }
 
+        private static int indexFlag(boolean hasIndex)
+        {
+            return hasIndex ? 0x04 : 0;
+        }
+
+        private static boolean hasIndex(int flags)
+        {
+            return (flags & 0x04) != 0;
+        }
+
         public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
         {
             // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
             assert version >= MessagingService.VERSION_30;
 
             out.writeByte(command.kind.ordinal());
-            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
+            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
             if (command.isDigestQuery())
                 out.writeVInt(command.digestVersion());
             CFMetaData.serializer.serialize(command.metadata(), out, version);
@@ -519,6 +570,8 @@ public abstract class ReadCommand implements ReadQuery
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
             DataLimits.serializer.serialize(command.limits(), out, version);
+            if (command.index.isPresent())
+                IndexMetadata.serializer.serialize(command.index.get(), out, version);
 
             command.serializeSelection(out, version);
         }
@@ -532,14 +585,36 @@ public abstract class ReadCommand implements ReadQuery
             int flags = in.readByte();
             boolean isDigest = isDigest(flags);
             boolean isForThrift = isForThrift(flags);
+            boolean hasIndex = hasIndex(flags);
             int digestVersion = isDigest ? (int)in.readVInt() : 0;
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
             int nowInSec = in.readInt();
             ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
             DataLimits limits = DataLimits.serializer.deserialize(in, version);
+            Optional<IndexMetadata> index = hasIndex
+                                            ? deserializeIndexMetadata(in, version, metadata)
+                                            : Optional.empty();
+
+            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
+        }
 
-            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        {
+            try
+            {
+                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+            }
+            catch (UnknownIndexException e)
+            {
+                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
+                                               "If an index was just created, this is likely due to the schema not " +
+                                               "being fully propagated. Local read will proceed without using the " +
+                                               "index. Please wait for schema agreement after index creation.",
+                                               cfm.ksName, cfm.cfName, e.indexId.toString());
+                logger.info(message);
+                return Optional.empty();
+            }
         }
 
         public long serializedSize(ReadCommand command, int version)
@@ -554,7 +629,8 @@ public abstract class ReadCommand implements ReadQuery
                  + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
                  + RowFilter.serializer.serializedSize(command.rowFilter(), version)
                  + DataLimits.serializer.serializedSize(command.limits(), version)
-                 + command.selectionSerializedSize(version);
+                 + command.selectionSerializedSize(version)
+                 + command.indexSerializedSize(version);
         }
     }
 
@@ -739,7 +815,7 @@ public abstract class ReadCommand implements ReadQuery
             else
                 limits = DataLimits.cqlLimits(maxResults);
 
-            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
         }
 
         static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
@@ -850,7 +926,7 @@ public abstract class ReadCommand implements ReadQuery
             DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
             return new PartitionRangeReadCommand(
                     command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
-                    command.columnFilter(), command.rowFilter(), command.limits(), newRange);
+                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
         }
 
         static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata)
@@ -1000,7 +1076,7 @@ public abstract class ReadCommand implements ReadQuery
                 // missing without any problems, so we can safely always set "inclusive" to false in the data range
                 dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
             }
-            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
+            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
         }
 
         public long serializedSize(ReadCommand command, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/ReadOrderGroup.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadOrderGroup.java b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
index 44befa2..0720d79 100644
--- a/src/java/org/apache/cassandra/db/ReadOrderGroup.java
+++ b/src/java/org/apache/cassandra/db/ReadOrderGroup.java
@@ -98,7 +98,7 @@ public class ReadOrderGroup implements AutoCloseable
 
     private static ColumnFamilyStore maybeGetIndexCfs(ColumnFamilyStore baseCfs, ReadCommand command)
     {
-        Index index = baseCfs.indexManager.getBestIndexFor(command);
+        Index index = command.getIndex(baseCfs);
         return index == null ? null : index.getBackingTable().orElse(null);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7b62f5a..c08ef6a 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -21,20 +21,26 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.cassandra.cache.*;
+import org.apache.cassandra.cache.IRowCacheEntry;
+import org.apache.cassandra.cache.RowCacheKey;
+import org.apache.cassandra.cache.RowCacheSentinel;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -507,7 +513,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
         throws IOException
         {
             DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index fabfebc..bd3202d 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -536,19 +536,15 @@ public class SecondaryIndexManager implements IndexRegistry
      * index should be performed in the searcherFor method to ensure that we pick the right
      * index regardless of the validity of the expression.
      *
-     * This method is called at various points during the lifecycle of a ReadCommand (to obtain a Searcher,
-     * get the index's underlying CFS for ReadOrderGroup, or an estimate of the result size from an average index
-     * query).
-     *
-     * Ideally, we would do this relatively expensive operation only once, and attach the index to the
-     * ReadCommand for future reference. This requires the index be passed onto additional commands generated
-     * to process subranges etc.
+     * This method is only called once during the lifecycle of a ReadCommand and the result is
+     * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
+     * ReadOrderGroup, or an estimate of the result size from an average index query.
      *
      * @param command ReadCommand to be executed
      * @return an Index instance, ready to use during execution of the command, or null if none
      * of the registered indexes can support the command.
      */
-    public Index getBestIndexFor(ReadCommand command, boolean includeInTrace)
+    public Index getBestIndexFor(ReadCommand command)
     {
         if (indexes.isEmpty() || command.rowFilter().isEmpty())
             return null;
@@ -564,8 +560,7 @@ public class SecondaryIndexManager implements IndexRegistry
         if (searchableIndexes.isEmpty())
         {
             logger.debug("No applicable indexes found");
-            if (includeInTrace)
-                Tracing.trace("No applicable indexes found");
+            Tracing.trace("No applicable indexes found");
             return null;
         }
 
@@ -575,7 +570,7 @@ public class SecondaryIndexManager implements IndexRegistry
                                           .orElseThrow(() -> new AssertionError("Could not select most selective index"));
 
         // pay for an additional threadlocal get() rather than build the strings unnecessarily
-        if (includeInTrace && Tracing.isTracing())
+        if (Tracing.isTracing())
         {
             Tracing.trace("Index mean cardinalities are {}. Scanning with {}.",
                           searchableIndexes.stream().map(i -> i.getIndexName() + ':' + i.getEstimatedResultRows())
@@ -585,12 +580,6 @@ public class SecondaryIndexManager implements IndexRegistry
         return selected;
     }
 
-    // convenience method which doesn't emit tracing messages
-    public Index getBestIndexFor(ReadCommand command)
-    {
-        return getBestIndexFor(command, false);
-    }
-
     /**
      * Called at write time to ensure that values present in the update
      * are valid according to the rules of all registered indexes which

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/IndexMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/IndexMetadata.java b/src/java/org/apache/cassandra/schema/IndexMetadata.java
index 40a75c6..6846a14 100644
--- a/src/java/org/apache/cassandra/schema/IndexMetadata.java
+++ b/src/java/org/apache/cassandra/schema/IndexMetadata.java
@@ -18,10 +18,9 @@
 
 package org.apache.cassandra.schema;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
@@ -37,7 +36,10 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.statements.IndexTarget;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * An immutable representation of secondary index metadata.
@@ -46,6 +48,8 @@ public final class IndexMetadata
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexMetadata.class);
 
+    public static final Serializer serializer = new Serializer();
+
     public enum IndexType
     {
         KEYS, CUSTOM, COMPOSITES
@@ -56,6 +60,9 @@ public final class IndexMetadata
         COLUMN, ROW
     }
 
+    // UUID for serialization. This is a deterministic UUID generated from the index name.
+    // Both the id and name are guaranteed unique per keyspace.
+    public final UUID id;
     public final String name;
     public final IndexType indexType;
     public final TargetType targetType;
@@ -68,6 +75,7 @@ public final class IndexMetadata
                           TargetType targetType,
                           Set<ColumnIdentifier> columns)
     {
+        this.id = UUID.nameUUIDFromBytes(name.getBytes());
         this.name = name;
         this.options = options == null ? ImmutableMap.of() : ImmutableMap.copyOf(options);
         this.indexType = indexType;
@@ -194,7 +202,7 @@ public final class IndexMetadata
 
     public int hashCode()
     {
-        return Objects.hashCode(name, indexType, targetType, options, columns);
+        return Objects.hashCode(id, name, indexType, targetType, options, columns);
     }
 
     public boolean equalsWithoutName(IndexMetadata other)
@@ -215,12 +223,13 @@ public final class IndexMetadata
 
         IndexMetadata other = (IndexMetadata)obj;
 
-        return Objects.equal(name, other.name) && equalsWithoutName(other);
+        return Objects.equal(id, other.id) && Objects.equal(name, other.name) && equalsWithoutName(other);
     }
 
     public String toString()
     {
         return new ToStringBuilder(this)
+            .append("id", id.toString())
             .append("name", name)
             .append("indexType", indexType)
             .append("targetType", targetType)
@@ -228,4 +237,24 @@ public final class IndexMetadata
             .append("options", options)
             .build();
     }
+
+    public static class Serializer
+    {
+        public void serialize(IndexMetadata metadata, DataOutputPlus out, int version) throws IOException
+        {
+            UUIDSerializer.serializer.serialize(metadata.id, out, version);
+        }
+
+        public IndexMetadata deserialize(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        {
+            UUID id = UUIDSerializer.serializer.deserialize(in, version);
+            return cfm.getIndexes().get(id).orElseThrow(() -> new UnknownIndexException(cfm, id));
+        }
+
+        public long serializedSize(IndexMetadata metadata, int version)
+        {
+            return UUIDSerializer.serializer.serializedSize(metadata.id, version);
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/Indexes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/Indexes.java b/src/java/org/apache/cassandra/schema/Indexes.java
index 6227e0b..9114f63 100644
--- a/src/java/org/apache/cassandra/schema/Indexes.java
+++ b/src/java/org/apache/cassandra/schema/Indexes.java
@@ -40,12 +40,14 @@ import static com.google.common.collect.Iterables.filter;
  */
 public class Indexes implements Iterable<IndexMetadata>
 {
-    private final ImmutableMap<String, IndexMetadata> indexes;
+    private final ImmutableMap<String, IndexMetadata> indexesByName;
+    private final ImmutableMap<UUID, IndexMetadata> indexesById;
     private final ImmutableMultimap<ColumnIdentifier, IndexMetadata> indexesByColumn;
 
     private Indexes(Builder builder)
     {
-        indexes = builder.indexes.build();
+        indexesByName = builder.indexesByName.build();
+        indexesById = builder.indexesById.build();
         indexesByColumn = builder.indexesByColumn.build();
     }
 
@@ -61,17 +63,17 @@ public class Indexes implements Iterable<IndexMetadata>
 
     public Iterator<IndexMetadata> iterator()
     {
-        return indexes.values().iterator();
+        return indexesByName.values().iterator();
     }
 
     public int size()
     {
-        return indexes.size();
+        return indexesByName.size();
     }
 
     public boolean isEmpty()
     {
-        return indexes.isEmpty();
+        return indexesByName.isEmpty();
     }
 
     /**
@@ -82,7 +84,7 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public Optional<IndexMetadata> get(String name)
     {
-        return indexes.values().stream().filter(def -> def.name.equals(name)).findFirst();
+        return Optional.ofNullable(indexesByName.get(name));
     }
 
     /**
@@ -92,7 +94,30 @@ public class Indexes implements Iterable<IndexMetadata>
      */
     public boolean has(String name)
     {
-        return get(name).isPresent();
+        return indexesByName.containsKey(name);
+    }
+
+    /**
+     * Get the index with the specified id
+     *
+     * @param id a UUID which identifies an index
+     * @return an empty {@link Optional} if no index with the specified id is found; a non-empty optional of
+     *         {@link IndexMetadata} otherwise
+     */
+
+    public Optional<IndexMetadata> get(UUID id)
+    {
+        return Optional.ofNullable(indexesById.get(id));
+    }
+
+    /**
+     * Answer true if contains an index with the specified id.
+     * @param id a UUID which identifies an index.
+     * @return true if an index with the specified id is found; false otherwise
+     */
+    public boolean has(UUID id)
+    {
+        return indexesById.containsKey(id);
     }
 
     /**
@@ -148,19 +173,19 @@ public class Indexes implements Iterable<IndexMetadata>
     @Override
     public boolean equals(Object o)
     {
-        return this == o || (o instanceof Indexes && indexes.equals(((Indexes) o).indexes));
+        return this == o || (o instanceof Indexes && indexesByName.equals(((Indexes) o).indexesByName));
     }
 
     @Override
     public int hashCode()
     {
-        return indexes.hashCode();
+        return indexesByName.hashCode();
     }
 
     @Override
     public String toString()
     {
-        return indexes.values().toString();
+        return indexesByName.values().toString();
     }
 
     public static String getAvailableIndexName(String ksName, String cfName, ColumnIdentifier columnName)
@@ -179,7 +204,8 @@ public class Indexes implements Iterable<IndexMetadata>
 
     public static final class Builder
     {
-        final ImmutableMap.Builder<String, IndexMetadata> indexes = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<String, IndexMetadata> indexesByName = new ImmutableMap.Builder<>();
+        final ImmutableMap.Builder<UUID, IndexMetadata> indexesById = new ImmutableMap.Builder<>();
         final ImmutableMultimap.Builder<ColumnIdentifier, IndexMetadata> indexesByColumn = new ImmutableMultimap.Builder<>();
 
         private Builder()
@@ -193,7 +219,8 @@ public class Indexes implements Iterable<IndexMetadata>
 
         public Builder add(IndexMetadata index)
         {
-            indexes.put(index.name, index);
+            indexesByName.put(index.name, index);
+            indexesById.put(index.id, index);
             // All indexes are column indexes at the moment
             if (index.isColumnIndex())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/schema/UnknownIndexException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/UnknownIndexException.java b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
new file mode 100644
index 0000000..5daf631
--- /dev/null
+++ b/src/java/org/apache/cassandra/schema/UnknownIndexException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.schema;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.cassandra.config.CFMetaData;
+
+/**
+ * Exception thrown when we read an index id from a serialized ReadCommand and no corresponding IndexMetadata
+ * can be found in the CFMetaData#indexes collection. Note that this is an internal exception and is not meant
+ * to be user facing; the node reading the ReadCommand should proceed as if no index id were present.
+ */
+public class UnknownIndexException extends IOException
+{
+    public final UUID indexId;
+    public UnknownIndexException(CFMetaData metadata, UUID id)
+    {
+        super(String.format("Unknown index %s for table %s.%s", id.toString(), metadata.ksName, metadata.cfName));
+        indexId = id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 59f1c1c..e3b884e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1717,7 +1717,7 @@ public class StorageProxy implements StorageProxyMBean
     private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
     {
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
-        Index index = cfs.indexManager.getBestIndexFor(command);
+        Index index = command.getIndex(cfs);
         float maxExpectedResults = index == null
                                  ? command.limits().estimateTotalResults(cfs)
                                  : index.getEstimatedResultRows();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 2e57a8b..87eb018 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -17,15 +17,17 @@
  */
 package org.apache.cassandra.service.pager;
 
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.exceptions.RequestExecutionException;
+import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+
 /**
  * Pages a RangeSliceCommand whose predicate is a slice query.
  *
@@ -89,7 +91,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager
             }
         }
 
-        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange);
+        // it won't hurt for the next page command to query the index manager
+        // again to check for an applicable index, so don't supply one here
+        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty());
     }
 
     protected void recordLast(DecoratedKey key, Row last)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e097efc5/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 038384e..9cd1653 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -30,12 +30,9 @@ import java.util.zip.Inflater;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Longs;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.filter.ColumnFilter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -1520,7 +1517,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                               columns,
                                                                               ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
                                                                               limits,
-                                                                              new DataRange(bounds, filter));
+                                                                              new DataRange(bounds, filter),
+                                                                              Optional.empty());
                 try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     assert results != null;
@@ -1613,7 +1611,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                               ColumnFilter.all(metadata),
                                                                               RowFilter.NONE,
                                                                               limits,
-                                                                              new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+                                                                              new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
+                                                                              Optional.empty());
                 try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
@@ -1704,7 +1703,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                           columns,
                                                                           ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
                                                                           limits,
-                                                                          new DataRange(bounds, filter));
+                                                                          new DataRange(bounds, filter),
+                                                                          Optional.empty());
             try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
             {
                 return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());