You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/11/02 15:17:08 UTC

[2/3] cassandra git commit: Don't use the timestamp ordered path for name queries with counters/collections

Don't use the timestamp ordered path for name queries with counters/collections

patch by slebresne; reviewed by iamaleksey for CASSANDRA-10572

Name queries usually query sstables sequentially in recency order in the
hope of only needing the more recent sstable(s). However, we can never
skip sstables that way for counters and collections.

The patch makes name queries with either counters or collections use the
more generic path of looking at all sstables simultaneously. For that,
the patch removes the subclasses of SinglePartitionReadCommand: those
classes were mostly justified by name and slice queries using different
paths, but that is no longer fully the case. Both paths (timestamp
ordered and all sstables simultaneously) are moved to
SinglePartitionReadCommand and used when relevant.

This actually fixes a bug with counters, since those were previously
using the timestamp ordered path and were potentially skipping older
sstables, which is incorrect.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4beb54da
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4beb54da
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4beb54da

Branch: refs/heads/trunk
Commit: 4beb54da50752fdd5573d8d1d2b0da108eaded6c
Parents: f27cc7e
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Oct 23 11:25:49 2015 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Nov 2 15:11:08 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/CQL3CasRequest.java         |   2 +-
 .../cql3/statements/ModificationStatement.java  |   4 +-
 .../cql3/statements/SelectStatement.java        |   2 +-
 .../db/AbstractReadCommandBuilder.java          |   2 +-
 .../org/apache/cassandra/db/ReadCommand.java    |  36 +-
 .../db/SinglePartitionNamesCommand.java         | 276 ----------
 .../db/SinglePartitionReadCommand.java          | 504 +++++++++++++++++--
 .../db/SinglePartitionSliceCommand.java         | 265 ----------
 .../db/partitions/PartitionIterators.java       |   2 +-
 .../UnfilteredPartitionIterators.java           |   2 +-
 .../internal/composites/CompositesSearcher.java |  14 +-
 .../cassandra/schema/LegacySchemaMigrator.java  |   2 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |   4 +-
 .../apache/cassandra/service/DataResolver.java  |  16 +-
 .../apache/cassandra/service/StorageProxy.java  |   6 +-
 .../service/pager/SinglePartitionPager.java     |   4 +-
 .../cassandra/thrift/CassandraServer.java       |  14 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   |  30 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |   2 +-
 .../db/SinglePartitionSliceCommandTest.java     |  16 +-
 .../cassandra/service/QueryPagerTest.java       |  12 +-
 22 files changed, 541 insertions(+), 675 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 72f48f5..6d5d49b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Don't use 'names query' read path for counters (CASSANDRA-10572)
  * Fix backward compatibility for counters (CASSANDRA-10470)
  * Remove memory_allocator paramter from cassandra.yaml (CASSANDRA-10581)
  * Execute the metadata reload task of all registered indexes on CFS::reload (CASSANDRA-10604)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index 41aef83..9564005 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -133,7 +133,7 @@ public class CQL3CasRequest implements CASRequest
         return conditionColumns;
     }
 
