You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/01/27 15:50:20 UTC

[3/6] cassandra git commit: Sends the proper amount of cells to old nodes on DISTINCT

Sends the proper amount of cells to old nodes on DISTINCT

patch by slebresne; reviewed by blerer for CASSANDRA-10762

On a DISTINCT query, 3.0 nodes were sending the 1 row back, but pre-3.0
nodes actually expect only the 1st cell and limits get thrown off if
they get more. This could actually be a problem for thrift queries
(on CQL tables only) when the limit ended up in the middle of a row.
The patch fixes this by enforcing the cell limit while serializing the
response to old nodes.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e37b4a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e37b4a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e37b4a9

Branch: refs/heads/trunk
Commit: 3e37b4a90d4e5a036f24ac3d9a3aa804df6e6969
Parents: eb12770
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jan 20 17:31:10 2016 +0100
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Wed Jan 27 15:45:27 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/LegacyLayout.java   |  53 ++++++++-
 .../org/apache/cassandra/db/ReadCommand.java    |  54 +++++----
 .../cassandra/db/ReadCommandVerbHandler.java    |   2 +-
 .../org/apache/cassandra/db/ReadResponse.java   | 110 ++++++++-----------
 .../apache/cassandra/db/filter/DataLimits.java  |  16 ++-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../UnfilteredPartitionIterators.java           |   6 +-
 .../db/rows/UnfilteredRowIterators.java         |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |   6 +-
 .../cassandra/service/DigestResolver.java       |   6 +-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/cache/CacheProviderTest.java      |   4 +-
 .../org/apache/cassandra/db/PartitionTest.java  |  18 +--
 .../apache/cassandra/db/ReadResponseTest.java   |  10 +-
 .../db/SinglePartitionSliceCommandTest.java     |  14 +--
 .../rows/DigestBackwardCompatibilityTest.java   |   5 +-
 .../cassandra/service/DataResolverTest.java     |   2 +-
 19 files changed, 191 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c99438f..8daeb2d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * Fix DISTINCT queries in mixed version clusters (CASSANDRA-10762)
  * Migrate build status for indexes along with legacy schema (CASSANDRA-11046)
  * Ensure SSTables for legacy KEYS indexes can be read (CASSANDRA-11045)
  * Added support for IBM zSystems architecture (CASSANDRA-11054)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 6121227..b90151e 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -32,6 +32,7 @@ import com.google.common.collect.PeekingIterator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
@@ -318,8 +319,46 @@ public abstract class LegacyLayout
         return CompositeType.build(values);
     }
 
+    /**
+     * The maximum number of cells to include per partition when converting to the old format.
+     * <p>
+     * We already apply the limit during the actual query, but for queries that counts cells and not rows (thrift queries
+     * and distinct queries as far as old nodes are concerned), we may still include a little bit more than requested
+     * because {@link DataLimits} always include full rows. So if the limit ends in the middle of a queried row, the
+     * full row will be part of our result. This would confuse old nodes however so we make sure to truncate it to
+     * what's expected before writting it on the wire.
+     *
+     * @param command the read commmand for which to determine the maximum cells per partition. This can be {@code null}
+     * in which case {@code Integer.MAX_VALUE} is returned.
+     * @return the maximum number of cells per partition that should be enforced according to the read command if
+     * post-query limitation are in order (see above). This will be {@code Integer.MAX_VALUE} if no such limits are
+     * necessary.
+     */
+    private static int maxCellsPerPartition(ReadCommand command)
+    {
+        if (command == null)
+            return Integer.MAX_VALUE;
+
+        DataLimits limits = command.limits();
+
+        // There is 2 types of DISTINCT queries: those that includes only the partition key, and those that include static columns.
+        // On old nodes, the latter expects the first row in term of CQL count, which is what we already have and there is no additional
+        // limit to apply. The former however expect only one cell per partition and rely on it (See CASSANDRA-10762).
+        if (limits.isDistinct())
+            return command.columnFilter().fetchedColumns().statics.isEmpty() ? 1 : Integer.MAX_VALUE;
+
+        switch (limits.kind())
+        {
+            case THRIFT_LIMIT:
+            case SUPER_COLUMN_COUNTING_LIMIT:
+                return limits.perPartitionCount();
+            default:
+                return Integer.MAX_VALUE;
+        }
+    }
+
     // For serializing to old wire format
