You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2017/08/30 16:38:02 UTC

[02/10] cassandra git commit: Fix race condition in read command serialization

Fix race condition in read command serialization

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-13363


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f297bcf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f297bcf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f297bcf

Branch: refs/heads/cassandra-3.11
Commit: 7f297bcf8aced983cbc9c4103d0ebefc1789f0dd
Parents: d03c046
Author: Aleksey Yeschenko <al...@yeschenko.com>
Authored: Mon Aug 14 16:43:06 2017 +0100
Committer: Aleksey Yeschenko <al...@yeschenko.com>
Committed: Wed Aug 30 16:16:46 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../cql3/statements/SelectStatement.java        |  16 +-
 .../db/AbstractReadCommandBuilder.java          |   2 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 133 +++++++++++---
 .../org/apache/cassandra/db/ReadCommand.java    | 149 ++++++++-------
 .../db/SinglePartitionReadCommand.java          | 180 ++++++++++++++++---
 .../cassandra/index/SecondaryIndexManager.java  |   9 +-
 .../internal/composites/CompositesSearcher.java |   6 +-
 .../index/internal/keys/KeysSearcher.java       |   3 +-
 .../cassandra/service/AbstractReadExecutor.java |   4 +-
 .../service/pager/PartitionRangeQueryPager.java |   8 +-
 .../cassandra/thrift/CassandraServer.java       |  69 ++++---
 test/unit/org/apache/cassandra/Util.java        |  26 +--
 .../apache/cassandra/db/SecondaryIndexTest.java |  10 +-
 .../db/SinglePartitionSliceCommandTest.java     |  45 ++---
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 16 files changed, 427 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 452dc9b..aca9e1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.0.15
- * enable segement creation before recovering commitlogs (CASSANDRA-13587)
+ * Fix race condition in read command serialization (CASSANDRA-13363)
+ * Enable segement creation before recovering commitlogs (CASSANDRA-13587)
  * Fix AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index bd377f4..3882a23 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -540,18 +540,10 @@ public class SelectStatement implements CQLStatement
         if (keyBounds == null)
             return ReadQuery.EMPTY;
 
-        PartitionRangeReadCommand command = new PartitionRangeReadCommand(cfm,
-                                                                          nowInSec,
-                                                                          queriedColumns,
-                                                                          rowFilter,
-                                                                          limit,
-                                                                          new DataRange(keyBounds, clusteringIndexFilter),
-                                                                          Optional.empty());
-        // If there's a secondary index that the command can use, have it validate
-        // the request parameters. Note that as a side effect, if a viable Index is
-        // identified by the CFS's index manager, it will be cached in the command
-        // and serialized during distribution to replicas in order to avoid performing
-        // further lookups.
+        PartitionRangeReadCommand command =
+            PartitionRangeReadCommand.create(false, cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+
+        // If there's a secondary index that the command can use, have it validate the request parameters.
         command.maybeValidateIndex();
 
         return command;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index afbab74..d219816 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -336,7 +336,7 @@ public abstract class AbstractReadCommandBuilder
             else
                 bounds = new ExcludingBounds<>(start, end);
 
-            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
+            return PartitionRangeReadCommand.create(false, cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
         }
 
         static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index fb2dd0d..9e557e0 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -20,8 +20,8 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -59,31 +59,39 @@ public class PartitionRangeReadCommand extends ReadCommand
     private final DataRange dataRange;
     private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
-    public PartitionRangeReadCommand(boolean isDigest,
-                                     int digestVersion,
-                                     boolean isForThrift,
-                                     CFMetaData metadata,
-                                     int nowInSec,
-                                     ColumnFilter columnFilter,
-                                     RowFilter rowFilter,
-                                     DataLimits limits,
-                                     DataRange dataRange,
-                                     Optional<IndexMetadata> index)
+    private PartitionRangeReadCommand(boolean isDigest,
+                                      int digestVersion,
+                                      boolean isForThrift,
+                                      CFMetaData metadata,
+                                      int nowInSec,
+                                      ColumnFilter columnFilter,
+                                      RowFilter rowFilter,
+                                      DataLimits limits,
+                                      DataRange dataRange,
+                                      IndexMetadata index)
     {
-        super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+        super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
         this.dataRange = dataRange;
-        this.index = index;
     }
 
-    public PartitionRangeReadCommand(CFMetaData metadata,
-                                     int nowInSec,
-                                     ColumnFilter columnFilter,
-                                     RowFilter rowFilter,
-                                     DataLimits limits,
-                                     DataRange dataRange,
-                                     Optional<IndexMetadata> index)
+    public static PartitionRangeReadCommand create(boolean isForThrift,
+                                                   CFMetaData metadata,
+                                                   int nowInSec,
+                                                   ColumnFilter columnFilter,
+                                                   RowFilter rowFilter,
+                                                   DataLimits limits,
+                                                   DataRange dataRange)
     {
-        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
+        return new PartitionRangeReadCommand(false,
+                                             0,
+                                             isForThrift,
+                                             metadata,
+                                             nowInSec,
+                                             columnFilter,
+                                             rowFilter,
+                                             limits,
+                                             dataRange,
+                                             findIndex(metadata, rowFilter));
     }
 
     /**
@@ -96,13 +104,14 @@ public class PartitionRangeReadCommand extends ReadCommand
      */
     public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
     {
-        return new PartitionRangeReadCommand(metadata,
+        return new PartitionRangeReadCommand(false, 0, false,
+                                             metadata,
                                              nowInSec,
                                              ColumnFilter.all(metadata),
                                              RowFilter.NONE,
                                              DataLimits.NONE,
                                              DataRange.allData(metadata.partitioner),
-                                             Optional.empty());
+                                             null);
     }
 
     public DataRange dataRange()
@@ -122,17 +131,72 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             dataRange().forSubRange(range),
+                                             indexMetadata());
     }
 
     public PartitionRangeReadCommand copy()
     {
-        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             dataRange(),
+                                             indexMetadata());
+    }
+
+    public PartitionRangeReadCommand copyAsDigestQuery()
+    {
+        return new PartitionRangeReadCommand(true,
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             dataRange(),
+                                             indexMetadata());
+    }
+
+    public PartitionRangeReadCommand withUpdatedDataRange(DataRange newDataRange)
+    {
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             limits(),
+                                             newDataRange,
+                                             indexMetadata());
     }
 