-    public SinglePartitionReadCommand<?> readCommand(int nowInSec)
+    public SinglePartitionReadCommand readCommand(int nowInSec)
     {
         assert !conditions.isEmpty();
         Slices.Builder builder = new Slices.Builder(cfm.comparator, conditions.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index eb0f9ff..71597f4 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -345,7 +345,7 @@ public abstract class ModificationStatement implements CQLStatement
             throw new InvalidRequestException(String.format("Write operation require a read but consistency %s is not supported on reads", cl));
         }
 
-        List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(partitionKeys.size());
+        List<SinglePartitionReadCommand> commands = new ArrayList<>(partitionKeys.size());
         int nowInSec = FBUtilities.nowInSeconds();
         for (ByteBuffer key : partitionKeys)
             commands.add(SinglePartitionReadCommand.create(cfm,
@@ -573,7 +573,7 @@ public abstract class ModificationStatement implements CQLStatement
     {
         UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
 
-        SinglePartitionReadCommand<?> readCommand = request.readCommand(FBUtilities.nowInSeconds());
+        SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
         FilteredPartition current;
         try (ReadOrderGroup orderGroup = readCommand.startOrderGroup(); PartitionIterator iter = readCommand.executeInternal(orderGroup))
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 8bf4d2e..ab1da45 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -440,7 +440,7 @@ public class SelectStatement implements CQLStatement
 
         // Note that we use the total limit for every key, which is potentially inefficient.
         // However, IN + LIMIT is not a very sensible choice.
-        List<SinglePartitionReadCommand<?>> commands = new ArrayList<>(keys.size());
+        List<SinglePartitionReadCommand> commands = new ArrayList<>(keys.size());
         for (ByteBuffer key : keys)
         {
             QueryProcessor.validateKey(key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 9bb89a6..dab22c7 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -255,7 +255,7 @@ public abstract class AbstractReadCommandBuilder
         @Override
         public ReadCommand build()
         {
-            return SinglePartitionSliceCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
+            return SinglePartitionReadCommand.create(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), partitionKey, makeFilter());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 2eb1d1d..ace5e1e 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -1144,9 +1144,9 @@ public abstract class ReadCommand implements ReadQuery
             out.writeLong(singleReadCommand.nowInSec() * 1000L);  // convert from seconds to millis
 
             if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
-                serializeSliceCommand((SinglePartitionSliceCommand) singleReadCommand, out);
+                serializeSliceCommand(singleReadCommand, out);
             else
-                serializeNamesCommand((SinglePartitionNamesCommand) singleReadCommand, out);
+                serializeNamesCommand(singleReadCommand, out);
         }
 
         public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
@@ -1192,14 +1192,14 @@ public abstract class ReadCommand implements ReadQuery
             size += TypeSizes.sizeof((long) command.nowInSec());
 
             if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
-                return size + serializedSliceCommandSize((SinglePartitionSliceCommand) singleReadCommand);
+                return size + serializedSliceCommandSize(singleReadCommand);
             else
-                return size + serializedNamesCommandSize((SinglePartitionNamesCommand) singleReadCommand);
+                return size + serializedNamesCommandSize(singleReadCommand);
         }
 
-        private void serializeNamesCommand(SinglePartitionNamesCommand command, DataOutputPlus out) throws IOException
+        private void serializeNamesCommand(SinglePartitionReadCommand command, DataOutputPlus out) throws IOException
         {
-            serializeNamesFilter(command, command.clusteringIndexFilter(), out);
+            serializeNamesFilter(command, (ClusteringIndexNamesFilter)command.clusteringIndexFilter(), out);
         }
 
         private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException
@@ -1257,12 +1257,12 @@ public abstract class ReadCommand implements ReadQuery
             return size + TypeSizes.sizeof(true);  // countCql3Rows
         }
 
-        private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
+        private SinglePartitionReadCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
         {
             Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata);
 
             // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
-            return new SinglePartitionNamesCommand(
+            return new SinglePartitionReadCommand(
                     isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
                     key, selectionAndFilter.right);
         }
@@ -1312,17 +1312,17 @@ public abstract class ReadCommand implements ReadQuery
             return Pair.create(selectionBuilder.build(), filter);
         }
 
-        private long serializedNamesCommandSize(SinglePartitionNamesCommand command)
+        private long serializedNamesCommandSize(SinglePartitionReadCommand command)
         {
-            ClusteringIndexNamesFilter filter = command.clusteringIndexFilter();
+            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter();
             PartitionColumns columns = command.columnFilter().fetchedColumns();
             return serializedNamesFilterSize(filter, command.metadata(), columns);
         }
 
-        private void serializeSliceCommand(SinglePartitionSliceCommand command, DataOutputPlus out) throws IOException
+        private void serializeSliceCommand(SinglePartitionReadCommand command, DataOutputPlus out) throws IOException
         {
             CFMetaData metadata = command.metadata();
-            ClusteringIndexSliceFilter filter = command.clusteringIndexFilter();
+            ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter)command.clusteringIndexFilter();
 
             Slices slices = filter.requestedSlices();
             boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING);
@@ -1349,7 +1349,7 @@ public abstract class ReadCommand implements ReadQuery
             out.writeInt(compositesToGroup);
         }
 