-    public static LegacyUnfilteredPartition fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
+    public static LegacyUnfilteredPartition fromUnfilteredRowIterator(ReadCommand command, UnfilteredRowIterator iterator)
     {
         // we need to extract the range tombstone so materialize the partition. Since this is
         // used for the on-wire format, this is not worst than it used to be.
@@ -333,6 +372,10 @@ public abstract class LegacyLayout
         // before we use the LegacyRangeTombstoneList at all
         List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right);
 
+        int maxCellsPerPartition = maxCellsPerPartition(command);
+        if (cells.size() > maxCellsPerPartition)
+            cells = cells.subList(0, maxCellsPerPartition);
+
         // The LegacyRangeTombstoneList already has range tombstones for the single-row deletions and complex
         // deletions.  Go through our normal range tombstones and add then to the LegacyRTL so that the range
         // tombstones all get merged and sorted properly.
@@ -352,13 +395,13 @@ public abstract class LegacyLayout
         return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, cells);
     }
 
-    public static void serializeAsLegacyPartition(UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException
+    public static void serializeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException
     {
         assert version < MessagingService.VERSION_30;
 
         out.writeBoolean(true);
 
-        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition);
 
         UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, version);
         DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, out);
@@ -420,7 +463,7 @@ public abstract class LegacyLayout
     }
 
     // For the old wire format
-    public static long serializedSizeAsLegacyPartition(UnfilteredRowIterator partition, int version)
+    public static long serializedSizeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, int version)
     {
         assert version < MessagingService.VERSION_30;
 
@@ -429,7 +472,7 @@ public abstract class LegacyLayout
 
         long size = TypeSizes.sizeof(true);
 
-        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition);
 
         size += UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version);
         size += DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 668a189..f21d100 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -282,11 +282,11 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract int oldestUnrepairedTombstone();
 
-    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
+    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
     {
         return isDigestQuery()
-             ? ReadResponse.createDigestResponse(iterator, digestVersion)
-             : ReadResponse.createDataResponse(iterator, selection);
+             ? ReadResponse.createDigestResponse(iterator, this)
+             : ReadResponse.createDataResponse(iterator, this);
     }
 
     public long indexSerializedSize(int version)
@@ -723,18 +723,17 @@ public abstract class ReadCommand implements ReadQuery
                 out.writeBoolean(filter.isReversed());
 
                 // limit
-                DataLimits.Kind kind = rangeCommand.limits().kind();
-                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
-                if (isDistinct)
+                DataLimits limits = rangeCommand.limits();
+                if (limits.isDistinct())
                     out.writeInt(1);
                 else
                     out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
 
                 int compositesToGroup;
                 boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-                if (kind == DataLimits.Kind.THRIFT_LIMIT)
+                if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
                     compositesToGroup = -1;
-                else if (isDistinct && !selectsStatics)
+                else if (limits.isDistinct() && !selectsStatics)
                     compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
                 else
                     compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
@@ -799,11 +798,15 @@ public abstract class ReadCommand implements ReadQuery
             AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
             int maxResults = in.readInt();
 
-            in.readBoolean();  // countCQL3Rows (not needed)
+            boolean countCQL3Rows = in.readBoolean();  // countCQL3Rows (not needed)
             in.readBoolean();  // isPaging (not needed)
 
             boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
-            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+            // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
+            // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less
+            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use
+            // that fact.
+            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
             DataLimits limits;
             if (isDistinct)
                 limits = DataLimits.distinctLimits(maxResults);
@@ -1054,9 +1057,13 @@ public abstract class ReadCommand implements ReadQuery
 
             RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata);
             int maxResults = in.readInt();
-            in.readBoolean(); // countCQL3Rows
+            boolean countCQL3Rows = in.readBoolean();
 