-    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
+    public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
     {
-        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
+        return new PartitionRangeReadCommand(isDigestQuery(),
+                                             digestVersion(),
+                                             isForThrift(),
+                                             metadata(),
+                                             nowInSec(),
+                                             columnFilter(),
+                                             rowFilter(),
+                                             newLimits,
+                                             newDataRange,
+                                             indexMetadata());
     }
 
     public long getTimeout()
@@ -173,7 +237,8 @@ public class PartitionRangeReadCommand extends ReadCommand
         metric.rangeLatency.addNano(latencyNanos);
     }
 
-    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+    @VisibleForTesting
+    public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
     {
         ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
@@ -337,7 +402,17 @@ public class PartitionRangeReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+        public ReadCommand deserialize(DataInputPlus in,
+                                       int version,
+                                       boolean isDigest,
+                                       int digestVersion,
+                                       boolean isForThrift,
+                                       CFMetaData metadata,
+                                       int nowInSec,
+                                       ColumnFilter columnFilter,
+                                       RowFilter rowFilter,
+                                       DataLimits limits,
+                                       IndexMetadata index)
         throws IOException
         {
             DataRange range = DataRange.serializer.deserialize(in, version, metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 76180cc..66985b6 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.function.Predicate;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,24 +108,27 @@ public abstract class ReadCommand implements ReadQuery
     private final RowFilter rowFilter;
     private final DataLimits limits;
 
-    // SecondaryIndexManager will attempt to provide the most selective of any available indexes
-    // during execution. Here we also store an the results of that lookup to repeating it over
-    // the lifetime of the command.
-    protected Optional<IndexMetadata> index = Optional.empty();
-
-    // Flag to indicate whether the index manager has been queried to select an index for this
-    // command. This is necessary as the result of that lookup may be null, in which case we
-    // still don't want to repeat it.
-    private boolean indexManagerQueried = false;
-
-    private boolean isDigestQuery;
+    private final boolean isDigestQuery;
     // if a digest query, the version for which the digest is expected. Ignored if not a digest.
     private int digestVersion;
     private final boolean isForThrift;
 
+    @Nullable
+    private final IndexMetadata index;
+
     protected static abstract class SelectionDeserializer
     {
-        public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
+        public abstract ReadCommand deserialize(DataInputPlus in,
+                                                int version,
+                                                boolean isDigest,
+                                                int digestVersion,
+                                                boolean isForThrift,
+                                                CFMetaData metadata,
+                                                int nowInSec,
+                                                ColumnFilter columnFilter,
+                                                RowFilter rowFilter,
+                                                DataLimits limits,
+                                                IndexMetadata index) throws IOException;
     }
 
     protected enum Kind
@@ -147,7 +152,8 @@ public abstract class ReadCommand implements ReadQuery
                           int nowInSec,
                           ColumnFilter columnFilter,
                           RowFilter rowFilter,
-                          DataLimits limits)
+                          DataLimits limits,
+                          IndexMetadata index)
     {
         this.kind = kind;
         this.isDigestQuery = isDigestQuery;
@@ -158,6 +164,7 @@ public abstract class ReadCommand implements ReadQuery
         this.columnFilter = columnFilter;
         this.rowFilter = rowFilter;
         this.limits = limits;
+        this.index = index;
     }
 
     protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException;
@@ -253,18 +260,6 @@ public abstract class ReadCommand implements ReadQuery
     }
 
     /**
-     * Sets whether this command should be a digest one or not.
-     *
-     * @param isDigestQuery whether the command should be set as a digest one or not.
-     * @return this read command.
-     */
-    public ReadCommand setIsDigestQuery(boolean isDigestQuery)
-    {
-        this.isDigestQuery = isDigestQuery;
-        return this;
-    }
-
-    /**
      * Sets the digest version, for when digest for that command is requested.
      * <p>
      * Note that we allow setting this independently of setting the command as a digest query as
@@ -291,6 +286,30 @@ public abstract class ReadCommand implements ReadQuery
     }
 
     /**
+     * Index (metadata) chosen for this query. Can be null.
+     *
+     * @return index (metadata) chosen for this query
+     */
+    @Nullable
+    public IndexMetadata indexMetadata()
+    {
+        return index;
+    }
+
+    /**
+     *  Index instance chosen for this query. Can be null.
+     *
+     * @return Index instance chosen for this query. Can be null.
+     */
+    @Nullable
+    public Index index()
+    {
+        return null == index
+             ? null
+             : Keyspace.openAndGetStore(metadata).indexManager.getIndex(index);
+    }
+
+    /**
      * The clustering index filter this command to use for the provided key.
      * <p>
      * Note that that method should only be called on a key actually queried by this command
@@ -310,6 +329,11 @@ public abstract class ReadCommand implements ReadQuery
      */
     public abstract ReadCommand copy();
 