-        private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
+        private SinglePartitionReadCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
         {
             Pair<ClusteringIndexSliceFilter, Boolean> p = deserializeSlicePartitionFilter(in, metadata);
             ClusteringIndexSliceFilter filter = p.left;
@@ -1370,13 +1370,13 @@ public abstract class ReadCommand implements ReadQuery
                 limits = DataLimits.cqlLimits(count);
 
             // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
-            return new SinglePartitionSliceCommand(isDigest, version, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
+            return new SinglePartitionReadCommand(isDigest, version, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
         }
 
-        private long serializedSliceCommandSize(SinglePartitionSliceCommand command)
+        private long serializedSliceCommandSize(SinglePartitionReadCommand command)
         {
             CFMetaData metadata = command.metadata();
-            ClusteringIndexSliceFilter filter = command.clusteringIndexFilter();
+            ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter)command.clusteringIndexFilter();
 
             Slices slices = filter.requestedSlices();
             boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING);
@@ -1550,9 +1550,9 @@ public abstract class ReadCommand implements ReadQuery
             if (!shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
                 return command;
 
-            ClusteringIndexNamesFilter filter = ((SinglePartitionNamesCommand) command).clusteringIndexFilter();
+            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter();
             ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata);
-            return new SinglePartitionSliceCommand(
+            return new SinglePartitionReadCommand(
                     command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
                     command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
deleted file mode 100644
index 763919e..0000000
--- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Sets;
-
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.metrics.TableMetrics;
-import org.apache.cassandra.thrift.ThriftResultsMerger;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.SearchIterator;
-import org.apache.cassandra.utils.btree.BTreeSet;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-
-/**
- * General interface for storage-engine read queries.
- */
-public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<ClusteringIndexNamesFilter>
-{
-    private int oldestUnrepairedDeletionTime = Integer.MAX_VALUE;
-
-    protected SinglePartitionNamesCommand(boolean isDigest,
-                                          int digestVersion,
-                                          boolean isForThrift,
-                                          CFMetaData metadata,
-                                          int nowInSec,
-                                          ColumnFilter columnFilter,
-                                          RowFilter rowFilter,
-                                          DataLimits limits,
-                                          DecoratedKey partitionKey,
-                                          ClusteringIndexNamesFilter clusteringIndexFilter)
-    {
-        super(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
-    }
-
-    public SinglePartitionNamesCommand(CFMetaData metadata,
-                                       int nowInSec,
-                                       ColumnFilter columnFilter,
-                                       RowFilter rowFilter,
-                                       DataLimits limits,
-                                       DecoratedKey partitionKey,
-                                       ClusteringIndexNamesFilter clusteringIndexFilter)
-    {
-        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
-    }
-
-    public SinglePartitionNamesCommand(CFMetaData metadata,
-                                       int nowInSec,
-                                       ColumnFilter columnFilter,
-                                       RowFilter rowFilter,
-                                       DataLimits limits,
-                                       ByteBuffer key,
-                                       ClusteringIndexNamesFilter clusteringIndexFilter)
-    {
-        this(metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), clusteringIndexFilter);
-    }
-
-    public SinglePartitionNamesCommand copy()
-    {
-        return new SinglePartitionNamesCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
-    }
-
-    @Override
-    protected int oldestUnrepairedTombstone()
-    {
-        return oldestUnrepairedDeletionTime;
-    }
-
-    protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
-    {
-        Tracing.trace("Acquiring sstable references");
-        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
-
-        ImmutableBTreePartition result = null;
-        ClusteringIndexNamesFilter filter = clusteringIndexFilter();
-
-        Tracing.trace("Merging memtable contents");
-        for (Memtable memtable : view.memtables)
-        {
-            Partition partition = memtable.getPartition(partitionKey());
-            if (partition == null)
-                continue;
-
-            try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
-            {
-                if (iter.isEmpty())
-                    continue;
-
-                UnfilteredRowIterator clonedFilter = copyOnHeap
-                                                   ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
-                                                   : iter;
-                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, false);
-            }
-        }
-
-        /* add the SSTables on disk */
-        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-        int sstablesIterated = 0;
-
-        // read sorted sstables
-        for (SSTableReader sstable : view.sstables)
-        {
-            // if we've already seen a partition tombstone with a timestamp greater
-            // than the most recent update to this sstable, we're done, since the rest of the sstables
-            // will also be older
-            if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
-                break;
-
-            long currentMaxTs = sstable.getMaxTimestamp();
-            filter = reduceFilter(filter, result, currentMaxTs);
-            if (filter == null)
-                break;
-
-            Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
-            sstable.incrementReadCount();
-            try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())))
-            {
-                if (iter.isEmpty())
-                    continue;
-
-                sstablesIterated++;
-                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, sstable.isRepaired());
-            }
-        }
-
-        cfs.metric.updateSSTableIterated(sstablesIterated);
-
-        if (result == null || result.isEmpty())
-            return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
-
-        DecoratedKey key = result.partitionKey();
-        cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
-
-        // "hoist up" the requested data into a more recent sstable
-        if (sstablesIterated > cfs.getMinimumCompactionThreshold()
-            && !cfs.isAutoCompactionDisabled()
-            && cfs.getCompactionStrategyManager().shouldDefragment())
-        {
-            // !!WARNING!!   if we stop copying our data to a heap-managed object,
-            //               we will need to track the lifetime of this mutation as well
-            Tracing.trace("Defragmenting requested data");
-
-            try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
-            {
-                final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
-                StageManager.getStage(Stage.MUTATION).execute(new Runnable()
-                {
-                    public void run()
-                    {
-                        // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
-                        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
-                    }
-                });
-            }
-        }
-
-        return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
-    }
-
-    private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, boolean isRepaired)
-    {
-        if (!isRepaired)
-            oldestUnrepairedDeletionTime = Math.min(oldestUnrepairedDeletionTime, iter.stats().minLocalDeletionTime);
-
-        int maxRows = Math.max(clusteringIndexFilter().requestedRows().size(), 1);
-        if (result == null)
-            return ImmutableBTreePartition.create(iter, maxRows);
-
-        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed())), nowInSec()))
-        {
-            return ImmutableBTreePartition.create(merged, maxRows);
-        }
-    }
-
-    private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
-    {
-        if (result == null)
-            return filter;
-
-        SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
-
-        PartitionColumns columns = columnFilter().fetchedColumns();
-        NavigableSet<Clustering> clusterings = filter.requestedRows();
-
-        // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
-        // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
-        // that for later.
-
-        boolean removeStatic = false;
-        if (!columns.statics.isEmpty())
-        {
-            Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
-            removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
-        }
-
-        NavigableSet<Clustering> toRemove = null;
-        for (Clustering clustering : clusterings)
-        {
-            if (!searchIter.hasNext())
-                break;
-
-            Row row = searchIter.next(clustering);
-            if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
-                continue;
-
-            if (toRemove == null)
-                toRemove = new TreeSet<>(result.metadata().comparator);
-            toRemove.add(clustering);
-        }
-
-        if (!removeStatic && toRemove == null)
-            return filter;
-
-        // Check if we have everything we need
-        boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
-        boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
-        if (hasNoMoreStatic && hasNoMoreClusterings)
-            return null;
-
-        if (toRemove != null)
-        {
-            BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
-            newClusterings.addAll(Sets.difference(clusterings, toRemove));
-            clusterings = newClusterings.build();
-        }
-        return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
-    }
-
-    private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
-    {
-        // We can remove a row if it has data that is more recent that the next sstable to consider for the data that the query
-        // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
-        // and 2) the requested columns.
-        if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
-            return false;
-
-        for (ColumnDefinition column : requestedColumns)
-        {
-            // We can never be sure we have all of a collection, so never remove rows in that case.
-            if (column.type.isCollection())
-                return false;
-
-            Cell cell = row.getCell(column);
-            if (cell == null || cell.timestamp() <= sstableTimestamp)
-                return false;
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index f349885..4d7d93c 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -22,17 +22,22 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
 import org.apache.cassandra.cache.IRowCacheEntry;
 import org.apache.cassandra.cache.RowCacheKey;
 import org.apache.cassandra.cache.RowCacheSentinel;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.*;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
@@ -43,29 +48,37 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
 
 /**
  * A read command that selects a (part of a) single partition.
  */
-public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter> extends ReadCommand
+public class SinglePartitionReadCommand extends ReadCommand
 {
     protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 
     private final DecoratedKey partitionKey;
-    private final F clusteringIndexFilter;
-
-    protected SinglePartitionReadCommand(boolean isDigest,
-                                         int digestVersion,
-                                         boolean isForThrift,
-                                         CFMetaData metadata,
-                                         int nowInSec,
-                                         ColumnFilter columnFilter,
-                                         RowFilter rowFilter,
-                                         DataLimits limits,
-                                         DecoratedKey partitionKey,
-                                         F clusteringIndexFilter)
+    private final ClusteringIndexFilter clusteringIndexFilter;
+
+    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
+
+    public SinglePartitionReadCommand(boolean isDigest,
+                                      int digestVersion,
+                                      boolean isForThrift,
+                                      CFMetaData metadata,
+                                      int nowInSec,
+                                      ColumnFilter columnFilter,
+                                      RowFilter rowFilter,
+                                      DataLimits limits,
+                                      DecoratedKey partitionKey,
+                                      ClusteringIndexFilter clusteringIndexFilter)
     {
         super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
         assert partitionKey.getPartitioner() == metadata.partitioner;
@@ -86,13 +99,13 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      *
      * @return a newly created read command.
      */
-    public static SinglePartitionReadCommand<?> create(CFMetaData metadata,
-                                                       int nowInSec,
-                                                       ColumnFilter columnFilter,
-                                                       RowFilter rowFilter,
-                                                       DataLimits limits,
-                                                       DecoratedKey partitionKey,
-                                                       ClusteringIndexFilter clusteringIndexFilter)
+    public static SinglePartitionReadCommand create(CFMetaData metadata,
+                                                    int nowInSec,
+                                                    ColumnFilter columnFilter,
+                                                    RowFilter rowFilter,
+                                                    DataLimits limits,
+                                                    DecoratedKey partitionKey,
+                                                    ClusteringIndexFilter clusteringIndexFilter)
     {
         return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
     }
@@ -111,20 +124,16 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      *
      * @return a newly created read command.
      */
-    public static SinglePartitionReadCommand<?> create(boolean isForThrift,
-                                                       CFMetaData metadata,
-                                                       int nowInSec,
-                                                       ColumnFilter columnFilter,
-                                                       RowFilter rowFilter,
-                                                       DataLimits limits,
-                                                       DecoratedKey partitionKey,
-                                                       ClusteringIndexFilter clusteringIndexFilter)
+    public static SinglePartitionReadCommand create(boolean isForThrift,
+                                                    CFMetaData metadata,
+                                                    int nowInSec,
+                                                    ColumnFilter columnFilter,
+                                                    RowFilter rowFilter,
+                                                    DataLimits limits,
+                                                    DecoratedKey partitionKey,
+                                                    ClusteringIndexFilter clusteringIndexFilter)
     {
-        if (clusteringIndexFilter instanceof ClusteringIndexSliceFilter)
-            return new SinglePartitionSliceCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter);
-
-        assert clusteringIndexFilter instanceof ClusteringIndexNamesFilter;
-        return new SinglePartitionNamesCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter);
+        return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
     }
 
     /**
@@ -138,7 +147,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      *
      * @return a newly created read command. The returned command will use no row filter and have no limits.
      */
-    public static SinglePartitionReadCommand<?> create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
     {
         return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
     }
@@ -154,7 +163,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      */
     public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
     {
-        return SinglePartitionSliceCommand.create(metadata, nowInSec, key, Slices.ALL);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
     }
 
     /**
@@ -168,7 +177,61 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      */
     public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
     {
-        return SinglePartitionSliceCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+    }
+
+    /**
+     * Creates a new single partition slice command for the provided single slice.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param slice the slice of rows to query.
+     *
+     * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
+     * query every column of the table (without limit or row filtering) and be in forward order.
+     */
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
+    {
+        return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
+    }
+
+    /**
+     * Creates a new single partition slice command for the provided slices.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param slices the slices of rows to query.
+     *
+     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+     * query every column of the table (without limit or row filtering) and be in forward order.
+     */
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
+    {
+        ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
+        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+    }
+
+    /**
+     * Creates a new single partition slice command for the provided slices.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the raw (not yet decorated) partition key for the partition to query.
+     * @param slices the slices of rows to query.
+     *
+     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
+     * query every column of the table (without limit or row filtering) and be in forward order.
+     */
+    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
+    {
+        return create(metadata, nowInSec, metadata.decorateKey(key), slices);
+    }
+
+    public SinglePartitionReadCommand copy()
+    {
+        return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
     }
 
     public DecoratedKey partitionKey()
@@ -176,7 +239,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
         return partitionKey;
     }
 
-    public F clusteringIndexFilter()
+    public ClusteringIndexFilter clusteringIndexFilter()
     {
         return clusteringIndexFilter;
     }
@@ -394,7 +457,355 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
         return queryMemtableAndDiskInternal(cfs, copyOnHeap);
     }
 