-            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+            // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
+            // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less
+            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use
+            // that fact.
+            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
             DataLimits limits;
             if (isDistinct)
                 limits = DataLimits.distinctLimits(maxResults);
@@ -1340,17 +1347,16 @@ public abstract class ReadCommand implements ReadQuery
             out.writeBoolean(filter.isReversed());
 
             boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING);
-            DataLimits.Kind kind = command.limits().kind();
-            boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && command.limits().perPartitionCount() == 1;
-            if (isDistinct)
+            DataLimits limits = command.limits();
+            if (limits.isDistinct())
                 out.writeInt(1);  // the limit is always 1 for DISTINCT queries
             else
                 out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices()));
 
             int compositesToGroup;
-            if (kind == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
+            if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
                 compositesToGroup = -1;
-            else if (isDistinct && !selectsStatics)
+            else if (limits.isDistinct() && !selectsStatics)
                 compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
             else
                 compositesToGroup = metadata.clusteringColumns().size();
@@ -1369,9 +1375,19 @@ public abstract class ReadCommand implements ReadQuery
             // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
             ColumnFilter columnFilter = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, compositesToGroup, metadata);
 
-            boolean isDistinct = compositesToGroup == -2 || (count == 1 && selectsStatics);
+            // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
+            // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter is probablematic
+            // however as we have no way to distinguish it from a normal select with a limit of 1 (and this, contrarily to the range query case
+            // were the countCQL3Rows boolean allows us to decide).
+            // So we consider this case not distinct here. This is ok because even if it is a distinct (with static), the count will be 1 and
+            // we'll still just query one row (a distinct DataLimits currently behave exactly like a CQL limit with a count of 1). The only
+            // drawback is that we'll send back the first row entirely while a 2.1/2.2 node would return only the first cell in that same
+            // situation. This isn't a problem for 2.1/2.2 code however (it would be for a range query, as it would throw off the count for
+            // reasons similar to CASSANDRA-10762, but it's ok for single partition queries).
+            // We do _not_ want to do the reverse however and consider a 'SELECT * FROM foo LIMIT 1' as a DISTINCT query as that would make
+            // us only return the 1st cell rather then 1st row.
             DataLimits limits;
-            if (compositesToGroup == -2 || isDistinct)
+            if (compositesToGroup == -2)
                 limits = DataLimits.distinctLimits(count);  // See CASSANDRA-8490 for the explanation of this value
             else if (compositesToGroup == -1)
                 limits = DataLimits.thriftLimits(1, count);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 72a6fa8..9cde8dc 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -44,7 +44,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         ReadResponse response;
         try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup))
         {
-            response = command.createResponse(iterator, command.columnFilter());
+            response = command.createResponse(iterator);
         }
 
         MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 41f0d5d..a618aa5 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -53,38 +53,38 @@ public abstract class ReadResponse
 
     // This is used only when serializing data responses and we can't it easily in other cases. So this can be null, which is slighly
     // hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough".
-    private final CFMetaData metadata;
+    private final ReadCommand command;
 
-    protected ReadResponse(CFMetaData metadata)
+    protected ReadResponse(ReadCommand command)
     {
-        this.metadata = metadata;
+        this.command = command;
     }
 
-    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
+    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
     {
-        return new LocalDataResponse(data, selection);
+        return new LocalDataResponse(data, command);
     }
 
     @VisibleForTesting
-    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
+    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
     {
-        return new RemoteDataResponse(LocalDataResponse.build(data, selection));
+        return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter()));
     }
 
-    public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, int version)
+    public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
     {
-        return new DigestResponse(makeDigest(data, version));
+        return new DigestResponse(makeDigest(data, command));
     }
 
-    public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command);
-    public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command);
+    public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command);
+    public abstract ByteBuffer digest(ReadCommand command);
 
     public abstract boolean isDigestResponse();
 
-    protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, int version)
+    protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command)
     {
         MessageDigest digest = FBUtilities.threadLocalMD5Digest();
-        UnfilteredPartitionIterators.digest(iterator, digest, version);
+        UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion());
         return ByteBuffer.wrap(digest.digest());
     }
 