+    /**
+     * Returns a copy of this command with isDigestQuery set to true.
+     */
+    public abstract ReadCommand copyAsDigestQuery();
+
     protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 
     protected abstract int oldestUnrepairedTombstone();
@@ -321,35 +345,32 @@ public abstract class ReadCommand implements ReadQuery
              : ReadResponse.createDataResponse(iterator, this);
     }
 
-    public long indexSerializedSize(int version)
+    long indexSerializedSize(int version)
     {
-        if (index.isPresent())
-            return IndexMetadata.serializer.serializedSize(index.get(), version);
-        else
-            return 0;
+        return null != index
+             ? IndexMetadata.serializer.serializedSize(index, version)
+             : 0;
     }
 
     public Index getIndex(ColumnFamilyStore cfs)
     {
-        // if we've already consulted the index manager, and it returned a valid index
-        // the result should be cached here.
-        if(index.isPresent())
-            return cfs.indexManager.getIndex(index.get());
-
-        // if no cached index is present, but we've already consulted the index manager
-        // then no registered index is suitable for this command, so just return null.
-        if (indexManagerQueried)
+        return null != index
+             ? cfs.indexManager.getIndex(index)
+             : null;
+    }
+
+    static IndexMetadata findIndex(CFMetaData table, RowFilter rowFilter)
+    {
+        if (table.getIndexes().isEmpty() || rowFilter.isEmpty())
             return null;
 
-        // do the lookup, set the flag to indicate so and cache the result if not null
-        Index selected = cfs.indexManager.getBestIndexFor(this);
-        indexManagerQueried = true;
+        ColumnFamilyStore cfs = Keyspace.openAndGetStore(table);
 
-        if (selected == null)
-            return null;
+        Index index = cfs.indexManager.getBestIndexFor(rowFilter);
 
-        index = Optional.of(selected.getIndexMetadata());
-        return selected;
+        return null != index
+             ? index.getIndexMetadata()
+             : null;
     }
 
     /**
@@ -602,7 +623,7 @@ public abstract class ReadCommand implements ReadQuery
             assert version >= MessagingService.VERSION_30;
 
             out.writeByte(command.kind.ordinal());
-            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
+            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(null != command.index));
             if (command.isDigestQuery())
                 out.writeUnsignedVInt(command.digestVersion());
             CFMetaData.serializer.serialize(command.metadata(), out, version);
@@ -610,8 +631,8 @@ public abstract class ReadCommand implements ReadQuery
             ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
             RowFilter.serializer.serialize(command.rowFilter(), out, version);
             DataLimits.serializer.serialize(command.limits(), out, version);
-            if (command.index.isPresent())
-                IndexMetadata.serializer.serialize(command.index.get(), out, version);
+            if (null != command.index)
+                IndexMetadata.serializer.serialize(command.index, out, version);
 
             command.serializeSelection(out, version);
         }
@@ -631,18 +652,16 @@ public abstract class ReadCommand implements ReadQuery
             ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
             DataLimits limits = DataLimits.serializer.deserialize(in, version);
-            Optional<IndexMetadata> index = hasIndex
-                                            ? deserializeIndexMetadata(in, version, metadata)
-                                            : Optional.empty();
+            IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;
 
             return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
         }
 
-        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
+        private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
         {
             try
             {
-                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
+                return IndexMetadata.serializer.deserialize(in, version, cfm);
             }
             catch (UnknownIndexException e)
             {
@@ -652,7 +671,7 @@ public abstract class ReadCommand implements ReadQuery
                                                "index. Please wait for schema agreement after index creation.",
                                                cfm.ksName, cfm.cfName, e.indexId.toString());
                 logger.info(message);
-                return Optional.empty();
+                return null;
             }
         }
 
@@ -830,7 +849,7 @@ public abstract class ReadCommand implements ReadQuery
             else
                 limits = DataLimits.cqlLimits(maxResults);
 
-            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
+            return PartitionRangeReadCommand.create(true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
         }
 
         static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
@@ -939,9 +958,8 @@ public abstract class ReadCommand implements ReadQuery
             ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
             ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
             DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
-            return new PartitionRangeReadCommand(
-                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
-                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
+
+            return command.withUpdatedDataRange(newRange);
         }
 
         static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
@@ -1096,7 +1114,7 @@ public abstract class ReadCommand implements ReadQuery
                 // missing without any problems, so we can safely always set "inclusive" to false in the data range
                 dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false);
             }
-            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
+            return PartitionRangeReadCommand.create(true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
         }
 
         public long serializedSize(ReadCommand command, int version)
@@ -1290,10 +1308,7 @@ public abstract class ReadCommand implements ReadQuery
         {
             Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata);
 
-            // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
-            return new SinglePartitionReadCommand(
-                    isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
-                    key, selectionAndFilter.right);
+            return SinglePartitionReadCommand.legacyNamesCommand(isDigest, version, metadata, nowInSeconds, selectionAndFilter.left, key, selectionAndFilter.right);
         }
 
         static Pair<ColumnFilter, ClusteringIndexNamesFilter> deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException
@@ -1422,8 +1437,7 @@ public abstract class ReadCommand implements ReadQuery
             else
                 limits = DataLimits.cqlLimits(count);
 
-            // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
-            return new SinglePartitionReadCommand(isDigest, version, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
+            return SinglePartitionReadCommand.legacySliceCommand(isDigest, version, metadata, nowInSeconds, columnFilter, limits, key, filter);
         }
 
         private long serializedSliceCommandSize(SinglePartitionReadCommand command)
@@ -1605,9 +1619,8 @@ public abstract class ReadCommand implements ReadQuery
 
             ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter();
             ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata);
-            return new SinglePartitionReadCommand(
-                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
-                    command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter);
+
+            return command.withUpdatedClusteringIndexFilter(sliceFilter);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 686ec35..00464ca 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -70,18 +70,19 @@ public class SinglePartitionReadCommand extends ReadCommand
 
     private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 
-    public SinglePartitionReadCommand(boolean isDigest,
-                                      int digestVersion,
-                                      boolean isForThrift,
-                                      CFMetaData metadata,
-                                      int nowInSec,
-                                      ColumnFilter columnFilter,
-                                      RowFilter rowFilter,
-                                      DataLimits limits,
-                                      DecoratedKey partitionKey,
-                                      ClusteringIndexFilter clusteringIndexFilter)
+    private SinglePartitionReadCommand(boolean isDigest,
+                                       int digestVersion,
+                                       boolean isForThrift,
+                                       CFMetaData metadata,
+                                       int nowInSec,
+                                       ColumnFilter columnFilter,
+                                       RowFilter rowFilter,
+                                       DataLimits limits,
+                                       DecoratedKey partitionKey,
+                                       ClusteringIndexFilter clusteringIndexFilter,
+                                       IndexMetadata index)
     {
-        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
         assert partitionKey.getPartitioner() == metadata.partitioner;
         this.partitionKey = partitionKey;
         this.clusteringIndexFilter = clusteringIndexFilter;
@@ -90,6 +91,44 @@ public class SinglePartitionReadCommand extends ReadCommand
     /**
      * Creates a new read command on a single partition.
      *
+     * @param isForThrift whether the query is for thrift or not.
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use are "now" for this query.
+     * @param columnFilter the column filter to use for the query.
+     * @param rowFilter the row filter to use for the query.
+     * @param limits the limits to use for the query.
+     * @param partitionKey the partition key for the partition to query.
+     * @param clusteringIndexFilter the clustering index filter to use for the query.
+     * @param indexMetadata explicitly specified index to use for the query
+     *
+     * @return a newly created read command.
+     */
+    public static SinglePartitionReadCommand create(boolean isForThrift,
+                                                    CFMetaData metadata,
+                                                    int nowInSec,
+                                                    ColumnFilter columnFilter,
+                                                    RowFilter rowFilter,
+                                                    DataLimits limits,
+                                                    DecoratedKey partitionKey,
+                                                    ClusteringIndexFilter clusteringIndexFilter,
+                                                    IndexMetadata indexMetadata)
+    {
+        return new SinglePartitionReadCommand(false,
+                                              0,
+                                              isForThrift,
+                                              metadata,
+                                              nowInSec,
+                                              columnFilter,
+                                              rowFilter,
+                                              limits,
+                                              partitionKey,
+                                              clusteringIndexFilter,
+                                              indexMetadata);
+    }
+
+    /**
+     * Creates a new read command on a single partition.
+     *
      * @param metadata the table to query.
      * @param nowInSec the time in seconds to use are "now" for this query.
      * @param columnFilter the column filter to use for the query.
@@ -112,7 +151,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     }
 
     /**
-     * Creates a new read command on a single partition for thrift.
+     * Creates a new read command on a single partition.
      *
      * @param isForThrift whether the query is for thrift or not.
      * @param metadata the table to query.
@@ -134,7 +173,15 @@ public class SinglePartitionReadCommand extends ReadCommand
                                                     DecoratedKey partitionKey,
                                                     ClusteringIndexFilter clusteringIndexFilter)
     {
-        return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+        return create(isForThrift,
+                      metadata,
+                      nowInSec,
+                      columnFilter,
+                      rowFilter,
+                      limits,
+                      partitionKey,
+                      clusteringIndexFilter,
+                      findIndex(metadata, rowFilter));
     }
 
     /**
@@ -148,7 +195,11 @@ public class SinglePartitionReadCommand extends ReadCommand
      *
      * @return a newly created read command. The returned command will use no row filter and have no limits.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
+    public static SinglePartitionReadCommand create(CFMetaData metadata,
+                                                    int nowInSec,
+                                                    DecoratedKey key,
+                                                    ColumnFilter columnFilter,
+                                                    ClusteringIndexFilter filter)
     {
         return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
     }
@@ -164,7 +215,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      */
     public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
     {
-        return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+        return create(metadata, nowInSec, key, Slices.ALL);
     }
 
     /**
@@ -178,7 +229,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      */
     public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
     {
-        return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+        return create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
     }
 
     /**
@@ -211,7 +262,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
     {
         ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
-        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+        return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
     }
 
     /**
@@ -244,7 +295,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
     {
         ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
-        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+        return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
     }
 
     /**
@@ -265,7 +316,82 @@ public class SinglePartitionReadCommand extends ReadCommand
 
     public SinglePartitionReadCommand copy()
     {
-        return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+        return new SinglePartitionReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              clusteringIndexFilter(),
+                                              indexMetadata());
+    }
+
+    public SinglePartitionReadCommand copyAsDigestQuery()
+    {
+        return new SinglePartitionReadCommand(true,
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              clusteringIndexFilter(),
+                                              indexMetadata());
+    }
+
+    public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter)
+    {
+        return new SinglePartitionReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              filter,
+                                              indexMetadata());
+    }
+
+    static SinglePartitionReadCommand legacySliceCommand(boolean isDigest,
+                                                         int digestVersion,
+                                                         CFMetaData metadata,
+                                                         int nowInSec,
+                                                         ColumnFilter columnFilter,
+                                                         DataLimits limits,
+                                                         DecoratedKey partitionKey,
+                                                         ClusteringIndexSliceFilter filter)
+    {
+        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+        return new SinglePartitionReadCommand(isDigest,
+                                              digestVersion,
+                                              true,
+                                              metadata,
+                                              nowInSec,
+                                              columnFilter,
+                                              RowFilter.NONE,
+                                              limits,
+                                              partitionKey,
+                                              filter,
+                                              null);
+    }
+
+    static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest,
+                                                         int digestVersion,
+                                                         CFMetaData metadata,
+                                                         int nowInSec,
+                                                         ColumnFilter columnFilter,
+                                                         DecoratedKey partitionKey,
+                                                         ClusteringIndexNamesFilter filter)
+    {
+        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+        return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter,null);
     }
 
     public DecoratedKey partitionKey()
@@ -432,7 +558,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                 final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
 
                 @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
-                UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+                UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
                 try
                 {
                     // Use a custom iterator instead of DataLimits to avoid stopping the original iterator
@@ -1068,12 +1194,22 @@ public class SinglePartitionReadCommand extends ReadCommand
 
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+        public ReadCommand deserialize(DataInputPlus in,
+                                       int version,
+                                       boolean isDigest,
+                                       int digestVersion,
+                                       boolean isForThrift,
+                                       CFMetaData metadata,
+                                       int nowInSec,
+                                       ColumnFilter columnFilter,
+                                       RowFilter rowFilter,
+                                       DataLimits limits,
+                                       IndexMetadata index)
         throws IOException
         {
             DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize()));
             ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
-            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
+            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index dd6dde4..5976ddf 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.IndexTarget;
@@ -697,17 +698,17 @@ public class SecondaryIndexManager implements IndexRegistry
      * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
      * ReadOrderGroup, or an estimate of the result size from an average index query.
      *
-     * @param command ReadCommand to be executed
+     * @param rowFilter RowFilter of the command to be executed
      * @return an Index instance, ready to use during execution of the command, or null if none
      * of the registered indexes can support the command.
      */