-    protected abstract UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap);
+    @Override
+    protected int oldestUnrepairedTombstone()
+    {
+        return oldestUnrepairedTombstone;
+    }
+
+    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+    {
+        /*
+         * We have 2 main strategies:
+         *   1) We query memtables and sstables simultaneously. This is our most generic strategy and the one we use
+         *      unless we have a names filter that we know we can optimize further.
+         *   2) If we have a name filter (so we query specific rows), we can make a bet: that all columns for all queried rows
+         *      will have data in the most recent sstable(s), thus saving us from reading older ones. This does imply we
+         *      have a way to guarantee we have all the data for what is queried, which is only possible for name queries
+         *      and if we have neither collections nor counters (indeed, for a collection, we can't guarantee an older sstable
+         *      won't have some elements that weren't in the most recent sstables, and counters are intrinsically a collection
+         *      of shards so have the same problem).
+         */
+        if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
+            return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+
+        Tracing.trace("Acquiring sstable references");
+        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+        List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+        ClusteringIndexFilter filter = clusteringIndexFilter();
+
+        try
+        {
+            for (Memtable memtable : view.memtables)
+            {
+                Partition partition = memtable.getPartition(partitionKey());
+                if (partition == null)
+                    continue;
+
+                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
+                @SuppressWarnings("resource") // same as above
+                UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
+                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+            }
+            /*
+             * We can't eliminate full sstables based on the timestamp of what we've already read like
+             * in queryMemtableAndSSTablesInTimestampOrder, but we still want to eliminate sstables whose maxTimestamp < mostRecentPartitionTombstone
+             * we've read. We still rely on the sstable ordering by maxTimestamp since if
+             *   maxTimestamp_s1 > maxTimestamp_s0,
+             * we're guaranteed that s1 cannot have a row tombstone such that
+             *   timestamp(tombstone) > maxTimestamp_s0
+             * since we necessarily have
+             *   timestamp(tombstone) <= maxTimestamp_s1
+             * In other words, iterating in maxTimestamp order allows us to do our mostRecentPartitionTombstone elimination
+             * in one pass, and minimize the number of sstables for which we read a partition tombstone.
+             */
+            int sstablesIterated = 0;
+            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+            List<SSTableReader> skippedSSTables = null;
+            long mostRecentPartitionTombstone = Long.MIN_VALUE;
+            long minTimestamp = Long.MAX_VALUE;
+            int nonIntersectingSSTables = 0;
+
+            for (SSTableReader sstable : view.sstables)
+            {
+                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
+                // if we've already seen a partition tombstone with a timestamp greater
+                // than the most recent update to this sstable, we can skip it
+                if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
+                    break;
+
+                if (!filter.shouldInclude(sstable))
+                {
+                    nonIntersectingSSTables++;
+                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+                    if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
+                    {
+                        if (skippedSSTables == null)
+                            skippedSSTables = new ArrayList<>();
+                        skippedSSTables.add(sstable);
+                    }
+                    continue;
+                }
+
+                sstable.incrementReadCount();
+                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+                if (!sstable.isRepaired())
+                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
+                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
+                sstablesIterated++;
+            }
+
+            int includedDueToTombstones = 0;
+            // Check for partition tombstones in the skipped sstables
+            if (skippedSSTables != null)
+            {
+                for (SSTableReader sstable : skippedSSTables)
+                {
+                    if (sstable.getMaxTimestamp() <= minTimestamp)
+                        continue;
+
+                    sstable.incrementReadCount();
+                    @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+                    UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
+                    if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
+                    {
+                        iterators.add(iter);
+                        if (!sstable.isRepaired())
+                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+                        includedDueToTombstones++;
+                        sstablesIterated++;
+                    }
+                    else
+                    {
+                        iter.close();
+                    }
+                }
+            }
+            if (Tracing.isTracing())
+                Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
+                              nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
+
+            cfs.metric.updateSSTableIterated(sstablesIterated);
+
+            if (iterators.isEmpty())
+                return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
+
+            Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
+
+            @SuppressWarnings("resource") //  Closed through the closing of the result of that method.
+            UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
+            if (!merged.isEmpty())
+            {
+                DecoratedKey key = merged.partitionKey();
+                cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+            }
+
+            return merged;
+        }
+        catch (RuntimeException | Error e)
+        {
+            try
+            {
+                FBUtilities.closeAll(iterators);
+            }
+            catch (Exception suppressed)
+            {
+                e.addSuppressed(suppressed);
+            }
+            throw e;
+        }
+    }
+
+    private boolean queryNeitherCountersNorCollections()
+    {
+        for (ColumnDefinition column : columnFilter().fetchedColumns())
+        {
+            if (column.type.isCollection() || column.type.isCounter())
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Do a read by querying the memtable(s) first, and then each relevant sstables sequentially by order of the sstable
+     * max timestamp.
+     *
+     * This is used for names queries in the hope of only having to query the 1 or 2 most recent sstables and then knowing nothing
+     * more recent could be in the older sstables (which we can only guarantee if we know exactly which rows we query, and if
+     * no collection or counters are included).
+     * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
+     */
+    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+    {
+        Tracing.trace("Acquiring sstable references");
+        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
+
+        ImmutableBTreePartition result = null;
+
+        Tracing.trace("Merging memtable contents");
+        for (Memtable memtable : view.memtables)
+        {
+            Partition partition = memtable.getPartition(partitionKey());
+            if (partition == null)
+                continue;
+
+            try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
+            {
+                if (iter.isEmpty())
+                    continue;
+
+                UnfilteredRowIterator clonedFilter = copyOnHeap
+                                                   ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
+                                                   : iter;
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
+            }
+        }
+
+        /* add the SSTables on disk */
+        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+        int sstablesIterated = 0;
+
+        // read sorted sstables
+        for (SSTableReader sstable : view.sstables)
+        {
+            // if we've already seen a partition tombstone with a timestamp greater
+            // than the most recent update to this sstable, we're done, since the rest of the sstables
+            // will also be older
+            if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
+                break;
+
+            long currentMaxTs = sstable.getMaxTimestamp();
+            filter = reduceFilter(filter, result, currentMaxTs);
+            if (filter == null)
+                break;
+
+            Tracing.trace("Merging data from sstable {}", sstable.descriptor.generation);
+            sstable.incrementReadCount();
+            try (UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift())))
+            {
+                if (iter.isEmpty())
+                    continue;
+
+                sstablesIterated++;
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, sstable.isRepaired());
+            }
+        }
+
+        cfs.metric.updateSSTableIterated(sstablesIterated);
+
+        if (result == null || result.isEmpty())
+            return EmptyIterators.unfilteredRow(metadata(), partitionKey(), false);
+
+        DecoratedKey key = result.partitionKey();
+        cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+
+        // "hoist up" the requested data into a more recent sstable
+        if (sstablesIterated > cfs.getMinimumCompactionThreshold()
+            && !cfs.isAutoCompactionDisabled()
+            && cfs.getCompactionStrategyManager().shouldDefragment())
+        {
+            // !!WARNING!!   if we stop copying our data to a heap-managed object,
+            //               we will need to track the lifetime of this mutation as well
+            Tracing.trace("Defragmenting requested data");
+
+            try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
+            {
+                final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter));
+                StageManager.getStage(Stage.MUTATION).execute(new Runnable()
+                {
+                    public void run()
+                    {
+                        // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+                        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
+                    }
+                });
+            }
+        }
+
+        return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
+    }
+
+    private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, ClusteringIndexNamesFilter filter, boolean isRepaired)
+    {
+        if (!isRepaired)
+            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.stats().minLocalDeletionTime);
+
+        int maxRows = Math.max(filter.requestedRows().size(), 1);
+        if (result == null)
+            return ImmutableBTreePartition.create(iter, maxRows);
+
+        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, filter.isReversed())), nowInSec()))
+        {
+            return ImmutableBTreePartition.create(merged, maxRows);
+        }
+    }
+
+    private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
+    {
+        if (result == null)
+            return filter;
+
+        SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
+
+        PartitionColumns columns = columnFilter().fetchedColumns();
+        NavigableSet<Clustering> clusterings = filter.requestedRows();
+
+        // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
+        // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
+        // that for later.
+
+        boolean removeStatic = false;
+        if (!columns.statics.isEmpty())
+        {
+            Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+            removeStatic = staticRow != null && canRemoveRow(staticRow, columns.statics, sstableTimestamp);
+        }
+
+        NavigableSet<Clustering> toRemove = null;
+        for (Clustering clustering : clusterings)
+        {
+            if (!searchIter.hasNext())
+                break;
+
+            Row row = searchIter.next(clustering);
+            if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
+                continue;
+
+            if (toRemove == null)
+                toRemove = new TreeSet<>(result.metadata().comparator);
+            toRemove.add(clustering);
+        }
+
+        if (!removeStatic && toRemove == null)
+            return filter;
+
+        // Check if we have everything we need
+        boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
+        boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != null && toRemove.size() == clusterings.size());
+        if (hasNoMoreStatic && hasNoMoreClusterings)
+            return null;
+
+        if (toRemove != null)
+        {
+            BTreeSet.Builder<Clustering> newClusterings = BTreeSet.builder(result.metadata().comparator);
+            newClusterings.addAll(Sets.difference(clusterings, toRemove));
+            clusterings = newClusterings.build();
+        }
+        return new ClusteringIndexNamesFilter(clusterings, filter.isReversed());
+    }
+
+    private boolean canRemoveRow(Row row, Columns requestedColumns, long sstableTimestamp)
+    {
+        // We can remove a row if it has data that is more recent than the next sstable to consider for the data that the query
+        // cares about. And the data we care about is 1) the row timestamp (since every query cares if the row exists or not)
+        // and 2) the requested columns.
+        if (row.primaryKeyLivenessInfo().isEmpty() || row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+            return false;
+
+        for (ColumnDefinition column : requestedColumns)
+        {
+            Cell cell = row.getCell(column);
+            if (cell == null || cell.timestamp() <= sstableTimestamp)
+                return false;
+        }
+        return true;
+    }
 
     @Override
     public String toString()
@@ -448,11 +859,11 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
      */
     public static class Group implements ReadQuery
     {
-        public final List<SinglePartitionReadCommand<?>> commands;
+        public final List<SinglePartitionReadCommand> commands;
         private final DataLimits limits;
         private final int nowInSec;
 
-        public Group(List<SinglePartitionReadCommand<?>> commands, DataLimits limits)
+        public Group(List<SinglePartitionReadCommand> commands, DataLimits limits)
         {
             assert !commands.isEmpty();
             this.commands = commands;
@@ -462,9 +873,9 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
                 assert commands.get(i).nowInSec() == nowInSec;
         }
 
-        public static Group one(SinglePartitionReadCommand<?> command)
+        public static Group one(SinglePartitionReadCommand command)
         {
-            return new Group(Collections.<SinglePartitionReadCommand<?>>singletonList(command), command.limits());
+            return new Group(Collections.<SinglePartitionReadCommand>singletonList(command), command.limits());
         }
 
         public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
@@ -536,10 +947,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
         {
             DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in));
             ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
-            if (filter instanceof ClusteringIndexNamesFilter)
-                return new SinglePartitionNamesCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter);
-            else
-                return new SinglePartitionSliceCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter);
+            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
deleted file mode 100644
index f4e7af1..0000000
--- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
-import org.apache.cassandra.db.lifecycle.SSTableSet;
-import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.partitions.Partition;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.metrics.TableMetrics;
-import org.apache.cassandra.thrift.ThriftResultsMerger;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-
-/**
- * General interface for storage-engine read queries.
- */
-public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<ClusteringIndexSliceFilter>
-{
-    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
-
-    public SinglePartitionSliceCommand(boolean isDigest,
-                                       int digestVersion,
-                                       boolean isForThrift,
-                                       CFMetaData metadata,
-                                       int nowInSec,
-                                       ColumnFilter columnFilter,
-                                       RowFilter rowFilter,
-                                       DataLimits limits,
-                                       DecoratedKey partitionKey,
-                                       ClusteringIndexSliceFilter clusteringIndexFilter)
-    {
-        super(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
-    }
-
-    public SinglePartitionSliceCommand(CFMetaData metadata,
-                                       int nowInSec,
-                                       ColumnFilter columnFilter,
-                                       RowFilter rowFilter,
-                                       DataLimits limits,
-                                       DecoratedKey partitionKey,
-                                       ClusteringIndexSliceFilter clusteringIndexFilter)
-    {
-        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
-    }
-
-    /**
-     * Creates a new single partition slice command for the provided single slice.
-     *
-     * @param metadata the table to query.
-     * @param nowInSec the time in seconds to use are "now" for this query.
-     * @param key the partition key for the partition to query.
-     * @param slice the slice of rows to query.
-     *
-     * @return a newly created read command that queries {@code slice} in {@code key}. The returned query will
-     * query every columns for the table (without limit or row filtering) and be in forward order.
-     */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slice slice)
-    {
-        return create(metadata, nowInSec, key, Slices.with(metadata.comparator, slice));
-    }
-
-    /**
-     * Creates a new single partition slice command for the provided slices.
-     *
-     * @param metadata the table to query.
-     * @param nowInSec the time in seconds to use are "now" for this query.
-     * @param key the partition key for the partition to query.
-     * @param slices the slices of rows to query.
-     *
-     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
-     * query every columns for the table (without limit or row filtering) and be in forward order.
-     */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
-    {
-        ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
-        return new SinglePartitionSliceCommand(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
-    }
-
-    /**
-     * Creates a new single partition slice command for the provided slices.
-     *
-     * @param metadata the table to query.
-     * @param nowInSec the time in seconds to use are "now" for this query.
-     * @param key the partition key for the partition to query.
-     * @param slices the slices of rows to query.
-     *
-     * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
-     * query every columns for the table (without limit or row filtering) and be in forward order.
-     */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, ByteBuffer key, Slices slices)
-    {
-        return create(metadata, nowInSec, metadata.decorateKey(key), slices);
-    }
-
-    public SinglePartitionSliceCommand copy()
-    {
-        return new SinglePartitionSliceCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
-    }
-
-    @Override
-    protected int oldestUnrepairedTombstone()
-    {
-        return oldestUnrepairedTombstone;
-    }
-
-    protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
-    {
-        Tracing.trace("Acquiring sstable references");
-        ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
-
-        List<UnfilteredRowIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
-        ClusteringIndexSliceFilter filter = clusteringIndexFilter();
-
-        try
-        {
-            for (Memtable memtable : view.memtables)
-            {
-                Partition partition = memtable.getPartition(partitionKey());
-                if (partition == null)
-                    continue;
-
-                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
-                UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
-                @SuppressWarnings("resource") // same as above
-                UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
-                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
-                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
-            }
-            /*
-             * We can't eliminate full sstables based on the timestamp of what we've already read like
-             * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
-             * we've read. We still rely on the sstable ordering by maxTimestamp since if
-             *   maxTimestamp_s1 > maxTimestamp_s0,
-             * we're guaranteed that s1 cannot have a row tombstone such that
-             *   timestamp(tombstone) > maxTimestamp_s0
-             * since we necessarily have
-             *   timestamp(tombstone) <= maxTimestamp_s1
-             * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
-             * in one pass, and minimize the number of sstables for which we read a partition tombstone.
-             */
-            int sstablesIterated = 0;
-            Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-            List<SSTableReader> skippedSSTables = null;
-            long mostRecentPartitionTombstone = Long.MIN_VALUE;
-            long minTimestamp = Long.MAX_VALUE;
-            int nonIntersectingSSTables = 0;
-
-            for (SSTableReader sstable : view.sstables)
-            {
-                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
-                // if we've already seen a partition tombstone with a timestamp greater
-                // than the most recent update to this sstable, we can skip it
-                if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
-                    break;
-
-                if (!filter.shouldInclude(sstable))
-                {
-                    nonIntersectingSSTables++;
-                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
-                    if (sstable.getSSTableMetadata().maxLocalDeletionTime != Integer.MAX_VALUE)
-                    {
-                        if (skippedSSTables == null)
-                            skippedSSTables = new ArrayList<>();
-                        skippedSSTables.add(sstable);
-                    }
-                    continue;
-                }
-
-                sstable.incrementReadCount();
-                @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
-                UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
-                if (!sstable.isRepaired())
-                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-
-                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
-                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
-                sstablesIterated++;
-            }
-
-            int includedDueToTombstones = 0;
-            // Check for partition tombstones in the skipped sstables
-            if (skippedSSTables != null)
-            {
-                for (SSTableReader sstable : skippedSSTables)
-                {
-                    if (sstable.getMaxTimestamp() <= minTimestamp)
-                        continue;
-
-                    sstable.incrementReadCount();
-                    @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
-                    UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(), columnFilter(), filter.isReversed(), isForThrift()));
-                    if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
-                    {
-                        iterators.add(iter);
-                        if (!sstable.isRepaired())
-                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-                        includedDueToTombstones++;
-                        sstablesIterated++;
-                    }
-                    else
-                    {
-                        iter.close();
-                    }
-                }
-            }
-            if (Tracing.isTracing())
-                Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
-                              nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
-
-            cfs.metric.updateSSTableIterated(sstablesIterated);
-
-            if (iterators.isEmpty())
-                return EmptyIterators.unfilteredRow(cfs.metadata, partitionKey(), filter.isReversed());
-
-            Tracing.trace("Merging data from memtables and {} sstables", sstablesIterated);
-
-            @SuppressWarnings("resource") //  Closed through the closing of the result of that method.
-            UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec());
-            if (!merged.isEmpty())
-            {
-                DecoratedKey key = merged.partitionKey();
-                cfs.metric.samplers.get(TableMetrics.Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
-            }
-
-            return merged;
-        }
-        catch (RuntimeException | Error e)
-        {
-            try
-            {
-                FBUtilities.closeAll(iterators);
-            }
-            catch (Exception suppressed)
-            {
-                e.addSuppressed(suppressed);
-            }
-            throw e;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 0b43c19..cc90c40 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -33,7 +33,7 @@ public abstract class PartitionIterators
     private PartitionIterators() {}
 
     @SuppressWarnings("resource") // The created resources are returned right away
-    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand<?> command)
+    public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand command)
     {
         // If the query has no results, we'll get an empty iterator, but we still
         // want a RowIterator out of this method, so we return an empty one.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index afa731c..a3f7981 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -51,7 +51,7 @@ public abstract class UnfilteredPartitionIterators
     }
 
     @SuppressWarnings("resource") // The created resources are returned right away