@@ -99,12 +99,12 @@ public abstract class ReadResponse
             this.digest = digest;
         }
 
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
+        public UnfilteredPartitionIterator makeIterator(ReadCommand command)
         {
             throw new UnsupportedOperationException();
         }
 
-        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+        public ByteBuffer digest(ReadCommand command)
         {
             // We assume that the digest is in the proper version, which bug excluded should be true since this is called with
             // ReadCommand.digestVersion() as argument and that's also what we use to produce the digest in the first place.
@@ -122,11 +122,9 @@ public abstract class ReadResponse
     // built on the owning node responding to a query
     private static class LocalDataResponse extends DataResponse
     {
-        private final ColumnFilter received;
-        private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter received)
+        private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command)
         {
-            super(iter.metadata(), build(iter, received), SerializationHelper.Flag.LOCAL);
-            this.received = received;
+            super(command, build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL);
         }
 
         private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
@@ -142,14 +140,6 @@ public abstract class ReadResponse
                 throw new RuntimeException(e);
             }
         }
-
-        protected ColumnFilter selection(ReadCommand sent)
-        {
-            // we didn't send anything, so we don't provide it in the serializer methods, but use the
-            // object's reference to the original column filter we received
-            assert sent == null || sent.columnFilter() == received;
-            return received;
-        }
     }
 
     // built on the coordinator node receiving a response
@@ -159,13 +149,6 @@ public abstract class ReadResponse
         {
             super(null, data, SerializationHelper.Flag.FROM_REMOTE);
         }
-
-        protected ColumnFilter selection(ReadCommand sent)
-        {
-            // we should always know what we sent, and should provide it in digest() and makeIterator()
-            assert sent != null;
-            return sent.columnFilter();
-        }
     }
 
     static abstract class DataResponse extends ReadResponse
@@ -175,23 +158,24 @@ public abstract class ReadResponse
         private final ByteBuffer data;
         private final SerializationHelper.Flag flag;
 
-        protected DataResponse(CFMetaData metadata, ByteBuffer data, SerializationHelper.Flag flag)
+        protected DataResponse(ReadCommand command, ByteBuffer data, SerializationHelper.Flag flag)
         {
-            super(metadata);
+            super(command);
             this.data = data;
             this.flag = flag;
         }
 
-        protected abstract ColumnFilter selection(ReadCommand command);
-
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
+        public UnfilteredPartitionIterator makeIterator(ReadCommand command)
         {
             try (DataInputBuffer in = new DataInputBuffer(data, true))
             {
+                // Note that the command parameter shadows the 'command' field and this is intended because
+                // the later can be null (for RemoteDataResponse as those are created in the serializers and
+                // those don't have easy access to the command). This is also why we need the command as parameter here.
                 return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
                                                                                          MessagingService.current_version,
-                                                                                         metadata,
-                                                                                         selection(command),
+                                                                                         command.metadata(),
+                                                                                         command.columnFilter(),
                                                                                          flag);
             }
             catch (IOException e)
@@ -201,11 +185,11 @@ public abstract class ReadResponse
             }
         }
 
-        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+        public ByteBuffer digest(ReadCommand command)
         {
-            try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
+            try (UnfilteredPartitionIterator iterator = makeIterator(command))
             {
-                return makeDigest(iterator, command.digestVersion());
+                return makeDigest(iterator, command);
             }
         }
 
@@ -229,11 +213,11 @@ public abstract class ReadResponse
         @VisibleForTesting
         LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions)
         {
-            super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the metadata
+            super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the command
             this.partitions = partitions;
         }
 
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, final ReadCommand command)
+        public UnfilteredPartitionIterator makeIterator(final ReadCommand command)
         {
             // Due to a bug in the serialization of AbstractBounds, anything that isn't a Range is understood by pre-3.0 nodes
             // as a Bound, which means IncludingExcludingBounds and ExcludingBounds responses may include keys they shouldn't.
@@ -271,7 +255,7 @@ public abstract class ReadResponse
 
                 public CFMetaData metadata()
                 {
-                    return metadata;
+                    return command.metadata();
                 }
 
                 public boolean hasNext()
@@ -296,11 +280,11 @@ public abstract class ReadResponse
             };
         }
 