-    public Index getBestIndexFor(ReadCommand command)
+    public Index getBestIndexFor(RowFilter rowFilter)
     {
-        if (indexes.isEmpty() || command.rowFilter().isEmpty())
+        if (indexes.isEmpty() || rowFilter.isEmpty())
             return null;
 
         Set<Index> searchableIndexes = new HashSet<>();
-        for (RowFilter.Expression expression : command.rowFilter())
+        for (RowFilter.Expression expression : rowFilter)
         {
             if (expression.isCustom())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index 135839b..f8a7c66 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -136,13 +136,15 @@ public class CompositesSearcher extends CassandraIndexSearcher
 
                     // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                     ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
+                                                                                           index.baseCfs.metadata,
                                                                                            command.nowInSec(),
                                                                                            command.columnFilter(),
                                                                                            command.rowFilter(),
                                                                                            DataLimits.NONE,
                                                                                            partitionKey,
-                                                                                           filter);
+                                                                                           filter,
+                                                                                           null);
                     @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
                     // by the next caller of next, or through closing this iterator is this come before.
                     UnfilteredRowIterator dataIter =

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index 189b652..c14c5a7 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -99,7 +99,8 @@ public class KeysSearcher extends CassandraIndexSearcher
                                                                                            command.rowFilter(),
                                                                                            DataLimits.NONE,
                                                                                            key,