-    public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command)
+    public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand command)
     {
         // If the query has no results, we'll get an empty iterator, but we still
         // want a RowIterator out of this method, so we return an empty one.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index 46701bc..b8ed800 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -134,13 +134,13 @@ public class CompositesSearcher extends CassandraIndexSearcher
 
                 // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                 ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(index.baseCfs.metadata,
-                                                                                     command.nowInSec(),
-                                                                                     command.columnFilter(),
-                                                                                     command.rowFilter(),
-                                                                                     DataLimits.NONE,
-                                                                                     partitionKey,
-                                                                                     filter);
+                SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+                                                                                       command.nowInSec(),
+                                                                                       command.columnFilter(),
+                                                                                       command.rowFilter(),
+                                                                                       DataLimits.NONE,
+                                                                                       partitionKey,
+                                                                                       filter);
                 @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
                                               // by the next caller of next, or through closing this iterator is this come before.
                 UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(index.baseCfs,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 39066da..79f9e99 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -709,7 +709,7 @@ public final class LegacySchemaMigrator
         Slices slices = Slices.with(comparator, Slice.make(comparator, typeName));
         int nowInSec = FBUtilities.nowInSeconds();
         DecoratedKey key = store.metadata.decorateKey(AsciiType.instance.fromString(keyspaceName));
-        SinglePartitionReadCommand command = SinglePartitionSliceCommand.create(store.metadata, nowInSec, key, slices);
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.create(store.metadata, nowInSec, key, slices);
 
         try (OpOrder.Group op = store.readOrdering.start();
              RowIterator partition = UnfilteredRowIterators.filter(command.queryMemtableAndDisk(store, op), nowInSec))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index ca91b7b..4ff8a23 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -457,8 +457,8 @@ public final class SchemaKeyspace
         Slices slices = Slices.with(comparator, Slice.make(comparator, tableName));
         int nowInSec = FBUtilities.nowInSeconds();
         try (OpOrder.Group op = store.readOrdering.start();
-             RowIterator partition =  UnfilteredRowIterators.filter(SinglePartitionSliceCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices)
-                                                                                               .queryMemtableAndDisk(store, op), nowInSec))
+             RowIterator partition =  UnfilteredRowIterators.filter(SinglePartitionReadCommand.create(store.metadata, nowInSec, getSchemaKSKey(keyspaceName), slices)
+                                                                                              .queryMemtableAndDisk(store, op), nowInSec))
         {
             return fct.apply(partition);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index 2de02f6..f3858d7 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -362,18 +362,18 @@ public class DataResolver extends ResponseResolver
                 DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
                 ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
                 ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
-                SinglePartitionReadCommand<?> cmd = SinglePartitionReadCommand.create(command.metadata(),
-                                                                                      command.nowInSec(),
-                                                                                      command.columnFilter(),
-                                                                                      command.rowFilter(),
-                                                                                      retryLimits,
-                                                                                      partitionKey,
-                                                                                      retryFilter);
+                SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
+                                                                                   command.nowInSec(),
+                                                                                   command.columnFilter(),
+                                                                                   command.rowFilter(),
+                                                                                   retryLimits,
+                                                                                   partitionKey,
+                                                                                   retryFilter);
 
                 return doShortReadRetry(cmd);
             }
 
