Posted to commits@cassandra.apache.org by be...@apache.org on 2015/08/11 17:46:32 UTC

[1/3] cassandra git commit: Improve SerializationHeader response serialization

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 dea15092e -> fe388d40c
  refs/heads/trunk 881776f35 -> 4ffb5cc60


Improve SerializationHeader response serialization

Replicas serving requests now retain the set of columns that was
requested, and use it to efficiently encode the subset of columns they
respond with. The expectation is that a majority of the requested
columns will be present in the response.

When fewer than 64 columns were requested, or when all requested columns
are returned, a bitmap of missing columns is sent (i.e. a zero when all
are present), encoded as a vint. Otherwise a count of the missing columns
is sent, followed by a sequence of column indices, either the present or
the missing ones, whichever is more efficient.

patch by benedict; reviewed by ariel for CASSANDRA-9894
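
As a rough standalone sketch of the small-superset scheme described
above (hypothetical names; plain Strings stand in for ColumnDefinition,
and the committed implementation lives in Columns.Serializer below):

    import java.util.*;

    class SubsetBitmapSketch
    {
        // A 1 bit marks a *missing* column, so the bitmap is 0 when every
        // requested column is present; small values vint-encode compactly.
        static long encodeMissing(List<String> subset, List<String> superset)
        {
            assert superset.size() < 64;
            Set<String> present = new HashSet<>(subset);
            long bitmap = 0L;
            for (int i = 0; i < superset.size(); i++)
                if (!present.contains(superset.get(i)))
                    bitmap |= 1L << i;
            return bitmap; // then written with writeUnsignedVInt
        }

        // Inverse: keep every superset column whose bit is clear.
        static List<String> decodeMissing(long bitmap, List<String> superset)
        {
            List<String> subset = new ArrayList<>();
            for (int i = 0; i < superset.size(); i++)
                if ((bitmap & (1L << i)) == 0)
                    subset.add(superset.get(i));
            return subset;
        }
    }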


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fe388d40
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fe388d40
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fe388d40

Branch: refs/heads/cassandra-3.0
Commit: fe388d40c0b019c55c63c914ef5708f245af6cdd
Parents: dea1509
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jul 29 18:51:51 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Aug 11 17:45:49 2015 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   2 +-
 src/java/org/apache/cassandra/db/Columns.java   | 230 +++++++++++++++-
 .../org/apache/cassandra/db/ReadCommand.java    |   4 +-
 .../cassandra/db/ReadCommandVerbHandler.java    |   2 +-
 .../org/apache/cassandra/db/ReadResponse.java   |  87 ++++--
 .../cassandra/db/SerializationHeader.java       |  50 +++-
 .../partitions/ArrayBackedCachedPartition.java  |   6 +-
 .../db/partitions/PartitionUpdate.java          |   6 +-
 .../UnfilteredPartitionIterators.java           |   8 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |  25 +-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../org/apache/cassandra/db/ColumnsTest.java    | 275 +++++++++++++++----
 .../cassandra/service/DataResolverTest.java     |   2 +-
 13 files changed, 582 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 7719587..0db3814 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -106,7 +106,7 @@ public final class CFMetaData
     private final Map<ByteBuffer, ColumnDefinition> columnMetadata = new ConcurrentHashMap<>(); // not on any hot path
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
     private volatile List<ColumnDefinition> clusteringColumns;    // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
-    private volatile PartitionColumns partitionColumns;
+    private volatile PartitionColumns partitionColumns;           // Always non-PK, non-clustering columns
 
     // For dense tables, this alias the single non-PK column the table contains (since it can only have one). We keep
     // that as convenience to access that column more easily (but we could replace calls by partitionColumns().iterator().next()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 231b529..c584b4c 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -26,6 +26,7 @@ import java.security.MessageDigest;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 
+import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -295,9 +296,9 @@ public class Columns implements Iterable<ColumnDefinition>
      *
      * @return an iterator over all the columns of this object.
      */
-    public Iterator<ColumnDefinition> iterator()
+    public BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iterator()
     {
-        return BTree.iterator(columns);
+        return BTree.<ColumnDefinition, ColumnDefinition>slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
     }
 
     /**
@@ -311,8 +312,13 @@ public class Columns implements Iterable<ColumnDefinition>
     {
         // In wildcard selection, we want to return all columns in alphabetical order,
         // regardless of whether they are complex or not
-        return Iterators.<ColumnDefinition>mergeSorted(ImmutableList.of(simpleColumns(), complexColumns()),
-                                     (s, c) -> s.name.compareTo(c.name));
+        return Iterators.<ColumnDefinition>
+                         mergeSorted(ImmutableList.of(simpleColumns(), complexColumns()),
+                                     (s, c) ->
+                                     {
+                                         assert !s.kind.isPrimaryKeyKind();
+                                         return s.name.bytes.compareTo(c.name.bytes);
+                                     });
     }
 
     /**
@@ -420,5 +426,221 @@ public class Columns implements Iterable<ColumnDefinition>
             }
             return new Columns(builder.build());
         }
+
+        /**
+         * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
+         * more efficiently. Both ends must provide the identical set of columns.
+         */
+        public void serializeSubset(Columns columns, Columns superset, DataOutputPlus out) throws IOException
+        {
+            /**
+             * We weight this towards small sets, and sets where the majority of items are present, since
+             * we expect this to mostly be used for serializing result sets.
+             *
+             * For supersets with fewer than 64 columns, we encode a bitmap of *missing* columns,
+             * which equates to a zero (single byte) when all columns are present, and otherwise
+             * a positive integer that can typically be vint encoded efficiently.
+             *
+             * If we have 64 or more columns, we cannot neatly perform a bitmap encoding, so we just switch
+             * to a vint encoded set of deltas, either adding or subtracting (whichever is most efficient).
+             * We indicate this switch by sending our bitmap with every bit set, i.e. -1L
+             */
+            int columnCount = columns.columnCount();
+            int supersetCount = superset.columnCount();
+            if (columnCount == supersetCount)
+            {
+                out.writeUnsignedVInt(0);
+            }
+            else if (supersetCount < 64)
+            {
+                out.writeUnsignedVInt(encodeBitmap(columns, superset, supersetCount));
+            }
+            else
+            {
+                serializeLargeSubset(columns, columnCount, superset, supersetCount, out);
+            }
+        }
+
+        public long serializedSubsetSize(Columns columns, Columns superset)
+        {
+            int columnCount = columns.columnCount();
+            int supersetCount = superset.columnCount();
+            if (columnCount == supersetCount)
+            {
+                return TypeSizes.sizeofUnsignedVInt(0);
+            }
+            else if (supersetCount < 64)
+            {
+                return TypeSizes.sizeofUnsignedVInt(encodeBitmap(columns, superset, supersetCount));
+            }
+            else
+            {
+                return serializeLargeSubsetSize(columns, columnCount, superset, supersetCount);
+            }
+        }
+
+        public Columns deserializeSubset(Columns superset, DataInputPlus in) throws IOException
+        {
+            long encoded = in.readUnsignedVInt();
+            if (encoded == -1L)
+            {
+                return deserializeLargeSubset(in, superset);
+            }
+            else if (encoded == 0L)
+            {
+                return superset;
+            }
+            else
+            {
+                BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
+                int firstComplexIdx = 0;
+                for (ColumnDefinition column : superset)
+                {
+                    if ((encoded & 1) == 0)
+                    {
+                        builder.add(column);
+                        if (column.isSimple())
+                            ++firstComplexIdx;
+                    }
+                    encoded >>>= 1;
+                }
+                return new Columns(builder.build(), firstComplexIdx);
+            }
+        }
+
+        // encodes a 1 bit for every *missing* column, on the assumption presence is more common,
+        // and because this is consistent with encoding 0 to represent all present
+        private static long encodeBitmap(Columns columns, Columns superset, int supersetCount)
+        {
+            long bitmap = 0L;
+            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
+            // the index we would encounter next if all columns are present
+            int expectIndex = 0;
+            for (ColumnDefinition column : columns)
+            {
+                if (iter.next(column) == null)
+                    throw new IllegalStateException();
+
+                int currentIndex = iter.indexOfCurrent();
+                int count = currentIndex - expectIndex;
+                // (1L << count) - 1 gives us count bits set at the bottom of the register
+                // so << expectIndex moves these bits to start at expectIndex, which is where our missing portion
+                // begins (assuming count > 0; if not, we're adding 0 bits, so it's a no-op)
+                bitmap |= ((1L << count) - 1) << expectIndex;
+                expectIndex = currentIndex + 1;
+            }
+            int count = supersetCount - expectIndex;
+            bitmap |= ((1L << count) - 1) << expectIndex;
+            return bitmap;
+        }
+
+        @DontInline
+        private void serializeLargeSubset(Columns columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
+        {
+            // write flag indicating we're in lengthy mode
+            out.writeUnsignedVInt(-1L);
+            out.writeUnsignedVInt(supersetCount - columnCount);
+            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
+            if (columnCount < supersetCount / 2)
+            {
+                // write present columns
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    out.writeUnsignedVInt(iter.indexOfCurrent());
+                }
+            }
+            else
+            {
+                // write missing columns
+                int prev = -1;
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    int cur = iter.indexOfCurrent();
+                    while (++prev != cur)
+                        out.writeUnsignedVInt(prev);
+                }
+                while (++prev != supersetCount)
+                    out.writeUnsignedVInt(prev);
+            }
+        }
+
+        @DontInline
+        private Columns deserializeLargeSubset(DataInputPlus in, Columns superset) throws IOException
+        {
+            int supersetCount = superset.columnCount();
+            int delta = (int) in.readUnsignedVInt();
+            int columnCount = supersetCount - delta;
+
+            BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
+            if (columnCount < supersetCount / 2)
+            {
+                for (int i = 0 ; i < columnCount ; i++)
+                {
+                    int idx = (int) in.readUnsignedVInt();
+                    builder.add(BTree.findByIndex(superset.columns, idx));
+                }
+            }
+            else
+            {
+                Iterator<ColumnDefinition> iter = superset.iterator();
+                int idx = 0;
+                int skipped = 0;
+                while (true)
+                {
+                    int nextMissingIndex = skipped < delta ? (int)in.readUnsignedVInt() : supersetCount;
+                    while (idx < nextMissingIndex)
+                    {
+                        ColumnDefinition def = iter.next();
+                        builder.add(def);
+                        idx++;
+                    }
+                    if (idx == supersetCount)
+                        break;
+                    iter.next();
+                    idx++;
+                    skipped++;
+                }
+            }
+            return new Columns(builder.build());
+        }
+
+        @DontInline
+        private int serializeLargeSubsetSize(Columns columns, int columnCount, Columns superset, int supersetCount)
+        {
+            // write flag indicating we're in lengthy mode
+            int size = TypeSizes.sizeofUnsignedVInt(-1L) + TypeSizes.sizeofUnsignedVInt(supersetCount - columnCount);
+            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
+            if (columnCount < supersetCount / 2)
+            {
+                // write present columns
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    size += TypeSizes.sizeofUnsignedVInt(iter.indexOfCurrent());
+                }
+            }
+            else
+            {
+                // write missing columns
+                int prev = -1;
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    int cur = iter.indexOfCurrent();
+                    while (++prev != cur)
+                        size += TypeSizes.sizeofUnsignedVInt(prev);
+                }
+                while (++prev != supersetCount)
+                    size += TypeSizes.sizeofUnsignedVInt(prev);
+            }
+            return size;
+        }
+
     }
 }
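
A worked example of the bit trick in encodeBitmap above, with
hypothetical single-letter columns: take superset [a, b, c, d, e]
(indices 0..4) and subset [a, c, d].

    // a: currentIndex=0, expectIndex=0 -> count=0, no bits added; expectIndex=1
    // c: currentIndex=2, expectIndex=1 -> count=1,
    //    bitmap |= ((1L << 1) - 1) << 1 = 0b00010 (marks b missing); expectIndex=3
    // d: currentIndex=3, expectIndex=3 -> count=0; expectIndex=4
    // after the loop: count = 5 - 4 = 1,
    //    bitmap |= ((1L << 1) - 1) << 4 = 0b10000 (marks e missing)
    // result: 0b10010 = 18, a single vint byte on the wire

The set bits (1 and 4) are exactly the superset indices of the missing
columns b and e, matching the bitmap produced by serializeSubset.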

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5c40492..4830124 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -251,11 +251,11 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract int oldestUnrepairedTombstone();
 
-    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
+    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
     {
         return isDigestQuery()
              ? ReadResponse.createDigestResponse(iterator)
-             : ReadResponse.createDataResponse(iterator);
+             : ReadResponse.createDataResponse(iterator, selection);
     }
 
     protected SecondaryIndexSearcher getIndexSearcher(ColumnFamilyStore cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 9cde8dc..72a6fa8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -44,7 +44,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         ReadResponse response;
         try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup))
         {
-            response = command.createResponse(iterator);
+            response = command.createResponse(iterator, command.columnFilter());
         }
 
         MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index b3cc725..6f418f9 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -24,8 +24,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -34,7 +37,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -52,9 +54,15 @@ public abstract class ReadResponse
         this.metadata = metadata;
     }
 
-    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data)
+    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
+    {
+        return new LocalDataResponse(data, selection);
+    }
+
+    @VisibleForTesting
+    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
     {
-        return new DataResponse(data);
+        return new RemoteDataResponse(LocalDataResponse.build(data, selection));
     }
 
     public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data)
@@ -63,7 +71,6 @@ public abstract class ReadResponse
     }
 
     public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command);
-
     public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command);
 
     public abstract boolean isDigestQuery();
@@ -102,27 +109,22 @@ public abstract class ReadResponse
         }
     }
 
-    private static class DataResponse extends ReadResponse
+    // built on the owning node responding to a query
+    private static class LocalDataResponse extends DataResponse
     {
-        // The response, serialized in the current messaging version
-        private final ByteBuffer data;
-        private final SerializationHelper.Flag flag;
-
-        private DataResponse(ByteBuffer data)
+        private final ColumnFilter received;
+        private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter received)
         {
-            super(null); // This is never call on the serialization side, where we actually care of the metadata.
-            this.data = data;
-            this.flag = SerializationHelper.Flag.FROM_REMOTE;
+            super(iter.metadata(), build(iter, received), SerializationHelper.Flag.LOCAL);
+            this.received = received;
         }
 
-        private DataResponse(UnfilteredPartitionIterator iter)
+        private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
         {
-            super(iter.metadata());
             try (DataOutputBuffer buffer = new DataOutputBuffer())
             {
-                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, buffer, MessagingService.current_version);
-                this.data = buffer.buffer();
-                this.flag = SerializationHelper.Flag.LOCAL;
+                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, selection, buffer, MessagingService.current_version);
+                return buffer.buffer();
             }
             catch (IOException e)
             {
@@ -131,12 +133,57 @@ public abstract class ReadResponse
             }
         }
 
+        protected ColumnFilter selection(ColumnFilter sent)
+        {
+            // we didn't send anything, so we don't provide it in the serializer methods, but use the
+            // object's reference to the original column filter we received
+            assert sent == null | sent == received;
+            return received;
+        }
+    }
+
+    // built on the coordinator node receiving a response
+    private static class RemoteDataResponse extends DataResponse
+    {
+        protected RemoteDataResponse(ByteBuffer data)
+        {
+            super(null, data, SerializationHelper.Flag.FROM_REMOTE);
+        }
+
+        protected ColumnFilter selection(ColumnFilter sent)
+        {
+            // we should always know what we sent, and should provide it in digest() and makeIterator()
+            assert sent != null;
+            return sent;
+        }
+    }
+
+    static abstract class DataResponse extends ReadResponse
+    {
+        // TODO: can the digest be calculated over the raw bytes now?
+        // The response, serialized in the current messaging version
+        private final ByteBuffer data;
+        private final SerializationHelper.Flag flag;
+
+        protected DataResponse(CFMetaData metadata, ByteBuffer data, SerializationHelper.Flag flag)
+        {
+            super(metadata);
+            this.data = data;
+            this.flag = flag;
+        }
+
+        protected abstract ColumnFilter selection(ColumnFilter filter);
+
         public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
         {
             try
             {
                 DataInputPlus in = new DataInputBuffer(data, true);
-                return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, metadata, flag);
+                return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
+                                                                                         MessagingService.current_version,
+                                                                                         metadata,
+                                                                                         selection(command.columnFilter()),
+                                                                                         flag);
             }
             catch (IOException e)
             {
@@ -307,7 +354,7 @@ public abstract class ReadResponse
 
             assert version == MessagingService.VERSION_30;
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
-            return new DataResponse(data);
+            return new RemoteDataResponse(data);
         }
 
         public long serializedSize(ReadResponse response, int version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 2326f1e..88f6832 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -27,6 +27,7 @@ import com.google.common.base.Function;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -372,35 +373,62 @@ public class SerializationHeader
 
     public static class Serializer implements IMetadataComponentSerializer<Component>
     {
-        public void serializeForMessaging(SerializationHeader header, DataOutputPlus out, boolean hasStatic) throws IOException
+        public void serializeForMessaging(SerializationHeader header, ColumnFilter selection, DataOutputPlus out, boolean hasStatic) throws IOException
         {
             EncodingStats.serializer.serialize(header.stats, out);
 
-            if (hasStatic)
-                Columns.serializer.serialize(header.columns.statics, out);
-            Columns.serializer.serialize(header.columns.regulars, out);
+            if (selection == null)
+            {
+                if (hasStatic)
+                    Columns.serializer.serialize(header.columns.statics, out);
+                Columns.serializer.serialize(header.columns.regulars, out);
+            }
+            else
+            {
+                if (hasStatic)
+                    Columns.serializer.serializeSubset(header.columns.statics, selection.fetchedColumns().statics, out);
+                Columns.serializer.serializeSubset(header.columns.regulars, selection.fetchedColumns().regulars, out);
+            }
         }
 
-        public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, boolean hasStatic) throws IOException
+        public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, ColumnFilter selection, boolean hasStatic) throws IOException
         {
             EncodingStats stats = EncodingStats.serializer.deserialize(in);
 
             AbstractType<?> keyType = metadata.getKeyValidator();
             List<AbstractType<?>> clusteringTypes = typesOf(metadata.clusteringColumns());
 
-            Columns statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
-            Columns regulars = Columns.serializer.deserialize(in, metadata);
+            Columns statics, regulars;
+            if (selection == null)
+            {
+                statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
+                regulars = Columns.serializer.deserialize(in, metadata);
+            }
+            else
+            {
+                statics = hasStatic ? Columns.serializer.deserializeSubset(selection.fetchedColumns().statics, in) : Columns.NONE;
+                regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in);
+            }
 
             return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
         }
 
-        public long serializedSizeForMessaging(SerializationHeader header, boolean hasStatic)
+        public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection, boolean hasStatic)
         {
             long size = EncodingStats.serializer.serializedSize(header.stats);
 
-            if (hasStatic)
-                size += Columns.serializer.serializedSize(header.columns.statics);
-            size += Columns.serializer.serializedSize(header.columns.regulars);
+            if (selection == null)
+            {
+                if (hasStatic)
+                    size += Columns.serializer.serializedSize(header.columns.statics);
+                size += Columns.serializer.serializedSize(header.columns.regulars);
+            }
+            else
+            {
+                if (hasStatic)
+                    size += Columns.serializer.serializedSubsetSize(header.columns.statics, selection.fetchedColumns().statics);
+                size += Columns.serializer.serializedSubsetSize(header.columns.regulars, selection.fetchedColumns().regulars);
+            }
             return size;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
index a3c8768..fab8591 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -216,7 +216,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
             CFMetaData.serializer.serialize(partition.metadata(), out, version);
             try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
             {
-                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, p.rowCount());
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, p.rowCount());
             }
         }
 
@@ -238,7 +238,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
             int nonExpiringLiveCells = in.readInt();
 
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, SerializationHelper.Flag.LOCAL);
+            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL);
             assert !header.isReversed && header.rowEstimate >= 0;
 
             MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false);
@@ -286,7 +286,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
                      + TypeSizes.sizeof(p.nonTombstoneCellCount)
                      + TypeSizes.sizeof(p.nonExpiringLiveCells)
                      + CFMetaData.serializer.serializedSize(partition.metadata(), version)
-                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rowCount());
+                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index bb73929..e6d51e5 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -716,7 +716,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                 else
                 {
                     CFMetaData.serializer.serialize(update.metadata(), out, version);
-                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size());
+                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rows.size());
                 }
             }
         }
@@ -752,7 +752,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
         {
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
+            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag);
             if (header.isEmpty)
                 return emptyUpdate(metadata, header.key);
 
@@ -802,7 +802,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                     return LegacyLayout.serializedSizeAsLegacyPartition(iter, version);
 
                 return CFMetaData.serializer.serializedSize(update.metadata(), version)
-                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
+                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rows.size());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 0418e7f..f7ee5ee 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -361,7 +361,7 @@ public abstract class UnfilteredPartitionIterators
      */
     public static class Serializer
     {
-        public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException
+        public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
         {
             assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
 
@@ -371,13 +371,13 @@ public abstract class UnfilteredPartitionIterators
                 out.writeBoolean(true);
                 try (UnfilteredRowIterator partition = iter.next())
                 {
-                    UnfilteredRowIteratorSerializer.serializer.serialize(partition, out, version);
+                    UnfilteredRowIteratorSerializer.serializer.serialize(partition, selection, out, version);
                 }
             }
             out.writeBoolean(false);
         }
 
-        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException
+        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
         {
             assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
             final boolean isForThrift = in.readBoolean();
@@ -428,7 +428,7 @@ public abstract class UnfilteredPartitionIterators
                     try
                     {
                         nextReturned = true;
-                        next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag);
+                        next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, selection, flag);
                         return next;
                     }
                     catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 531bd26..f17ccca 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -71,22 +72,22 @@ public class UnfilteredRowIteratorSerializer
     public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();
 
     // Should only be used for the on-wire format.
-    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException
+    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
     {
-        serialize(iterator, out, version, -1);
+        serialize(iterator, selection, out, version, -1);
     }
 
     // Should only be used for the on-wire format.
-    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException
+    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
         SerializationHeader header = new SerializationHeader(iterator.metadata(),
                                                              iterator.columns(),
                                                              iterator.stats());
-        serialize(iterator, out, header, version, rowEstimate);
+        serialize(iterator, header, selection, out, version, rowEstimate);
     }
 
     // Should only be used for the on-wire format.
-    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
+    public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
         ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);
 
@@ -113,7 +114,7 @@ public class UnfilteredRowIteratorSerializer
 
         out.writeByte((byte)flags);
 
-        SerializationHeader.serializer.serializeForMessaging(header, out, hasStatic);
+        SerializationHeader.serializer.serializeForMessaging(header, selection, out, hasStatic);
 
         if (!partitionDeletion.isLive())
             header.writeDeletionTime(partitionDeletion, out);
@@ -131,7 +132,7 @@ public class UnfilteredRowIteratorSerializer
 
     // Please note that this consumes the iterator, and as such should not be called unless we have a simple way to
     // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
-    public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate)
+    public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate)
     {
         SerializationHeader header = new SerializationHeader(iterator.metadata(),
                                                              iterator.columns(),
@@ -149,7 +150,7 @@ public class UnfilteredRowIteratorSerializer
         Row staticRow = iterator.staticRow();
         boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW;
 
-        size += SerializationHeader.serializer.serializedSizeForMessaging(header, hasStatic);
+        size += SerializationHeader.serializer.serializedSizeForMessaging(header, selection, hasStatic);
 
         if (!partitionDeletion.isLive())
             size += header.deletionTimeSerializedSize(partitionDeletion);
@@ -167,7 +168,7 @@ public class UnfilteredRowIteratorSerializer
         return size;
     }
 
-    public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
+    public Header deserializeHeader(CFMetaData metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
     {
         DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
         int flags = in.readUnsignedByte();
@@ -182,7 +183,7 @@ public class UnfilteredRowIteratorSerializer
         boolean hasStatic = (flags & HAS_STATIC_ROW) != 0;
         boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0;
 
-        SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, hasStatic);
+        SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, selection, hasStatic);
 
         DeletionTime partitionDeletion = hasPartitionDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE;
 
@@ -220,9 +221,9 @@ public class UnfilteredRowIteratorSerializer
         };
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException
     {
-        return deserialize(in, version, metadata, flag, deserializeHeader(in, version, metadata, flag));
+        return deserialize(in, version, metadata, flag, deserializeHeader(metadata, selection, in, version, flag));
     }
 
     public static class Header

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0846ef1..fc917f0 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1689,7 +1689,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
                 {
-                    handler.response(command.createResponse(iterator));
+                    handler.response(command.createResponse(iterator, command.columnFilter()));
                 }
                 MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/test/unit/org/apache/cassandra/db/ColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 5447fcc..a0ade96 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -18,13 +18,12 @@
 */
 package org.apache.cassandra.db;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 import org.junit.AfterClass;
@@ -36,6 +35,8 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
@@ -44,42 +45,39 @@ public class ColumnsTest
 
     private static CFMetaData cfMetaData = MockSchema.newCFS().metadata;
 
+    // this tests most of our functionality, since we perform reasonably
+    // comprehensive tests of basic functionality against each subset
     @Test
     public void testContainsWithoutAndMergeTo()
     {
-        for (RandomColumns randomColumns : random())
-            testContainsWithoutAndMergeTo(randomColumns.columns, randomColumns.definitions);
+        for (ColumnsCheck randomColumns : randomSmall(true))
+            testContainsWithoutAndMergeTo(randomColumns);
     }
 
-    private void testContainsWithoutAndMergeTo(Columns columns, List<ColumnDefinition> definitions)
+    private void testContainsWithoutAndMergeTo(ColumnsCheck input)
     {
         // pick some arbitrary groupings of columns to remove at-once (to avoid factorial complexity)
         // whatever is left after each removal, we perform this logic on again, recursively
-        List<List<ColumnDefinition>> removeGroups = shuffleAndGroup(Lists.newArrayList(definitions));
+        List<List<ColumnDefinition>> removeGroups = shuffleAndGroup(Lists.newArrayList(input.definitions));
         for (List<ColumnDefinition> defs : removeGroups)
         {
-            Columns subset = columns;
-            for (ColumnDefinition def : defs)
-                subset = subset.without(def);
-            Assert.assertEquals(columns.columnCount() - defs.size(), subset.columnCount());
-            List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
-            remainingDefs.removeAll(defs);
+            ColumnsCheck subset = input.remove(defs);
 
             // test contents after .without
-            assertContents(subset, remainingDefs);
+            subset.assertContents();
 
             // test .contains
-            assertSubset(columns, subset);
+            assertSubset(input.columns, subset.columns);
 
             // test .mergeTo
-            Columns otherSubset = columns;
-            for (ColumnDefinition def : remainingDefs)
+            Columns otherSubset = input.columns;
+            for (ColumnDefinition def : subset.definitions)
             {
                 otherSubset = otherSubset.without(def);
-                assertContents(otherSubset.mergeTo(subset), definitions);
+                assertContents(otherSubset.mergeTo(subset.columns), input.definitions);
             }
 
-            testContainsWithoutAndMergeTo(subset, remainingDefs);
+            testContainsWithoutAndMergeTo(subset);
         }
     }
 
@@ -90,6 +88,67 @@ public class ColumnsTest
         Assert.assertFalse(subset.contains(superset));
     }
 
+    @Test
+    public void testSerialize() throws IOException
+    {
+        testSerialize(Columns.NONE, Collections.emptyList());
+        for (ColumnsCheck randomColumns : randomSmall(false))
+            testSerialize(randomColumns.columns, randomColumns.definitions);
+    }
+
+    private void testSerialize(Columns columns, List<ColumnDefinition> definitions) throws IOException
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            Columns.serializer.serialize(columns, out);
+            Assert.assertEquals(Columns.serializer.serializedSize(columns), out.buffer().remaining());
+            Columns deserialized = Columns.serializer.deserialize(new DataInputBuffer(out.buffer(), false), mock(columns));
+            Assert.assertEquals(columns, deserialized);
+            Assert.assertEquals(columns.hashCode(), deserialized.hashCode());
+            assertContents(deserialized, definitions);
+        }
+    }
+
+    @Test
+    public void testSerializeSmallSubset() throws IOException
+    {
+        for (ColumnsCheck randomColumns : randomSmall(true))
+            testSerializeSubset(randomColumns);
+    }
+
+    @Test
+    public void testSerializeHugeSubset() throws IOException
+    {
+        for (ColumnsCheck randomColumns : randomHuge())
+            testSerializeSubset(randomColumns);
+    }
+
+    private void testSerializeSubset(ColumnsCheck input) throws IOException
+    {
+        testSerializeSubset(input.columns, input.columns, input.definitions);
+        testSerializeSubset(input.columns, Columns.NONE, Collections.emptyList());
+        List<List<ColumnDefinition>> removeGroups = shuffleAndGroup(Lists.newArrayList(input.definitions));
+        for (List<ColumnDefinition> defs : removeGroups)
+        {
+            Collections.sort(defs);
+            ColumnsCheck subset = input.remove(defs);
+            testSerializeSubset(input.columns, subset.columns, subset.definitions);
+        }
+    }
+
+    private void testSerializeSubset(Columns superset, Columns subset, List<ColumnDefinition> subsetDefinitions) throws IOException
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            Columns.serializer.serializeSubset(subset, superset, out);
+            Assert.assertEquals(Columns.serializer.serializedSubsetSize(subset, superset), out.buffer().remaining());
+            Columns deserialized = Columns.serializer.deserializeSubset(superset, new DataInputBuffer(out.buffer(), false));
+            Assert.assertEquals(subset, deserialized);
+            Assert.assertEquals(subset.hashCode(), deserialized.hashCode());
+            assertContents(deserialized, subsetDefinitions);
+        }
+    }
+
     private static void assertContents(Columns columns, List<ColumnDefinition> defs)
     {
         Assert.assertEquals(defs, Lists.newArrayList(columns));
@@ -109,6 +168,7 @@ public class ColumnsTest
             {
                 hasSimple = true;
                 Assert.assertEquals(i, columns.simpleIdx(def));
+                Assert.assertEquals(def, columns.getSimple(i));
                 Assert.assertEquals(def, simple.next());
                 ++firstComplexIdx;
             }
@@ -117,6 +177,7 @@ public class ColumnsTest
                 Assert.assertFalse(simple.hasNext());
                 hasComplex = true;
                 Assert.assertEquals(i - firstComplexIdx, columns.complexIdx(def));
+                Assert.assertEquals(def, columns.getComplex(i - firstComplexIdx));
                 Assert.assertEquals(def, complex.next());
             }
             i++;
@@ -127,6 +188,16 @@ public class ColumnsTest
         Assert.assertFalse(all.hasNext());
         Assert.assertEquals(hasSimple, columns.hasSimple());
         Assert.assertEquals(hasComplex, columns.hasComplex());
+
+        // check select order
+        if (!columns.hasSimple() || !columns.getSimple(0).kind.isPrimaryKeyKind())
+        {
+            List<ColumnDefinition> selectOrderDefs = new ArrayList<>(defs);
+            Collections.sort(selectOrderDefs, (a, b) -> a.name.bytes.compareTo(b.name.bytes));
+            List<ColumnDefinition> selectOrderColumns = new ArrayList<>();
+            Iterators.addAll(selectOrderColumns, columns.selectOrderIterator());
+            Assert.assertEquals(selectOrderDefs, selectOrderColumns);
+        }
     }
 
     private static <V> List<List<V>> shuffleAndGroup(List<V> list)
@@ -141,7 +212,7 @@ public class ColumnsTest
             list.set(j, v);
         }
 
-        // then group
+        // then group (logarithmically, to ensure our recursive functions don't explode the state space)
         List<List<V>> result = new ArrayList<>();
         for (int i = 0 ; i < list.size() ;)
         {
@@ -162,83 +233,179 @@ public class ColumnsTest
         MockSchema.cleanup();
     }
 
-    private static class RandomColumns
+    private static class ColumnsCheck
     {
         final Columns columns;
         final List<ColumnDefinition> definitions;
 
-        private RandomColumns(List<ColumnDefinition> definitions)
+        private ColumnsCheck(Columns columns, List<ColumnDefinition> definitions)
+        {
+            this.columns = columns;
+            this.definitions = definitions;
+        }
+
+        private ColumnsCheck(List<ColumnDefinition> definitions)
         {
             this.columns = Columns.from(BTreeSet.of(definitions));
             this.definitions = definitions;
         }
+
+        ColumnsCheck remove(List<ColumnDefinition> remove)
+        {
+            Columns subset = columns;
+            for (ColumnDefinition def : remove)
+                subset = subset.without(def);
+            Assert.assertEquals(columns.columnCount() - remove.size(), subset.columnCount());
+            List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
+            remainingDefs.removeAll(remove);
+            return new ColumnsCheck(subset, remainingDefs);
+        }
+
+        void assertContents()
+        {
+            ColumnsTest.assertContents(columns, definitions);
+        }
+    }
+
+    private static List<ColumnsCheck> randomHuge()
+    {
+        List<ColumnsCheck> result = new ArrayList<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        result.add(randomHuge(random.nextInt(64, 128), 0, 0, 0));
+        result.add(randomHuge(0, random.nextInt(64, 128), 0, 0));
+        result.add(randomHuge(0, 0, random.nextInt(64, 128), 0));
+        result.add(randomHuge(0, 0, 0, random.nextInt(64, 128)));
+        result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), 0, 0));
+        result.add(randomHuge(0, random.nextInt(64, 128), random.nextInt(64, 128), 0));
+        result.add(randomHuge(0, 0, random.nextInt(64, 128), random.nextInt(64, 128)));
+        result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128), 0));
+        result.add(randomHuge(0, random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128)));
+        result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128)));
+        return result;
     }
 
-    private static List<RandomColumns> random()
+    private static List<ColumnsCheck> randomSmall(boolean permitMultiplePartitionKeys)
     {
-        List<RandomColumns> random = new ArrayList<>();
+        List<ColumnsCheck> random = new ArrayList<>();
         for (int i = 1 ; i <= 3 ; i++)
         {
-            random.add(random(i, i - 1, i - 1, i - 1));
-            random.add(random(i - 1, i, i - 1, i - 1));
-            random.add(random(i - 1, i - 1, i, i - 1));
-            random.add(random(i - 1, i - 1, i - 1, i));
+            int pkCount = permitMultiplePartitionKeys ? i - 1 : 1;
+            if (permitMultiplePartitionKeys)
+                random.add(randomSmall(i, i - 1, i - 1, i - 1));
+            random.add(randomSmall(0, 0, i, i)); // both kinds of regular, no PK
+            random.add(randomSmall(pkCount, i, i - 1, i - 1)); // PK + clustering, few or none regular
+            random.add(randomSmall(pkCount, i - 1, i, i - 1)); // PK + few or none clustering, some regular, few or none complex
+            random.add(randomSmall(pkCount, i - 1, i - 1, i)); // PK + few or none clustering or regular, some complex
         }
         return random;
     }
 
-    private static RandomColumns random(int pkCount, int clCount, int regularCount, int complexCount)
+    private static ColumnsCheck randomSmall(int pkCount, int clCount, int regularCount, int complexCount)
     {
-        List<Character> chars = new ArrayList<>();
+        List<String> names = new ArrayList<>();
         for (char c = 'a' ; c <= 'z' ; c++)
-            chars.add(c);
+            names.add(Character.toString(c));
 
         List<ColumnDefinition> result = new ArrayList<>();
-        addPartition(select(chars, pkCount), result);
-        addClustering(select(chars, clCount), result);
-        addRegular(select(chars, regularCount), result);
-        addComplex(select(chars, complexCount), result);
+        addPartition(select(names, pkCount), result);
+        addClustering(select(names, clCount), result);
+        addRegular(select(names, regularCount), result);
+        addComplex(select(names, complexCount), result);
         Collections.sort(result);
-        return new RandomColumns(result);
+        return new ColumnsCheck(result);
     }
 
-    private static List<Character> select(List<Character> chars, int count)
+    private static List<String> select(List<String> names, int count)
     {
-        List<Character> result = new ArrayList<>();
+        List<String> result = new ArrayList<>();
         ThreadLocalRandom random = ThreadLocalRandom.current();
         for (int i = 0 ; i < count ; i++)
         {
-            int v = random.nextInt(chars.size());
-            result.add(chars.get(v));
-            chars.remove(v);
+            int v = random.nextInt(names.size());
+            result.add(names.get(v));
+            names.remove(v);
         }
         return result;
     }
 
-    private static void addPartition(List<Character> chars, List<ColumnDefinition> results)
+    private static ColumnsCheck randomHuge(int pkCount, int clCount, int regularCount, int complexCount)
     {
-        addSimple(ColumnDefinition.Kind.PARTITION_KEY, chars, results);
+        List<ColumnDefinition> result = new ArrayList<>();
+        Set<String> usedNames = new HashSet<>();
+        addPartition(names(pkCount, usedNames), result);
+        addClustering(names(clCount, usedNames), result);
+        addRegular(names(regularCount, usedNames), result);
+        addComplex(names(complexCount, usedNames), result);
+        Collections.sort(result);
+        return new ColumnsCheck(result);
     }
 
-    private static void addClustering(List<Character> chars, List<ColumnDefinition> results)
+    private static List<String> names(int count, Set<String> usedNames)
     {
-        addSimple(ColumnDefinition.Kind.CLUSTERING, chars, results);
+        List<String> names = new ArrayList<>();
+        StringBuilder builder = new StringBuilder();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0 ; i < count ; i++)
+        {
+            builder.setLength(0);
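+            // build a name of at least three characters, growing it until it is unused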
+            for (int j = 0 ; j < 3 || usedNames.contains(builder.toString()) ; j++)
+                builder.append((char) random.nextInt('a', 'z' + 1));
+            String name = builder.toString();
+            names.add(name);
+            usedNames.add(name);
+        }
+        return names;
     }
 
-    private static void addRegular(List<Character> chars, List<ColumnDefinition> results)
+    private static void addPartition(List<String> names, List<ColumnDefinition> results)
     {
-        addSimple(ColumnDefinition.Kind.REGULAR, chars, results);
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, null, ColumnDefinition.Kind.PARTITION_KEY));
     }
 
-    private static void addSimple(ColumnDefinition.Kind kind, List<Character> chars, List<ColumnDefinition> results)
+    private static void addClustering(List<String> names, List<ColumnDefinition> results)
     {
-        for (Character c : chars)
-            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(c.toString()), UTF8Type.instance, null, kind));
+        int i = 0;
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, i++, ColumnDefinition.Kind.CLUSTERING));
     }
 
-    private static void addComplex(List<Character> chars, List<ColumnDefinition> results)
+    private static void addRegular(List<String> names, List<ColumnDefinition> results)
     {
-        for (Character c : chars)
-            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(c.toString()), SetType.getInstance(UTF8Type.instance, true), null, ColumnDefinition.Kind.REGULAR));
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, null, ColumnDefinition.Kind.REGULAR));
+    }
+
+    private static void addComplex(List<String> names, List<ColumnDefinition> results)
+    {
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), SetType.getInstance(UTF8Type.instance, true), null, ColumnDefinition.Kind.REGULAR));
+    }
+
+    private static CFMetaData mock(Columns columns)
+    {
+        if (columns.isEmpty())
+            return cfMetaData;
+        CFMetaData.Builder builder = CFMetaData.Builder.create(cfMetaData.ksName, cfMetaData.cfName);
+        boolean hasPartitionKey = false;
+        for (ColumnDefinition def : columns)
+        {
+            switch (def.kind)
+            {
+                case PARTITION_KEY:
+                    builder.addPartitionKey(def.name, def.type);
+                    hasPartitionKey = true;
+                    break;
+                case CLUSTERING:
+                    builder.addClusteringColumn(def.name, def.type);
+                    break;
+                case REGULAR:
+                    builder.addRegularColumn(def.name, def.type);
+                    break;
+            }
+        }
+        if (!hasPartitionKey)
+            builder.addPartitionKey("219894021498309239rufejsfjdksfjheiwfhjes", UTF8Type.instance);
+        return builder.build();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index efd3504..0804bfb 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -491,7 +491,7 @@ public class DataResolverTest
     public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator)
     {
         return MessageIn.create(from,
-                                ReadResponse.createDataResponse(partitionIterator),
+                                ReadResponse.createRemoteDataResponse(partitionIterator, command.columnFilter()),
                                 Collections.EMPTY_MAP,
                                 MessagingService.Verb.REQUEST_RESPONSE,
                                 MessagingService.current_version);


[2/3] cassandra git commit: Improve SerializationHeader response serialization

Posted by be...@apache.org.

Branch: refs/heads/trunk
Commit: fe388d40c0b019c55c63c914ef5708f245af6cdd
Parents: dea1509
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Jul 29 18:51:51 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Aug 11 17:45:49 2015 +0200


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 7719587..0db3814 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -106,7 +106,7 @@ public final class CFMetaData
     private final Map<ByteBuffer, ColumnDefinition> columnMetadata = new ConcurrentHashMap<>(); // not on any hot path
     private volatile List<ColumnDefinition> partitionKeyColumns;  // Always of size keyValidator.componentsCount, null padded if necessary
     private volatile List<ColumnDefinition> clusteringColumns;    // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary
-    private volatile PartitionColumns partitionColumns;
+    private volatile PartitionColumns partitionColumns;           // Always non-PK, non-clustering columns
 
     // For dense tables, this aliases the single non-PK column the table contains (since it can only have one). We keep
     // that as a convenience to access that column more easily (but we could replace calls with partitionColumns().iterator().next()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index 231b529..c584b4c 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -26,6 +26,7 @@ import java.security.MessageDigest;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 
+import net.nicoulaj.compilecommand.annotations.DontInline;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -295,9 +296,9 @@ public class Columns implements Iterable<ColumnDefinition>
      *
      * @return an iterator over all the columns of this object.
      */
-    public Iterator<ColumnDefinition> iterator()
+    public BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iterator()
     {
-        return BTree.iterator(columns);
+        return BTree.<ColumnDefinition, ColumnDefinition>slice(columns, Comparator.naturalOrder(), BTree.Dir.ASC);
     }
 
     /**
@@ -311,8 +312,13 @@ public class Columns implements Iterable<ColumnDefinition>
     {
         // In wildcard selection, we want to return all columns in alphabetical order,
         // regardless of whether they are complex or not
-        return Iterators.<ColumnDefinition>mergeSorted(ImmutableList.of(simpleColumns(), complexColumns()),
-                                     (s, c) -> s.name.compareTo(c.name));
+        return Iterators.<ColumnDefinition>
+                         mergeSorted(ImmutableList.of(simpleColumns(), complexColumns()),
+                                     (s, c) ->
+                                     {
+                                         assert !s.kind.isPrimaryKeyKind();
+                                         return s.name.bytes.compareTo(c.name.bytes);
+                                     });
     }
 
     /**
@@ -420,5 +426,221 @@ public class Columns implements Iterable<ColumnDefinition>
             }
             return new Columns(builder.build());
         }
+
+        /**
+         * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
+         * more efficiently. Both ends must provide the identical set of columns.
+         */
+        public void serializeSubset(Columns columns, Columns superset, DataOutputPlus out) throws IOException
+        {
+            /*
+             * We weight this towards small sets, and sets where the majority of items are present, since
+             * we expect this to mostly be used for serializing result sets.
+             *
+             * For supersets with fewer than 64 columns, we encode a bitmap of *missing* columns,
+             * which equates to a zero (single byte) when all columns are present, and otherwise
+             * a positive integer that can typically be vint encoded efficiently.
+             *
+             * If we have 64 or more columns, we cannot neatly perform a bitmap encoding, so we just switch
+             * to a vint encoded count of the missing columns, followed by the indices of either the present
+             * or the missing columns, whichever is fewer. We indicate this switch by sending our bitmap with
+             * every bit set, i.e. -1L
+             */
+            int columnCount = columns.columnCount();
+            int supersetCount = superset.columnCount();
+            if (columnCount == supersetCount)
+            {
+                out.writeUnsignedVInt(0);
+            }
+            else if (supersetCount < 64)
+            {
+                out.writeUnsignedVInt(encodeBitmap(columns, superset, supersetCount));
+            }
+            else
+            {
+                serializeLargeSubset(columns, columnCount, superset, supersetCount, out);
+            }
+        }
+
+        public long serializedSubsetSize(Columns columns, Columns superset)
+        {
+            int columnCount = columns.columnCount();
+            int supersetCount = superset.columnCount();
+            if (columnCount == supersetCount)
+            {
+                return TypeSizes.sizeofUnsignedVInt(0);
+            }
+            else if (supersetCount < 64)
+            {
+                return TypeSizes.sizeofUnsignedVInt(encodeBitmap(columns, superset, supersetCount));
+            }
+            else
+            {
+                return serializeLargeSubsetSize(columns, columnCount, superset, supersetCount);
+            }
+        }
+
+        public Columns deserializeSubset(Columns superset, DataInputPlus in) throws IOException
+        {
+            long encoded = in.readUnsignedVInt();
+            if (encoded == -1L)
+            {
+                return deserializeLargeSubset(in, superset);
+            }
+            else if (encoded == 0L)
+            {
+                return superset;
+            }
+            else
+            {
+                BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
+                int firstComplexIdx = 0;
+                for (ColumnDefinition column : superset)
+                {
+                    if ((encoded & 1) == 0)
+                    {
+                        builder.add(column);
+                        if (column.isSimple())
+                            ++firstComplexIdx;
+                    }
+                    encoded >>>= 1;
+                }
+                return new Columns(builder.build(), firstComplexIdx);
+            }
+        }
+
+        // encodes a 1 bit for every *missing* column, on the assumption presence is more common,
+        // and because this is consistent with encoding 0 to represent all present
+        private static long encodeBitmap(Columns columns, Columns superset, int supersetCount)
+        {
+            long bitmap = 0L;
+            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
+            // the index we would encounter next if all columns are present
+            int expectIndex = 0;
+            for (ColumnDefinition column : columns)
+            {
+                if (iter.next(column) == null)
+                    throw new IllegalStateException();
+
+                int currentIndex = iter.indexOfCurrent();
+                int count = currentIndex - expectIndex;
+                // (1L << count) - 1 gives us count bits set at the bottom of the register
+                // so << expectIndex moves these bits to start at expectIndex, which is where our missing portion
+                // begins (assuming count > 0; if not, we're adding 0 bits, so it's a no-op)
+                bitmap |= ((1L << count) - 1) << expectIndex;
+                expectIndex = currentIndex + 1;
+            }
+            int count = supersetCount - expectIndex;
+            bitmap |= ((1L << count) - 1) << expectIndex;
+            return bitmap;
+        }
+
+        @DontInline
+        private void serializeLargeSubset(Columns columns, int columnCount, Columns superset, int supersetCount, DataOutputPlus out) throws IOException
+        {
+            // write an all-ones flag to indicate the large-subset encoding
+            out.writeUnsignedVInt(-1L);
+            out.writeUnsignedVInt(supersetCount - columnCount);
+            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
+            if (columnCount < supersetCount / 2)
+            {
+                // write present columns
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    out.writeUnsignedVInt(iter.indexOfCurrent());
+                }
+            }
+            else
+            {
+                // write missing columns
+                int prev = -1;
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    int cur = iter.indexOfCurrent();
+                    while (++prev != cur)
+                        out.writeUnsignedVInt(prev);
+                }
+                while (++prev != supersetCount)
+                    out.writeUnsignedVInt(prev);
+            }
+        }
+
+        @DontInline
+        private Columns deserializeLargeSubset(DataInputPlus in, Columns superset) throws IOException
+        {
+            int supersetCount = superset.columnCount();
+            int delta = (int) in.readUnsignedVInt();
+            int columnCount = supersetCount - delta;
+
+            BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
+            if (columnCount < supersetCount / 2)
+            {
+                for (int i = 0 ; i < columnCount ; i++)
+                {
+                    int idx = (int) in.readUnsignedVInt();
+                    builder.add(BTree.findByIndex(superset.columns, idx));
+                }
+            }
+            else
+            {
+                Iterator<ColumnDefinition> iter = superset.iterator();
+                int idx = 0;
+                int skipped = 0;
+                while (true)
+                {
+                    int nextMissingIndex = skipped < delta ? (int)in.readUnsignedVInt() : supersetCount;
+                    while (idx < nextMissingIndex)
+                    {
+                        ColumnDefinition def = iter.next();
+                        builder.add(def);
+                        idx++;
+                    }
+                    if (idx == supersetCount)
+                        break;
+                    iter.next();
+                    idx++;
+                    skipped++;
+                }
+            }
+            return new Columns(builder.build());
+        }
+
+        @DontInline
+        private int serializeLargeSubsetSize(Columns columns, int columnCount, Columns superset, int supersetCount)
+        {
+            // account for the all-ones flag indicating the large-subset encoding
+            int size = TypeSizes.sizeofUnsignedVInt(-1L) + TypeSizes.sizeofUnsignedVInt(supersetCount - columnCount);
+            BTreeSearchIterator<ColumnDefinition, ColumnDefinition> iter = superset.iterator();
+            if (columnCount < supersetCount / 2)
+            {
+                // write present columns
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    size += TypeSizes.sizeofUnsignedVInt(iter.indexOfCurrent());
+                }
+            }
+            else
+            {
+                // write missing columns
+                int prev = -1;
+                for (ColumnDefinition column : columns)
+                {
+                    if (iter.next(column) == null)
+                        throw new IllegalStateException();
+                    int cur = iter.indexOfCurrent();
+                    while (++prev != cur)
+                        size += TypeSizes.sizeofUnsignedVInt(prev);
+                }
+                while (++prev != supersetCount)
+                    size += TypeSizes.sizeofUnsignedVInt(prev);
+            }
+            return size;
+        }
+
     }
 }
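
For a concrete feel of the encoding above, take a superset of six columns
{a, b, c, d, e, f} and a response containing only {a, c, f}: the missing columns
b, d and e occupy superset indices 1, 3 and 4, so the bitmap is 0b011010 = 26,
which vint-encodes to a single byte. Below is a minimal standalone sketch of the
same idea over plain string lists; it is illustrative only, since the real
Columns.Serializer above works over BTree-backed ColumnDefinition sets and
writes the result as an unsigned vint:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SubsetBitmapDemo
    {
        // set a 1 bit for every superset member *missing* from the subset, so a
        // subset equal to its superset encodes as 0 (one byte once vint encoded)
        static long encodeMissing(List<String> subset, List<String> superset)
        {
            long bitmap = 0L;
            for (int i = 0 ; i < superset.size() ; i++)
                if (!subset.contains(superset.get(i)))
                    bitmap |= 1L << i;
            return bitmap;
        }

        // walk the superset in order, keeping each column whose bit is clear
        static List<String> decode(long bitmap, List<String> superset)
        {
            List<String> result = new ArrayList<>();
            for (String column : superset)
            {
                if ((bitmap & 1) == 0)
                    result.add(column);
                bitmap >>>= 1;
            }
            return result;
        }

        public static void main(String[] args)
        {
            // run with -ea to enable the asserts
            List<String> superset = Arrays.asList("a", "b", "c", "d", "e", "f");
            List<String> subset = Arrays.asList("a", "c", "f");
            long encoded = encodeMissing(subset, superset);
            assert encoded == 26L;                           // bits 1, 3 and 4 set
            assert decode(encoded, superset).equals(subset);
        }
    }

For 64 or more columns the same trade-off survives without the bitmap: after the
all-ones flag, serializeLargeSubset writes the number of missing columns and then
the vint indices of either the present or the missing columns, whichever set is
smaller.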

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 5c40492..4830124 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -251,11 +251,11 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract int oldestUnrepairedTombstone();
 
-    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
+    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection)
     {
         return isDigestQuery()
              ? ReadResponse.createDigestResponse(iterator)
-             : ReadResponse.createDataResponse(iterator);
+             : ReadResponse.createDataResponse(iterator, selection);
     }
 
     protected SecondaryIndexSearcher getIndexSearcher(ColumnFamilyStore cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 9cde8dc..72a6fa8 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -44,7 +44,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
         ReadResponse response;
         try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup))
         {
-            response = command.createResponse(iterator);
+            response = command.createResponse(iterator, command.columnFilter());
         }
 
         MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index b3cc725..6f418f9 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -24,8 +24,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -34,7 +37,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -52,9 +54,15 @@ public abstract class ReadResponse
         this.metadata = metadata;
     }
 
-    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data)
+    public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
+    {
+        return new LocalDataResponse(data, selection);
+    }
+
+    @VisibleForTesting
+    public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection)
     {
-        return new DataResponse(data);
+        return new RemoteDataResponse(LocalDataResponse.build(data, selection));
     }
 
     public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data)
@@ -63,7 +71,6 @@ public abstract class ReadResponse
     }
 
     public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command);
-
     public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command);
 
     public abstract boolean isDigestQuery();
@@ -102,27 +109,22 @@ public abstract class ReadResponse
         }
     }
 
-    private static class DataResponse extends ReadResponse
+    // built on the owning node responding to a query
+    private static class LocalDataResponse extends DataResponse
     {
-        // The response, serialized in the current messaging version
-        private final ByteBuffer data;
-        private final SerializationHelper.Flag flag;
-
-        private DataResponse(ByteBuffer data)
+        private final ColumnFilter received;
+        private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter received)
         {
-            super(null); // This is never call on the serialization side, where we actually care of the metadata.
-            this.data = data;
-            this.flag = SerializationHelper.Flag.FROM_REMOTE;
+            super(iter.metadata(), build(iter, received), SerializationHelper.Flag.LOCAL);
+            this.received = received;
         }
 
-        private DataResponse(UnfilteredPartitionIterator iter)
+        private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection)
         {
-            super(iter.metadata());
             try (DataOutputBuffer buffer = new DataOutputBuffer())
             {
-                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, buffer, MessagingService.current_version);
-                this.data = buffer.buffer();
-                this.flag = SerializationHelper.Flag.LOCAL;
+                UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter, selection, buffer, MessagingService.current_version);
+                return buffer.buffer();
             }
             catch (IOException e)
             {
@@ -131,12 +133,57 @@ public abstract class ReadResponse
             }
         }
 
+        protected ColumnFilter selection(ColumnFilter sent)
+        {
+            // local responses are never sent over the wire, so no filter is passed to the
+            // serializer methods; instead we use the column filter the response was built with
+            assert sent == null || sent == received;
+            return received;
+        }
+    }
+
+    // built on the coordinator node receiving a response
+    private static class RemoteDataResponse extends DataResponse
+    {
+        protected RemoteDataResponse(ByteBuffer data)
+        {
+            super(null, data, SerializationHelper.Flag.FROM_REMOTE);
+        }
+
+        protected ColumnFilter selection(ColumnFilter sent)
+        {
+            // we should always know what we sent, and should provide it in digest() and makeIterator()
+            assert sent != null;
+            return sent;
+        }
+    }
+
+    static abstract class DataResponse extends ReadResponse
+    {
+        // TODO: can the digest be calculated over the raw bytes now?
+        // The response, serialized in the current messaging version
+        private final ByteBuffer data;
+        private final SerializationHelper.Flag flag;
+
+        protected DataResponse(CFMetaData metadata, ByteBuffer data, SerializationHelper.Flag flag)
+        {
+            super(metadata);
+            this.data = data;
+            this.flag = flag;
+        }
+
+        protected abstract ColumnFilter selection(ColumnFilter filter);
+
         public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
         {
             try
             {
                 DataInputPlus in = new DataInputBuffer(data, true);
-                return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, metadata, flag);
+                return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
+                                                                                         MessagingService.current_version,
+                                                                                         metadata,
+                                                                                         selection(command.columnFilter()),
+                                                                                         flag);
             }
             catch (IOException e)
             {
@@ -307,7 +354,7 @@ public abstract class ReadResponse
 
             assert version == MessagingService.VERSION_30;
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
-            return new DataResponse(data);
+            return new RemoteDataResponse(data);
         }
 
         public long serializedSize(ReadResponse response, int version)
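
Taken together, the LocalDataResponse/RemoteDataResponse split gives each side a
cheap, asymmetric contract. A hedged sketch of the flow, using the calls shown in
these hunks (variable set-up and messaging plumbing elided):

    // replica: encode the response against the filter received with the command
    ReadResponse response = command.createResponse(iterator, command.columnFilter());

    // coordinator: the serialized bytes arrive wrapped as a RemoteDataResponse;
    // deserialization reuses the filter the coordinator itself sent, so the
    // column superset never has to travel on the wire
    UnfilteredPartitionIterator partitions = response.makeIterator(metadata, command);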

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 2326f1e..88f6832 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -27,6 +27,7 @@ import com.google.common.base.Function;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
@@ -372,35 +373,62 @@ public class SerializationHeader
 
     public static class Serializer implements IMetadataComponentSerializer<Component>
     {
-        public void serializeForMessaging(SerializationHeader header, DataOutputPlus out, boolean hasStatic) throws IOException
+        public void serializeForMessaging(SerializationHeader header, ColumnFilter selection, DataOutputPlus out, boolean hasStatic) throws IOException
         {
             EncodingStats.serializer.serialize(header.stats, out);
 
-            if (hasStatic)
-                Columns.serializer.serialize(header.columns.statics, out);
-            Columns.serializer.serialize(header.columns.regulars, out);
+            if (selection == null)
+            {
+                if (hasStatic)
+                    Columns.serializer.serialize(header.columns.statics, out);
+                Columns.serializer.serialize(header.columns.regulars, out);
+            }
+            else
+            {
+                if (hasStatic)
+                    Columns.serializer.serializeSubset(header.columns.statics, selection.fetchedColumns().statics, out);
+                Columns.serializer.serializeSubset(header.columns.regulars, selection.fetchedColumns().regulars, out);
+            }
         }
 
-        public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, boolean hasStatic) throws IOException
+        public SerializationHeader deserializeForMessaging(DataInputPlus in, CFMetaData metadata, ColumnFilter selection, boolean hasStatic) throws IOException
         {
             EncodingStats stats = EncodingStats.serializer.deserialize(in);
 
             AbstractType<?> keyType = metadata.getKeyValidator();
             List<AbstractType<?>> clusteringTypes = typesOf(metadata.clusteringColumns());
 
-            Columns statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
-            Columns regulars = Columns.serializer.deserialize(in, metadata);
+            Columns statics, regulars;
+            if (selection == null)
+            {
+                statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
+                regulars = Columns.serializer.deserialize(in, metadata);
+            }
+            else
+            {
+                statics = hasStatic ? Columns.serializer.deserializeSubset(selection.fetchedColumns().statics, in) : Columns.NONE;
+                regulars = Columns.serializer.deserializeSubset(selection.fetchedColumns().regulars, in);
+            }
 
             return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
         }
 
-        public long serializedSizeForMessaging(SerializationHeader header, boolean hasStatic)
+        public long serializedSizeForMessaging(SerializationHeader header, ColumnFilter selection, boolean hasStatic)
         {
             long size = EncodingStats.serializer.serializedSize(header.stats);
 
-            if (hasStatic)
-                size += Columns.serializer.serializedSize(header.columns.statics);
-            size += Columns.serializer.serializedSize(header.columns.regulars);
+            if (selection == null)
+            {
+                if (hasStatic)
+                    size += Columns.serializer.serializedSize(header.columns.statics);
+                size += Columns.serializer.serializedSize(header.columns.regulars);
+            }
+            else
+            {
+                if (hasStatic)
+                    size += Columns.serializer.serializedSubsetSize(header.columns.statics, selection.fetchedColumns().statics);
+                size += Columns.serializer.serializedSubsetSize(header.columns.regulars, selection.fetchedColumns().regulars);
+            }
             return size;
         }
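
The selection parameter above is what separates the two serialization worlds:
callers serializing self-contained payloads (the ArrayBackedCachedPartition and
PartitionUpdate hunks below) pass null and keep the fully self-describing column
encoding, while the messaging path passes the query's ColumnFilter so that only
a delta against the shared superset is written. A minimal round-trip sketch using
the serializer entry points added in this patch (mirroring the unit test further
down):

    try (DataOutputBuffer out = new DataOutputBuffer())
    {
        Columns.serializer.serializeSubset(subset, superset, out);
        assert Columns.serializer.serializedSubsetSize(subset, superset) == out.buffer().remaining();
        Columns deserialized =
            Columns.serializer.deserializeSubset(superset, new DataInputBuffer(out.buffer(), false));
        assert deserialized.equals(subset);
    }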
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
index a3c8768..fab8591 100644
--- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -216,7 +216,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
             CFMetaData.serializer.serialize(partition.metadata(), out, version);
             try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
             {
-                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, p.rowCount());
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, p.rowCount());
             }
         }
 
@@ -238,7 +238,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
             int nonExpiringLiveCells = in.readInt();
 
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, SerializationHelper.Flag.LOCAL);
+            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL);
             assert !header.isReversed && header.rowEstimate >= 0;
 
             MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false);
@@ -286,7 +286,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements
                      + TypeSizes.sizeof(p.nonTombstoneCellCount)
                      + TypeSizes.sizeof(p.nonExpiringLiveCells)
                      + CFMetaData.serializer.serializedSize(partition.metadata(), version)
-                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rowCount());
+                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index bb73929..e6d51e5 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -716,7 +716,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                 else
                 {
                     CFMetaData.serializer.serialize(update.metadata(), out, version);
-                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size());
+                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rows.size());
                 }
             }
         }
@@ -752,7 +752,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
         private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
         {
             CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
+            UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag);
             if (header.isEmpty)
                 return emptyUpdate(metadata, header.key);
 
@@ -802,7 +802,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                     return LegacyLayout.serializedSizeAsLegacyPartition(iter, version);
 
                 return CFMetaData.serializer.serializedSize(update.metadata(), version)
-                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
+                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rows.size());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 0418e7f..f7ee5ee 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -361,7 +361,7 @@ public abstract class UnfilteredPartitionIterators
      */
     public static class Serializer
     {
-        public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException
+        public void serialize(UnfilteredPartitionIterator iter, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
         {
             assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
 
@@ -371,13 +371,13 @@ public abstract class UnfilteredPartitionIterators
                 out.writeBoolean(true);
                 try (UnfilteredRowIterator partition = iter.next())
                 {
-                    UnfilteredRowIteratorSerializer.serializer.serialize(partition, out, version);
+                    UnfilteredRowIteratorSerializer.serializer.serialize(partition, selection, out, version);
                 }
             }
             out.writeBoolean(false);
         }
 
-        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException
+        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
         {
             assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
             final boolean isForThrift = in.readBoolean();
@@ -428,7 +428,7 @@ public abstract class UnfilteredPartitionIterators
                     try
                     {
                         nextReturned = true;
-                        next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag);
+                        next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, selection, flag);
                         return next;
                     }
                     catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 531bd26..f17ccca 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -71,22 +72,22 @@ public class UnfilteredRowIteratorSerializer
     public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();
 
     // Should only be used for the on-wire format.
-    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException
+    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
     {
-        serialize(iterator, out, version, -1);
+        serialize(iterator, selection, out, version, -1);
     }
 
     // Should only be used for the on-wire format.
-    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException
+    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
         SerializationHeader header = new SerializationHeader(iterator.metadata(),
                                                              iterator.columns(),
                                                              iterator.stats());
-        serialize(iterator, out, header, version, rowEstimate);
+        serialize(iterator, header, selection, out, version, rowEstimate);
     }
 
     // Should only be used for the on-wire format.
-    public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException
+    public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
     {
         ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);
 
@@ -113,7 +114,7 @@ public class UnfilteredRowIteratorSerializer
 
         out.writeByte((byte)flags);
 
-        SerializationHeader.serializer.serializeForMessaging(header, out, hasStatic);
+        SerializationHeader.serializer.serializeForMessaging(header, selection, out, hasStatic);
 
         if (!partitionDeletion.isLive())
             header.writeDeletionTime(partitionDeletion, out);
@@ -131,7 +132,7 @@ public class UnfilteredRowIteratorSerializer
 
     // Please note that this consumes the iterator, and as such should not be called unless we have a simple way to
     // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition.
-    public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate)
+    public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate)
     {
         SerializationHeader header = new SerializationHeader(iterator.metadata(),
                                                              iterator.columns(),
@@ -149,7 +150,7 @@ public class UnfilteredRowIteratorSerializer
         Row staticRow = iterator.staticRow();
         boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW;
 
-        size += SerializationHeader.serializer.serializedSizeForMessaging(header, hasStatic);
+        size += SerializationHeader.serializer.serializedSizeForMessaging(header, selection, hasStatic);
 
         if (!partitionDeletion.isLive())
             size += header.deletionTimeSerializedSize(partitionDeletion);
@@ -167,7 +168,7 @@ public class UnfilteredRowIteratorSerializer
         return size;
     }
 
-    public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
+    public Header deserializeHeader(CFMetaData metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
     {
         DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
         int flags = in.readUnsignedByte();
@@ -182,7 +183,7 @@ public class UnfilteredRowIteratorSerializer
         boolean hasStatic = (flags & HAS_STATIC_ROW) != 0;
         boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0;
 
-        SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, hasStatic);
+        SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, selection, hasStatic);
 
         DeletionTime partitionDeletion = hasPartitionDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE;
 
@@ -220,9 +221,9 @@ public class UnfilteredRowIteratorSerializer
         };
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException
     {
-        return deserialize(in, version, metadata, flag, deserializeHeader(in, version, metadata, flag));
+        return deserialize(in, version, metadata, flag, deserializeHeader(metadata, selection, in, version, flag));
     }
 
     public static class Header
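
For orientation, the per-partition wire layout produced by serialize() above is
roughly the following (bracketed fields are conditional on the flags byte; only
the pieces visible in these hunks are listed):

    partition key        vint length + bytes
    flags                one byte (including HAS_STATIC_ROW and HAS_ROW_ESTIMATE)
    header               EncodingStats, then the columns: written in full when
                         selection is null, or subset-encoded against the
                         ColumnFilter both ends share
    [partition deletion] only when the partition-level deletion is not live
    ...                  static row, optional row estimate and the unfiltereds follow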

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 0846ef1..fc917f0 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1689,7 +1689,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
                 {
-                    handler.response(command.createResponse(iterator));
+                    handler.response(command.createResponse(iterator, command.columnFilter()));
                 }
                 MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/test/unit/org/apache/cassandra/db/ColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 5447fcc..a0ade96 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -18,13 +18,12 @@
 */
 package org.apache.cassandra.db;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Predicate;
 
+import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
 import org.junit.AfterClass;
@@ -36,6 +35,8 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.SetType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
@@ -44,42 +45,39 @@ public class ColumnsTest
 
     private static CFMetaData cfMetaData = MockSchema.newCFS().metadata;
 
+    // this tests most of our functionality, since against each subset we perform
+    // reasonably comprehensive tests of basic functionality
     @Test
     public void testContainsWithoutAndMergeTo()
     {
-        for (RandomColumns randomColumns : random())
-            testContainsWithoutAndMergeTo(randomColumns.columns, randomColumns.definitions);
+        for (ColumnsCheck randomColumns : randomSmall(true))
+            testContainsWithoutAndMergeTo(randomColumns);
     }
 
-    private void testContainsWithoutAndMergeTo(Columns columns, List<ColumnDefinition> definitions)
+    private void testContainsWithoutAndMergeTo(ColumnsCheck input)
     {
         // pick some arbitrary groupings of columns to remove at once (to avoid factorial complexity);
         // we then recurse on whatever remains after each removal
-        List<List<ColumnDefinition>> removeGroups = shuffleAndGroup(Lists.newArrayList(definitions));
+        List<List<ColumnDefinition>> removeGroups = shuffleAndGroup(Lists.newArrayList(input.definitions));
         for (List<ColumnDefinition> defs : removeGroups)
         {
-            Columns subset = columns;
-            for (ColumnDefinition def : defs)
-                subset = subset.without(def);
-            Assert.assertEquals(columns.columnCount() - defs.size(), subset.columnCount());
-            List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
-            remainingDefs.removeAll(defs);
+            ColumnsCheck subset = input.remove(defs);
 
             // test contents after .without
-            assertContents(subset, remainingDefs);
+            subset.assertContents();
 
             // test .contains
-            assertSubset(columns, subset);
+            assertSubset(input.columns, subset.columns);
 
             // test .mergeTo
-            Columns otherSubset = columns;
-            for (ColumnDefinition def : remainingDefs)
+            Columns otherSubset = input.columns;
+            for (ColumnDefinition def : subset.definitions)
             {
                 otherSubset = otherSubset.without(def);
-                assertContents(otherSubset.mergeTo(subset), definitions);
+                assertContents(otherSubset.mergeTo(subset.columns), input.definitions);
             }
 
-            testContainsWithoutAndMergeTo(subset, remainingDefs);
+            testContainsWithoutAndMergeTo(subset);
         }
     }
 
@@ -90,6 +88,67 @@ public class ColumnsTest
         Assert.assertFalse(subset.contains(superset));
     }
 
+    @Test
+    public void testSerialize() throws IOException
+    {
+        testSerialize(Columns.NONE, Collections.emptyList());
+        for (ColumnsCheck randomColumns : randomSmall(false))
+            testSerialize(randomColumns.columns, randomColumns.definitions);
+    }
+
+    private void testSerialize(Columns columns, List<ColumnDefinition> definitions) throws IOException
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            Columns.serializer.serialize(columns, out);
+            Assert.assertEquals(Columns.serializer.serializedSize(columns), out.buffer().remaining());
+            Columns deserialized = Columns.serializer.deserialize(new DataInputBuffer(out.buffer(), false), mock(columns));
+            Assert.assertEquals(columns, deserialized);
+            Assert.assertEquals(columns.hashCode(), deserialized.hashCode());
+            assertContents(deserialized, definitions);
+        }
+    }
+
+    @Test
+    public void testSerializeSmallSubset() throws IOException
+    {
+        for (ColumnsCheck randomColumns : randomSmall(true))
+            testSerializeSubset(randomColumns);
+    }
+
+    @Test
+    public void testSerializeHugeSubset() throws IOException
+    {
+        for (ColumnsCheck randomColumns : randomHuge())
+            testSerializeSubset(randomColumns);
+    }
+
+    private void testSerializeSubset(ColumnsCheck input) throws IOException
+    {
+        testSerializeSubset(input.columns, input.columns, input.definitions);
+        testSerializeSubset(input.columns, Columns.NONE, Collections.emptyList());
+        List<List<ColumnDefinition>> removeGroups = shuffleAndGroup(Lists.newArrayList(input.definitions));
+        for (List<ColumnDefinition> defs : removeGroups)
+        {
+            Collections.sort(defs);
+            ColumnsCheck subset = input.remove(defs);
+            testSerializeSubset(input.columns, subset.columns, subset.definitions);
+        }
+    }
+
+    private void testSerializeSubset(Columns superset, Columns subset, List<ColumnDefinition> subsetDefinitions) throws IOException
+    {
+        try (DataOutputBuffer out = new DataOutputBuffer())
+        {
+            Columns.serializer.serializeSubset(subset, superset, out);
+            Assert.assertEquals(Columns.serializer.serializedSubsetSize(subset, superset), out.buffer().remaining());
+            Columns deserialized = Columns.serializer.deserializeSubset(superset, new DataInputBuffer(out.buffer(), false));
+            Assert.assertEquals(subset, deserialized);
+            Assert.assertEquals(subset.hashCode(), deserialized.hashCode());
+            assertContents(deserialized, subsetDefinitions);
+        }
+    }
+
     private static void assertContents(Columns columns, List<ColumnDefinition> defs)
     {
         Assert.assertEquals(defs, Lists.newArrayList(columns));
@@ -109,6 +168,7 @@ public class ColumnsTest
             {
                 hasSimple = true;
                 Assert.assertEquals(i, columns.simpleIdx(def));
+                Assert.assertEquals(def, columns.getSimple(i));
                 Assert.assertEquals(def, simple.next());
                 ++firstComplexIdx;
             }
@@ -117,6 +177,7 @@ public class ColumnsTest
                 Assert.assertFalse(simple.hasNext());
                 hasComplex = true;
                 Assert.assertEquals(i - firstComplexIdx, columns.complexIdx(def));
+                Assert.assertEquals(def, columns.getComplex(i - firstComplexIdx));
                 Assert.assertEquals(def, complex.next());
             }
             i++;
@@ -127,6 +188,16 @@ public class ColumnsTest
         Assert.assertFalse(all.hasNext());
         Assert.assertEquals(hasSimple, columns.hasSimple());
         Assert.assertEquals(hasComplex, columns.hasComplex());
+
+        // check select order
+        if (!columns.hasSimple() || !columns.getSimple(0).kind.isPrimaryKeyKind())
+        {
+            List<ColumnDefinition> selectOrderDefs = new ArrayList<>(defs);
+            Collections.sort(selectOrderDefs, (a, b) -> a.name.bytes.compareTo(b.name.bytes));
+            List<ColumnDefinition> selectOrderColumns = new ArrayList<>();
+            Iterators.addAll(selectOrderColumns, columns.selectOrderIterator());
+            Assert.assertEquals(selectOrderDefs, selectOrderColumns);
+        }
     }
 
     private static <V> List<List<V>> shuffleAndGroup(List<V> list)
@@ -141,7 +212,7 @@ public class ColumnsTest
             list.set(j, v);
         }
 
-        // then group
+        // then group (logarithmically, to ensure our recursive functions don't explode the state space)
         List<List<V>> result = new ArrayList<>();
         for (int i = 0 ; i < list.size() ;)
         {
@@ -162,83 +233,179 @@ public class ColumnsTest
         MockSchema.cleanup();
     }
 
-    private static class RandomColumns
+    private static class ColumnsCheck
     {
         final Columns columns;
         final List<ColumnDefinition> definitions;
 
-        private RandomColumns(List<ColumnDefinition> definitions)
+        private ColumnsCheck(Columns columns, List<ColumnDefinition> definitions)
+        {
+            this.columns = columns;
+            this.definitions = definitions;
+        }
+
+        private ColumnsCheck(List<ColumnDefinition> definitions)
         {
             this.columns = Columns.from(BTreeSet.of(definitions));
             this.definitions = definitions;
         }
+
+        ColumnsCheck remove(List<ColumnDefinition> remove)
+        {
+            Columns subset = columns;
+            for (ColumnDefinition def : remove)
+                subset = subset.without(def);
+            Assert.assertEquals(columns.columnCount() - remove.size(), subset.columnCount());
+            List<ColumnDefinition> remainingDefs = Lists.newArrayList(columns);
+            remainingDefs.removeAll(remove);
+            return new ColumnsCheck(subset, remainingDefs);
+        }
+
+        void assertContents()
+        {
+            ColumnsTest.assertContents(columns, definitions);
+        }
+    }
+
+    private static List<ColumnsCheck> randomHuge()
+    {
+        List<ColumnsCheck> result = new ArrayList<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        result.add(randomHuge(random.nextInt(64, 128), 0, 0, 0));
+        result.add(randomHuge(0, random.nextInt(64, 128), 0, 0));
+        result.add(randomHuge(0, 0, random.nextInt(64, 128), 0));
+        result.add(randomHuge(0, 0, 0, random.nextInt(64, 128)));
+        result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), 0, 0));
+        result.add(randomHuge(0, random.nextInt(64, 128), random.nextInt(64, 128), 0));
+        result.add(randomHuge(0, 0, random.nextInt(64, 128), random.nextInt(64, 128)));
+        result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128), 0));
+        result.add(randomHuge(0, random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128)));
+        result.add(randomHuge(random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128), random.nextInt(64, 128)));
+        return result;
     }
 
-    private static List<RandomColumns> random()
+    private static List<ColumnsCheck> randomSmall(boolean permitMultiplePartitionKeys)
     {
-        List<RandomColumns> random = new ArrayList<>();
+        List<ColumnsCheck> random = new ArrayList<>();
         for (int i = 1 ; i <= 3 ; i++)
         {
-            random.add(random(i, i - 1, i - 1, i - 1));
-            random.add(random(i - 1, i, i - 1, i - 1));
-            random.add(random(i - 1, i - 1, i, i - 1));
-            random.add(random(i - 1, i - 1, i - 1, i));
+            int pkCount = permitMultiplePartitionKeys ? i - 1 : 1;
+            if (permitMultiplePartitionKeys)
+                random.add(randomSmall(i, i - 1, i - 1, i - 1));
+            random.add(randomSmall(0, 0, i, i)); // no PK or clustering; both kinds of regular
+            random.add(randomSmall(pkCount, i, i - 1, i - 1)); // PK + clustering, few or none regular
+            random.add(randomSmall(pkCount, i - 1, i, i - 1)); // PK + few or none clustering, some regular, few or none complex
+            random.add(randomSmall(pkCount, i - 1, i - 1, i)); // PK + few or none clustering or regular, some complex
         }
         return random;
     }
 
-    private static RandomColumns random(int pkCount, int clCount, int regularCount, int complexCount)
+    private static ColumnsCheck randomSmall(int pkCount, int clCount, int regularCount, int complexCount)
     {
-        List<Character> chars = new ArrayList<>();
+        List<String> names = new ArrayList<>();
         for (char c = 'a' ; c <= 'z' ; c++)
-            chars.add(c);
+            names.add(Character.toString(c));
 
         List<ColumnDefinition> result = new ArrayList<>();
-        addPartition(select(chars, pkCount), result);
-        addClustering(select(chars, clCount), result);
-        addRegular(select(chars, regularCount), result);
-        addComplex(select(chars, complexCount), result);
+        addPartition(select(names, pkCount), result);
+        addClustering(select(names, clCount), result);
+        addRegular(select(names, regularCount), result);
+        addComplex(select(names, complexCount), result);
         Collections.sort(result);
-        return new RandomColumns(result);
+        return new ColumnsCheck(result);
     }
 
-    private static List<Character> select(List<Character> chars, int count)
+    private static List<String> select(List<String> names, int count)
     {
-        List<Character> result = new ArrayList<>();
+        List<String> result = new ArrayList<>();
         ThreadLocalRandom random = ThreadLocalRandom.current();
         for (int i = 0 ; i < count ; i++)
         {
-            int v = random.nextInt(chars.size());
-            result.add(chars.get(v));
-            chars.remove(v);
+            int v = random.nextInt(names.size());
+            result.add(names.get(v));
+            names.remove(v);
         }
         return result;
     }
 
-    private static void addPartition(List<Character> chars, List<ColumnDefinition> results)
+    private static ColumnsCheck randomHuge(int pkCount, int clCount, int regularCount, int complexCount)
     {
-        addSimple(ColumnDefinition.Kind.PARTITION_KEY, chars, results);
+        List<ColumnDefinition> result = new ArrayList<>();
+        Set<String> usedNames = new HashSet<>();
+        addPartition(names(pkCount, usedNames), result);
+        addClustering(names(clCount, usedNames), result);
+        addRegular(names(regularCount, usedNames), result);
+        addComplex(names(complexCount, usedNames), result);
+        Collections.sort(result);
+        return new ColumnsCheck(result);
     }
 
-    private static void addClustering(List<Character> chars, List<ColumnDefinition> results)
+    private static List<String> names(int count, Set<String> usedNames)
     {
-        addSimple(ColumnDefinition.Kind.CLUSTERING, chars, results);
+        List<String> names = new ArrayList<>();
+        StringBuilder builder = new StringBuilder();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0 ; i < count ; i++)
+        {
+            builder.setLength(0);
+            for (int j = 0 ; j < 3 || usedNames.contains(builder.toString()) ; j++)
+                builder.append((char) random.nextInt('a', 'z' + 1));
+            String name = builder.toString();
+            names.add(name);
+            usedNames.add(name);
+        }
+        return names;
     }
 
-    private static void addRegular(List<Character> chars, List<ColumnDefinition> results)
+    private static void addPartition(List<String> names, List<ColumnDefinition> results)
     {
-        addSimple(ColumnDefinition.Kind.REGULAR, chars, results);
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, null, ColumnDefinition.Kind.PARTITION_KEY));
     }
 
-    private static void addSimple(ColumnDefinition.Kind kind, List<Character> chars, List<ColumnDefinition> results)
+    private static void addClustering(List<String> names, List<ColumnDefinition> results)
     {
-        for (Character c : chars)
-            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(c.toString()), UTF8Type.instance, null, kind));
+        int i = 0;
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, i++, ColumnDefinition.Kind.CLUSTERING));
     }
 
-    private static void addComplex(List<Character> chars, List<ColumnDefinition> results)
+    private static void addRegular(List<String> names, List<ColumnDefinition> results)
     {
-        for (Character c : chars)
-            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(c.toString()), SetType.getInstance(UTF8Type.instance, true), null, ColumnDefinition.Kind.REGULAR));
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), UTF8Type.instance, null, ColumnDefinition.Kind.REGULAR));
+    }
+
+    private static void addComplex(List<String> names, List<ColumnDefinition> results)
+    {
+        for (String name : names)
+            results.add(new ColumnDefinition(cfMetaData, ByteBufferUtil.bytes(name), SetType.getInstance(UTF8Type.instance, true), null, ColumnDefinition.Kind.REGULAR));
+    }
+
+    private static CFMetaData mock(Columns columns)
+    {
+        if (columns.isEmpty())
+            return cfMetaData;
+        CFMetaData.Builder builder = CFMetaData.Builder.create(cfMetaData.ksName, cfMetaData.cfName);
+        boolean hasPartitionKey = false;
+        for (ColumnDefinition def : columns)
+        {
+            switch (def.kind)
+            {
+                case PARTITION_KEY:
+                    builder.addPartitionKey(def.name, def.type);
+                    hasPartitionKey = true;
+                    break;
+                case CLUSTERING:
+                    builder.addClusteringColumn(def.name, def.type);
+                    break;
+                case REGULAR:
+                    builder.addRegularColumn(def.name, def.type);
+                    break;
+            }
+        }
+        if (!hasPartitionKey)
+            builder.addPartitionKey("219894021498309239rufejsfjdksfjheiwfhjes", UTF8Type.instance);
+        return builder.build();
     }
 }
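
For reference, the round-trip that the new testSerializeSubset helper asserts can be sketched in isolation. This is a minimal illustration only, assuming the Cassandra 3.0 test classpath; the helper name roundTripSubset and its arguments are illustrative and not part of the patch:

    import java.io.IOException;
    import org.apache.cassandra.db.Columns;
    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;

    // Encode a subset of columns against a superset both sides agree on,
    // check that the declared size matches the bytes actually written,
    // then decode with the same superset and return the reconstructed set.
    static Columns roundTripSubset(Columns superset, Columns subset) throws IOException
    {
        try (DataOutputBuffer out = new DataOutputBuffer())
        {
            Columns.serializer.serializeSubset(subset, superset, out);
            assert Columns.serializer.serializedSubsetSize(subset, superset) == out.buffer().remaining();
            return Columns.serializer.deserializeSubset(superset, new DataInputBuffer(out.buffer(), false));
        }
    }

A subset equal to the whole superset and the empty Columns.NONE round-trip the same way, which is why testSerializeSubset covers those two boundary cases before the randomized removals.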

http://git-wip-us.apache.org/repos/asf/cassandra/blob/fe388d40/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index efd3504..0804bfb 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -491,7 +491,7 @@ public class DataResolverTest
     public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator)
     {
         return MessageIn.create(from,
-                                ReadResponse.createDataResponse(partitionIterator),
+                                ReadResponse.createRemoteDataResponse(partitionIterator, command.columnFilter()),
                                 Collections.EMPTY_MAP,
                                 MessagingService.Verb.REQUEST_RESPONSE,
                                 MessagingService.current_version);
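
The call-site change above, in isolation: createDataResponse(partitionIterator) is replaced by createRemoteDataResponse(partitionIterator, command.columnFilter()), which additionally threads through the column filter of the read command being answered. A minimal sketch, reusing the surrounding test's command and partitionIterator fixtures:

    // Build the response against the columns the command requested; the
    // filter must come from the same command the iterator is answering.
    ReadResponse response = ReadResponse.createRemoteDataResponse(partitionIterator, command.columnFilter());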


[3/3] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by be...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ffb5cc6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ffb5cc6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ffb5cc6

Branch: refs/heads/trunk
Commit: 4ffb5cc60707c34171ca57e692297b6f47fd1ee7
Parents: 881776f fe388d4
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Tue Aug 11 17:46:12 2015 +0200
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Tue Aug 11 17:46:12 2015 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/config/CFMetaData.java |   2 +-
 src/java/org/apache/cassandra/db/Columns.java   | 230 +++++++++++++++-
 .../org/apache/cassandra/db/ReadCommand.java    |   4 +-
 .../cassandra/db/ReadCommandVerbHandler.java    |   2 +-
 .../org/apache/cassandra/db/ReadResponse.java   |  87 ++++--
 .../cassandra/db/SerializationHeader.java       |  50 +++-
 .../partitions/ArrayBackedCachedPartition.java  |   6 +-
 .../db/partitions/PartitionUpdate.java          |   6 +-
 .../UnfilteredPartitionIterators.java           |   8 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |  25 +-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../org/apache/cassandra/db/ColumnsTest.java    | 275 +++++++++++++++----
 .../cassandra/service/DataResolverTest.java     |   2 +-
 13 files changed, 582 insertions(+), 117 deletions(-)
----------------------------------------------------------------------