-        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+        public ByteBuffer digest(ReadCommand command)
         {
-            try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
+            try (UnfilteredPartitionIterator iterator = makeIterator(command))
             {
-                return makeDigest(iterator, command.digestVersion());
+                return makeDigest(iterator, command);
             }
         }
 
@@ -323,14 +307,14 @@ public abstract class ReadResponse
                 out.writeBoolean(isDigest);
                 if (!isDigest)
                 {
-                    assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
-                    try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+                    assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+                    try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
                     {
                         assert iter.hasNext();
                         try (UnfilteredRowIterator partition = iter.next())
                         {
                             ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
-                            LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+                            LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
                         }
                         assert !iter.hasNext();
                     }
@@ -397,14 +381,14 @@ public abstract class ReadResponse
                         + TypeSizes.sizeof(isDigest);
                 if (!isDigest)
                 {
-                    assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
-                    try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+                    assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+                    try (UnfilteredPartitionIterator iter = response.makeIterator(response.command))
                     {
                         assert iter.hasNext();
                         try (UnfilteredRowIterator partition = iter.next())
                         {
                             size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
-                            size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+                            size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
                         }
                         assert !iter.hasNext();
                     }
@@ -458,8 +442,8 @@ public abstract class ReadResponse
 
             // determine the number of partitions upfront for serialization
             int numPartitions = 0;
-            assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
-            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+            assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
             {
                 while (iterator.hasNext())
                 {
@@ -476,14 +460,14 @@ public abstract class ReadResponse
 
             out.writeInt(numPartitions);
 
-            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
             {
                 while (iterator.hasNext())
                 {
                     try (UnfilteredRowIterator partition = iterator.next())
                     {
                         ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
-                        LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+                        LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version);
                     }
                 }
             }
@@ -509,15 +493,15 @@ public abstract class ReadResponse
             assert version < MessagingService.VERSION_30;
             long size = TypeSizes.sizeof(0);  // number of partitions
 
-            assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
-            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+            assert response.command != null; // we only serialize LocalDataResponse, which always has the command set
+            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command))
             {
                 while (iterator.hasNext())
                 {
                     try (UnfilteredRowIterator partition = iterator.next())
                     {
                         size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
-                        size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+                        size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 19f24ad..f6fdcdd 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -98,6 +98,7 @@ public abstract class DataLimits
     public abstract Kind kind();
 
     public abstract boolean isUnlimited();
+    public abstract boolean isDistinct();
 
     public abstract DataLimits forPaging(int pageSize);
     public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);
@@ -232,8 +233,7 @@ public abstract class DataLimits
         protected final int rowLimit;
         protected final int perPartitionLimit;
 
-        // Whether the query is a distinct query or not. This is currently not used by the code but prior experience
-        // shows that keeping the information around is wise and might be useful in the future.
+        // Whether the query is a distinct query or not.
         protected final boolean isDistinct;
 
         private CQLLimits(int rowLimit)
@@ -268,9 +268,14 @@ public abstract class DataLimits
             return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
         }
 
+        public boolean isDistinct()
+        {
+            return isDistinct;
+        }
+
         public DataLimits forPaging(int pageSize)
         {
-            return new CQLLimits(pageSize, perPartitionLimit);
+            return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
         }
 
         public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
@@ -513,6 +518,11 @@ public abstract class DataLimits
             return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
         }
 
+        public boolean isDistinct()
+        {
+            return false;
+        }
+
         public DataLimits forPaging(int pageSize)
         {
             // We don't support paging on thrift in general but do use paging under the hood for get_count. For

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f10b3b6..6331440 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -613,7 +613,7 @@ public class PartitionUpdate extends AbstractBTreePartition
 
                 if (version < MessagingService.VERSION_30)
                 {
-                    LegacyLayout.serializeAsLegacyPartition(iter, out, version);
+                    LegacyLayout.serializeAsLegacyPartition(null, iter, out, version);
                 }
                 else
                 {
@@ -699,7 +699,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
             {
                 if (version < MessagingService.VERSION_30)
-                    return LegacyLayout.serializedSizeAsLegacyPartition(iter, version);
+                    return LegacyLayout.serializedSizeAsLegacyPartition(null, iter, version);
 
                 return CFMetaData.serializer.serializedSize(update.metadata(), version)
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index a3f7981..41b1424 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -238,11 +238,13 @@ public abstract class UnfilteredPartitionIterators
     /**
      * Digests the the provided iterator.
      *
+     * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
+     * as this is only used when producing digest to be sent to legacy nodes.
      * @param iterator the iterator to digest.
      * @param digest the {@code MessageDigest} to use for the digest.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
+    public static void digest(ReadCommand command, UnfilteredPartitionIterator iterator, MessageDigest digest, int version)
     {
         try (UnfilteredPartitionIterator iter = iterator)
         {
@@ -250,7 +252,7 @@ public abstract class UnfilteredPartitionIterators
             {
                 try (UnfilteredRowIterator partition = iter.next())
                 {
-                    UnfilteredRowIterators.digest(partition, digest, version);
+                    UnfilteredRowIterators.digest(command, partition, digest, version);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index ea929d7..9416896 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -102,15 +102,17 @@ public abstract class UnfilteredRowIterators
     /**
      * Digests the partition represented by the provided iterator.
      *
+     * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30}
+     * as this is only used when producing digest to be sent to legacy nodes.
      * @param iterator the iterator to digest.
      * @param digest the {@code MessageDigest} to use for the digest.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version)
+    public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version)
     {
         if (version < MessagingService.VERSION_30)
         {
-            LegacyLayout.fromUnfilteredRowIterator(iterator).digest(iterator.metadata(), digest);
+            LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 3db9761..217c9de 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -211,7 +211,7 @@ public class Validator implements Runnable
         validated++;
         // MerkleTree uses XOR internally, so we want lots of output bits here
         CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
-        UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version);
+        UnfilteredRowIterators.digest(null, partition, digest, MessagingService.current_version);
         // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979
         return digest.count > 0
              ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index f3858d7..1fe931f 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -50,7 +50,7 @@ public class DataResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         ReadResponse response = responses.iterator().next().payload;
-        return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata(), command), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
     }
 
     public PartitionIterator resolve()
@@ -63,7 +63,7 @@ public class DataResolver extends ResponseResolver
         for (int i = 0; i < count; i++)
         {
             MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command.metadata(), command));
+            iters.add(msg.payload.makeIterator(command));
             sources[i] = msg.from;
         }
 
@@ -385,7 +385,7 @@ public class DataResolver extends ResponseResolver
                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();
                 assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata(), command), retryCommand);
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 62b4538..4a918a3 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         assert isDataPresent();
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
     }
 
     /*
@@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver
         {
             ReadResponse response = message.payload;
 
-            ByteBuffer newDigest = response.digest(command.metadata(), command);
+            ByteBuffer newDigest = response.digest(command);
             if (digest == null)
                 digest = newDigest;
             else if (!digest.equals(newDigest))
@@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver
         if (logger.isTraceEnabled())
             logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec());
     }
 
     public boolean isDataPresent()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 89ac0bb..8fa2082 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1779,7 +1779,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
                 {
-                    handler.response(command.createResponse(iterator, command.columnFilter()));
+                    handler.response(command.createResponse(iterator));
                 }
                 MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index cd52d35..a4173d6 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -103,8 +103,8 @@ public class CacheProviderTest
         {
             MessageDigest d1 = MessageDigest.getInstance("MD5");
             MessageDigest d2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version);
-            UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version);
+            UnfilteredRowIterators.digest(null, ((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version);
+            UnfilteredRowIterators.digest(null, ((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version);
             assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest()));
         }
         catch (NoSuchAlgorithmException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 623ff0e..7216ab7 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -138,21 +138,23 @@ public class PartitionTest
 
             new RowUpdateBuilder(cfs.metadata, 5, "key2").clustering("c").add("val", "val2").build().applyUnsafe();
 
-            ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build());
-            ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+            ReadCommand cmd1 = Util.cmd(cfs, "key1").build();
+            ReadCommand cmd2 = Util.cmd(cfs, "key2").build();
+            ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(cmd1);
+            ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(cmd2);
 
             MessageDigest digest1 = MessageDigest.getInstance("MD5");
             MessageDigest digest2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+            UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version);
+            UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version);
             assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
 
             p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
             p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
             digest1 = MessageDigest.getInstance("MD5");
             digest2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+            UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version);
+            UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version);
             assertTrue(Arrays.equals(digest1.digest(), digest2.digest()));
 
             p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
@@ -160,8 +162,8 @@ public class PartitionTest
             p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
             digest1 = MessageDigest.getInstance("MD5");
             digest2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version);
+            UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version);
+            UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version);
             assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/ReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
index af0ec60..52ab8bb 100644
--- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -69,12 +69,12 @@ public class ReadResponseTest extends CQLTester
                                                                 makePartition(cfs.metadata, "k3"));
         ReadResponse.LegacyRemoteDataResponse response = new ReadResponse.LegacyRemoteDataResponse(responses);
 
-        assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()), "k2");
-        assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()), "k1", "k2");
-        assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()), "k2", "k3");
+        assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()), "k2");
+        assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()), "k1", "k2");
+        assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()), "k2", "k3");
 
-        assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()), "k1", "k2");
-        assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()), "k1", "k2", "k3");
+        assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()), "k1", "k2");
+        assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()), "k1", "k2", "k3");
     }
 
     private void assertPartitions(UnfilteredPartitionIterator actual, String... expectedKeys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 7cacb5e..9af6028 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -111,14 +111,14 @@ public class SinglePartitionSliceCommandTest
 
         logger.debug("ReadCommand: {}", cmd);
         UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd.columnFilter());
+        ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd);
 
         logger.debug("creating response: {}", response);
-        partitionIterator = response.makeIterator(cfm, null);  // <- cmd is null
+        partitionIterator = response.makeIterator(cmd);
         assert partitionIterator.hasNext();
         UnfilteredRowIterator partition = partitionIterator.next();
 
-        LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(partition);
+        LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition);
         Assert.assertEquals(Collections.emptyList(), rowIter.cells);
     }
 
@@ -168,14 +168,14 @@ public class SinglePartitionSliceCommandTest
         // check (de)serialized iterator for memtable static cell
         try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
         {
-            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+            response = ReadResponse.createDataResponse(pi, cmd);
         }
 
         out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
-        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
         {
             checkForS(pi);
         }
@@ -184,13 +184,13 @@ public class SinglePartitionSliceCommandTest
         Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush();
         try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
         {
-            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+            response = ReadResponse.createDataResponse(pi, cmd);
         }
         out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30);
-        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
         {
             checkForS(pi);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
index 5503cfb..c8f5cb1 100644
--- a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
@@ -59,9 +59,10 @@ public class DigestBackwardCompatibilityTest extends CQLTester
          *   return ColumnFamily.digest(partition);
          */
 
-        ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build());
+        ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build();
+        ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(cmd);
         MessageDigest digest = FBUtilities.threadLocalMD5Digest();
-        UnfilteredRowIterators.digest(partition.unfilteredIterator(), digest, MessagingService.VERSION_22);
+        UnfilteredRowIterators.digest(cmd, partition.unfilteredIterator(), digest, MessagingService.VERSION_22);
         return ByteBuffer.wrap(digest.digest());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index ecffbbd..997f4e4 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -712,7 +712,7 @@ public class DataResolverTest
     public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
     {
         return MessageIn.create(from,
-                                ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()),
+                                ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
                                 Collections.EMPTY_MAP,
                                 MessagingService.Verb.REQUEST_RESPONSE,
                                 MessagingService.current_version);