-            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand<?> retryCommand)
+            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
             {
                 DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
                 ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index d7d6c63..4986a26 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1541,7 +1541,7 @@ public class StorageProxy implements StorageProxyMBean
      * 4. If the digests (if any) match the data return the data
      * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
+    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         int cmdCount = commands.size();
@@ -1575,14 +1575,14 @@ public class StorageProxy implements StorageProxyMBean
 
     private static class SinglePartitionReadLifecycle
     {
-        private final SinglePartitionReadCommand<?> command;
+        private final SinglePartitionReadCommand command;
         private final AbstractReadExecutor executor;
         private final ConsistencyLevel consistency;
 
         private PartitionIterator result;
         private ReadCallback repairHandler;
 
-        SinglePartitionReadLifecycle(SinglePartitionReadCommand<?> command, ConsistencyLevel consistency)
+        SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency)
         {
             this.command = command;
             this.executor = AbstractReadExecutor.getReadExecutor(command, consistency);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4beb54da/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 70d4559..6f17284 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -35,11 +35,11 @@ public class SinglePartitionPager extends AbstractQueryPager
 {
     private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class);
 
-    private final SinglePartitionReadCommand<?> command;
+    private final SinglePartitionReadCommand command;
 
     private volatile PagingState.RowMark lastReturned;
 
-    public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state, int protocolVersion)
+    public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, int protocolVersion)
     {
         super(command, protocolVersion);
         this.command = command;