-                                                                                           command.clusteringIndexFilter(key));
+                                                                                           command.clusteringIndexFilter(key),
+                                                                                           null);
 
                     @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
                                                   // Otherwise, we close right away if empty, and if it's assigned to next it will be called either

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index cae1f1a..177fdb2 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -88,7 +88,7 @@ public abstract class AbstractReadExecutor
 
     protected void makeDigestRequests(Iterable<InetAddress> endpoints)
     {
-        makeRequests(command.copy().setIsDigestQuery(true), endpoints);
+        makeRequests(command.copyAsDigestQuery(), endpoints);
     }
 
     private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
@@ -284,7 +284,7 @@ public abstract class AbstractReadExecutor
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
                 if (handler.resolver.isDataPresent())
-                    retryCommand = command.copy().setIsDigestQuery(true);
+                    retryCommand = command.copyAsDigestQuery();
 
                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
                 if (traceState != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 9c216e3..ea79017 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.service.pager;
 
-import java.util.Optional;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,8 +25,6 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.index.Index;
-import org.apache.cassandra.schema.IndexMetadata;
 
 /**
  * Pages a PartitionRangeReadCommand.
@@ -90,9 +86,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
             }
         }
 
-        Index index = command.getIndex(Keyspace.openAndGetStore(command.metadata()));
-        Optional<IndexMetadata> indexMetadata = index != null ? Optional.of(index.getIndexMetadata()) : Optional.empty();
-        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, indexMetadata);
+        return ((PartitionRangeReadCommand) command).withUpdatedLimitsAndDataRange(limits, pageRange);
     }
 
     protected void recordLast(DecoratedKey key, Row last)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 86caac3..cb74b15 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.metrics.ClientMetrics;
@@ -1520,16 +1519,16 @@ public class CassandraServer implements Cassandra.Iface
                 ColumnFilter columns = makeColumnFilter(metadata, column_parent, predicate);
                 ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
                 DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
-                PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                              0,
-                                                                              true,
-                                                                              metadata,
-                                                                              nowInSec,
-                                                                              columns,
-                                                                              ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
-                                                                              limits,
-                                                                              new DataRange(bounds, filter),
-                                                                              Optional.empty());
+
+                PartitionRangeReadCommand cmd =
+                    PartitionRangeReadCommand.create(true,
+                                                     metadata,
+                                                     nowInSec,
+                                                     columns,
+                                                     ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
+                                                     limits,
+                                                     new DataRange(bounds, filter));
+
                 try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     assert results != null;
@@ -1614,16 +1613,16 @@ public class CassandraServer implements Cassandra.Iface
                 Clustering pageFrom = metadata.isSuper()
                                     ? new Clustering(start_column)
                                     : LegacyLayout.decodeCellName(metadata, start_column).clustering;
-                PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                              0,
-                                                                              true,
-                                                                              metadata,
-                                                                              nowInSec,
-                                                                              ColumnFilter.all(metadata),
-                                                                              RowFilter.NONE,
-                                                                              limits,
-                                                                              new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
-                                                                              Optional.empty());
+
+                PartitionRangeReadCommand cmd =
+                    PartitionRangeReadCommand.create(true,
+                                                     metadata,
+                                                     nowInSec,
+                                                     ColumnFilter.all(metadata),
+                                                     RowFilter.NONE,
+                                                     limits,
+                                                     new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+
                 try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
                 {
                     return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
@@ -1706,21 +1705,17 @@ public class CassandraServer implements Cassandra.Iface
             ColumnFilter columns = makeColumnFilter(metadata, column_parent, column_predicate);
             ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate);
             DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate);
-            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                          0,
-                                                                          true,
-                                                                          metadata,
-                                                                          nowInSec,
-                                                                          columns,
-                                                                          ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
-                                                                          limits,
-                                                                          new DataRange(bounds, filter),
-                                                                          Optional.empty());
-            // If there's a secondary index that the command can use, have it validate
-            // the request parameters. Note that as a side effect, if a viable Index is
-            // identified by the CFS's index manager, it will be cached in the command
-            // and serialized during distribution to replicas in order to avoid performing
-            // further lookups.
+
+            PartitionRangeReadCommand cmd =
+                PartitionRangeReadCommand.create(true,
+                                                 metadata,
+                                                 nowInSec,
+                                                 columns,
+                                                 ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
+                                                 limits,
+                                                 new DataRange(bounds, filter));
+
+            // If there's a secondary index that the command can use, have it validate the request parameters.
             cmd.maybeValidateIndex();
 
             try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
@@ -2533,7 +2528,7 @@ public class CassandraServer implements Cassandra.Iface
                 // We want to know if the partition exists, so just fetch a single cell.
                 ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
                 DataLimits limits = DataLimits.thriftLimits(1, 1);
-                return new SinglePartitionReadCommand(false, 0, true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
+                return SinglePartitionReadCommand.create(true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
             }
 
             // Gather the clustering for the expected values and query those.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index e8b42bc..d758efe 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -629,31 +629,7 @@ public class Util
                                                              ColumnFamilyStore cfs,
                                                              ReadOrderGroup orderGroup)
     {
-        return new InternalPartitionRangeReadCommand(command).queryStorageInternal(cfs, orderGroup);
-    }
-
-    private static final class InternalPartitionRangeReadCommand extends PartitionRangeReadCommand
-    {
-
-        private InternalPartitionRangeReadCommand(PartitionRangeReadCommand original)
-        {
-            super(original.isDigestQuery(),
-                  original.digestVersion(),
-                  original.isForThrift(),
-                  original.metadata(),
-                  original.nowInSec(),
-                  original.columnFilter(),
-                  original.rowFilter(),
-                  original.limits(),
-                  original.dataRange(),
-                  Optional.empty());
-        }
-
-        private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs,
-                                                                 ReadOrderGroup orderGroup)
-        {
-            return queryStorage(cfs, orderGroup);
-        }
+        return command.queryStorage(cfs, orderGroup);
     }
 
     public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index bbccc48..2457c4a 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -118,7 +118,7 @@ public class SecondaryIndexTest
                                       .filterOn("birthdate", Operator.EQ, 1L)
                                       .build();
 
-        Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+        Index.Searcher searcher = rc.index().searcherFor(rc);
         try (ReadOrderGroup orderGroup = rc.startOrderGroup(); UnfilteredPartitionIterator pi = searcher.search(orderGroup))
         {
             assertTrue(pi.hasNext());
@@ -204,7 +204,7 @@ public class SecondaryIndexTest
 
         // verify that it's not being indexed under any other value either
         ReadCommand rc = Util.cmd(cfs).build();
-        assertNull(cfs.indexManager.getBestIndexFor(rc));
+        assertNull(rc.index());
 
         // resurrect w/ a newer timestamp
         new RowUpdateBuilder(cfs.metadata, 2, "k1").clustering("c").add("birthdate", 1L).build().apply();;
@@ -222,13 +222,13 @@ public class SecondaryIndexTest
         // todo - checking the # of index searchers for the command is probably not the best thing to test here
         RowUpdateBuilder.deleteRow(cfs.metadata, 3, "k1", "c").applyUnsafe();
         rc = Util.cmd(cfs).build();
-        assertNull(cfs.indexManager.getBestIndexFor(rc));
+        assertNull(rc.index());
 
         // make sure obsolete mutations don't generate an index entry
         // todo - checking the # of index searchers for the command is probably not the best thing to test here
         new RowUpdateBuilder(cfs.metadata, 3, "k1").clustering("c").add("birthdate", 1L).build().apply();;
         rc = Util.cmd(cfs).build();
-        assertNull(cfs.indexManager.getBestIndexFor(rc));
+        assertNull(rc.index());
     }
 
     @Test
@@ -504,7 +504,7 @@ public class SecondaryIndexTest
         ColumnDefinition cdef = cfs.metadata.getColumnDefinition(col);
 
         ReadCommand rc = Util.cmd(cfs).filterOn(cdef.name.toString(), Operator.EQ, ((AbstractType) cdef.cellValueType()).decompose(val)).build();
-        Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+        Index.Searcher searcher = rc.index().searcherFor(rc);
         if (count != 0)
             assertNotNull(searcher);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 7f59e2f..02b642e 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -116,13 +116,14 @@ public class SinglePartitionSliceCommandTest
         ByteBuffer zero = ByteBufferUtil.bytes(0);
         Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero)));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                          FBUtilities.nowInSeconds(),
-                                                          columnFilter,
-                                                          RowFilter.NONE,
-                                                          DataLimits.NONE,
-                                                          key,
-                                                          sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);
 
         DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21));
         ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21);
@@ -166,13 +167,14 @@ public class SinglePartitionSliceCommandTest
 
         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);
 
         // check raw iterator for static cell
         try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
@@ -224,13 +226,14 @@ public class SinglePartitionSliceCommandTest
         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
         Slice slice = Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);
 
         String ret = cmd.toCQLString();
         Assert.assertNotNull(ret);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 640b68b..c2598ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -604,7 +604,7 @@ public class SSTableReaderTest
                                              .columns("birthdate")
                                              .filterOn("birthdate", Operator.EQ, 1L)
                                              .build();
-        Index.Searcher searcher = indexedCFS.indexManager.getBestIndexFor(rc).searcherFor(rc);
+        Index.Searcher searcher = rc.index().searcherFor(rc);
         assertNotNull(searcher);
         try (ReadOrderGroup orderGroup = ReadOrderGroup.forCommand(rc))
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org