Posted to commits@cassandra.apache.org by ty...@apache.org on 2015/08/08 00:44:40 UTC

[1/4] cassandra git commit: On-wire backward compatibility for 3.0

Repository: cassandra
Updated Branches:
  refs/heads/trunk ed9343edf -> 288f2cf4f


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index 4baf6a3..bb2fbf1 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -103,38 +103,6 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
         return l.toArray(new ByteBuffer[l.size()]);
     }
 
-    public static class CompositeComponent
-    {
-        public AbstractType<?> comparator;
-        public ByteBuffer   value;
-
-        public CompositeComponent( AbstractType<?> comparator, ByteBuffer value )
-        {
-            this.comparator = comparator;
-            this.value      = value;
-        }
-    }
-
-    public List<CompositeComponent> deconstruct( ByteBuffer bytes )
-    {
-        List<CompositeComponent> list = new ArrayList<CompositeComponent>();
-
-        ByteBuffer bb = bytes.duplicate();
-        readIsStatic(bb);
-        int i = 0;
-
-        while (bb.remaining() > 0)
-        {
-            AbstractType comparator = getComparator(i, bb);
-            ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(bb);
-
-            list.add( new CompositeComponent(comparator,value) );
-
-            byte b = bb.get(); // Ignore; not relevant here
-            ++i;
-        }
-        return list;
-    }
 
     /*
      * Escapes all occurrences of the ':' character from the input, replacing them by "\:".

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index 01eb58f..633a994 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -218,6 +218,32 @@ public class CompositeType extends AbstractCompositeType
         return null;
     }
 
+    public static class CompositeComponent
+    {
+        public ByteBuffer value;
+        public byte eoc;
+
+        public CompositeComponent(ByteBuffer value, byte eoc)
+        {
+            this.value = value;
+            this.eoc = eoc;
+        }
+    }
+
+    public static List<CompositeComponent> deconstruct(ByteBuffer bytes)
+    {
+        List<CompositeComponent> list = new ArrayList<>();
+        ByteBuffer bb = bytes.duplicate();
+        readStatic(bb);
+        while (bb.remaining() > 0)
+        {
+            ByteBuffer value = ByteBufferUtil.readBytesWithShortLength(bb);
+            byte eoc = bb.get();
+            list.add(new CompositeComponent(value, eoc));
+        }
+        return list;
+    }
+
     // Extract CQL3 column name from the full column name.
     public ByteBuffer extractLastComponent(ByteBuffer bb)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
index acdd0e2..0b218f5 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
@@ -189,7 +189,7 @@ public abstract class AbstractThreadUnsafePartition implements Partition, Iterab
         return sliceableUnfilteredIterator(ColumnFilter.all(metadata()), false);
     }
 
-    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed)
+    public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed)
     {
         return new SliceableIterator(this, selection, reversed);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f2e0617..bb73929 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -23,12 +23,15 @@ import java.util.*;
 
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -36,9 +39,10 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Stores updates made on a partition.
@@ -494,7 +498,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     }
 
     @Override
-    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
+    public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
     {
         maybeBuild();
         return super.sliceableUnfilteredIterator(columns, reversed);
@@ -503,7 +507,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     /**
      * Validates the data contained in this update.
      *
-     * @throws MarshalException if some of the data contained in this update is corrupted.
+     * @throws org.apache.cassandra.serializers.MarshalException if some of the data contained in this update is corrupted.
      */
     public void validate()
     {
@@ -701,37 +705,19 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
     {
         public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-            {
-                // TODO
-                throw new UnsupportedOperationException();
-
-                // if (cf == null)
-                // {
-                //     out.writeBoolean(false);
-                //     return;
-                // }
-
-                // out.writeBoolean(true);
-                // serializeCfId(cf.id(), out, version);
-                // cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version);
-                // ColumnSerializer columnSerializer = cf.getComparator().columnSerializer();
-                // int count = cf.getColumnCount();
-                // out.writeInt(count);
-                // int written = 0;
-                // for (Cell cell : cf)
-                // {
-                //     columnSerializer.serialize(cell, out);
-                //     written++;
-                // }
-                // assert count == written: "Table had " + count + " columns, but " + written + " written";
-            }
-
-            CFMetaData.serializer.serialize(update.metadata(), out, version);
             try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
             {
                 assert !iter.isReverseOrder();
-                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size());
+
+                if (version < MessagingService.VERSION_30)
+                {
+                    LegacyLayout.serializeAsLegacyPartition(iter, out, version);
+                }
+                else
+                {
+                    CFMetaData.serializer.serialize(update.metadata(), out, version);
+                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size());
+                }
             }
         }
 
@@ -745,9 +731,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
             else
             {
                 assert key != null;
-                CFMetaData metadata = deserializeMetadata(in, version);
-                DecoratedKey dk = metadata.decorateKey(key);
-                return deserializePre30(in, version, flag, metadata, dk);
+                return deserializePre30(in, version, flag, key);
             }
         }
 
@@ -761,8 +745,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
             else
             {
                 assert key != null;
-                CFMetaData metadata = deserializeMetadata(in, version);
-                return deserializePre30(in, version, flag, metadata, key);
+                return deserializePre30(in, version, flag, key.getKey());
             }
         }
 
@@ -802,48 +785,22 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition
                                        false);
         }
 
-        private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
+        private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
         {
-            // This is only used in mutation, and mutation have never allowed "null" column families
-            boolean present = in.readBoolean();
-            assert present;
-
-            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
-            return metadata;
-        }
-
-        private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
-        {
-            LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
-            int size = in.readInt();
-            Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size);
-            SerializationHelper helper = new SerializationHelper(metadata, version, flag);
-            try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, dk, info, cells, false, helper))
+            try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
             {
+                assert iterator != null; // This is only used in mutations, and mutations have never allowed "null" column families
                 return PartitionUpdate.fromIterator(iterator);
             }
         }
 
         public long serializedSize(PartitionUpdate update, int version)
         {
-            if (version < MessagingService.VERSION_30)
-            {
-                // TODO
-                throw new UnsupportedOperationException("Version is " + version);
-                //if (cf == null)
-                //{
-                //    return TypeSizes.sizeof(false);
-                //}
-                //else
-                //{
-                //    return TypeSizes.sizeof(true)  /* nullness bool */
-                //        + cfIdSerializedSize(cf.id(), typeSizes, version)  /* id */
-                //        + contentSerializedSize(cf, typeSizes, version);
-                //}
-            }
-
             try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator())
             {
+                if (version < MessagingService.VERSION_30)
+                    return LegacyLayout.serializedSizeAsLegacyPartition(iter, version);
+
                 return CFMetaData.serializer.serializedSize(update.metadata(), version)
                      + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size());
             }
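
The rewritten serialize()/serializedSize() above are the central version-switch
idiom of this patch: the same sliceable iterator is rendered either through
LegacyLayout (for pre-3.0 peers) or through the 3.0 serializers, keyed on the
destination's messaging version. Reduced to a schematic (illustrative only;
VersionedSerializer and the field names are hypothetical):

    import java.io.IOException;
    import org.apache.cassandra.io.util.DataOutputPlus;
    import org.apache.cassandra.net.MessagingService;

    interface VersionedSerializer<T>
    {
        void serialize(T t, DataOutputPlus out, int version) throws IOException;
    }

    class SwitchingSerializer<T> implements VersionedSerializer<T>
    {
        private final VersionedSerializer<T> legacy;   // pre-3.0 wire format
        private final VersionedSerializer<T> current;  // 3.0 wire format

        SwitchingSerializer(VersionedSerializer<T> legacy, VersionedSerializer<T> current)
        {
            this.legacy = legacy;
            this.current = current;
        }

        // Mirrors the branch in the PartitionUpdate serializer above.
        public void serialize(T t, DataOutputPlus out, int version) throws IOException
        {
            if (version < MessagingService.VERSION_30)
                legacy.serialize(t, out, version);
            else
                current.serialize(t, out, version);
        }
    }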

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index dd625c4..0418e7f 100644
--- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -22,13 +22,19 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.*;
 
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.*;
 
 /**
  * Static methods to work with partition iterators.
@@ -357,8 +363,7 @@ public abstract class UnfilteredPartitionIterators
     {
         public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException();
+            assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
 
             out.writeBoolean(iter.isForThrift());
             while (iter.hasNext())
@@ -374,9 +379,7 @@ public abstract class UnfilteredPartitionIterators
 
         public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final CFMetaData metadata, final SerializationHelper.Flag flag) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException();
-
+            assert version >= MessagingService.VERSION_30; // We handle backward compatibility directly in ReadResponse.LegacyRangeSliceReplySerializer
             final boolean isForThrift = in.readBoolean();
 
             return new AbstractUnfilteredPartitionIterator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
index 7e9ceb8..8f9e921 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
@@ -23,11 +23,13 @@ import java.util.function.Predicate;
 
 import com.google.common.base.Function;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -363,6 +365,11 @@ public class BTreeBackedRow extends AbstractRow
             ((ComplexColumnData) current).setValue(path, value);
     }
 
+    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata)
+    {
+        return () -> new CellInLegacyOrderIterator(metadata);
+    }
+
     private class CellIterator extends AbstractIterator<Cell>
     {
         private Iterator<ColumnData> columnData = iterator();
@@ -392,6 +399,61 @@ public class BTreeBackedRow extends AbstractRow
         }
     }
 
+    private class CellInLegacyOrderIterator extends AbstractIterator<Cell>
+    {
+        private final AbstractType<?> comparator;
+        private final int firstComplexIdx;
+        private int simpleIdx;
+        private int complexIdx;
+        private Iterator<Cell> complexCells;
+        private final Object[] data;
+
+        private CellInLegacyOrderIterator(CFMetaData metadata)
+        {
+            this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
+
+            // copy btree into array for simple separate iteration of simple and complex columns
+            this.data = new Object[BTree.size(btree)];
+            BTree.toArray(btree, data, 0);
+
+            int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData);
+            this.firstComplexIdx = idx < 0 ? data.length : idx;
+            this.complexIdx = firstComplexIdx;
+        }
+
+        protected Cell computeNext()
+        {
+            while (true)
+            {
+                if (complexCells != null)
+                {
+                    if (complexCells.hasNext())
+                        return complexCells.next();
+
+                    complexCells = null;
+                }
+
+                if (simpleIdx >= firstComplexIdx)
+                {
+                    if (complexIdx >= data.length)
+                        return endOfData();
+
+                    complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                }
+                else
+                {
+                    if (complexIdx >= data.length)
+                        return (Cell)data[simpleIdx++];
+
+                    if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0)
+                        return (Cell)data[simpleIdx++];
+                    else
+                        complexCells = ((ComplexColumnData)data[complexIdx++]).iterator();
+                }
+            }
+        }
+    }
+
     public static class Builder implements Row.Builder
     {
         // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time
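
CellInLegacyOrderIterator above is essentially a two-run merge: the 3.0 row
layout stores all simple columns first and all complex columns second, each run
already sorted by column name, and the iterator interleaves the two runs
(flattening each ComplexColumnData into its cells) to recover the single
name-sorted order that 2.x and Thrift expect. The underlying pattern, stripped
of the cell-specific details (illustrative sketch only):

    import java.util.*;

    public class TwoRunMergeSketch
    {
        // Merge two individually sorted lists into one sorted output --
        // the same shape as interleaving simple cells with complex cells.
        public static <T> List<T> merge(List<T> a, List<T> b, Comparator<? super T> cmp)
        {
            List<T> out = new ArrayList<>(a.size() + b.size());
            int i = 0, j = 0;
            while (i < a.size() && j < b.size())
                out.add(cmp.compare(a.get(i), b.get(j)) <= 0 ? a.get(i++) : b.get(j++));
            while (i < a.size())
                out.add(a.get(i++));
            while (j < b.size())
                out.add(b.get(j++));
            return out;
        }
    }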

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java
index 0c3dc2d..33ad447 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -146,6 +146,18 @@ public interface Row extends Unfiltered, Iterable<ColumnData>
     public Iterable<Cell> cells();
 
     /**
+     * An iterable over the cells of this row that returns cells in "legacy order".
+     * <p>
+     * In 3.0+, columns are sorted so that all simple columns come before all complex columns. Previously,
+     * however, the cells were simply sorted by column name. This iterable returns cells in that
+     * legacy order. It's only ever meaningful for backward/thrift compatibility code.
+     *
+     * @param metadata the table this is a row of.
+     * @return an iterable over the cells of this row in "legacy order".
+     */
+    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata);
+
+    /**
      * Whether the row stores any (non-live) complex deletion for any complex column.
      */
     public boolean hasComplexDeletion();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 2c7932b..25eb0d0 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -210,8 +210,8 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
-        //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
-        //put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
+        put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
+        put(Verb.PAGED_RANGE, ReadCommand.legacyPagedRangeCommandSerializer);
         put(Verb.BOOTSTRAP_TOKEN, BootStrapper.StringSerializer.instance);
         put(Verb.REPAIR_MESSAGE, RepairMessage.serializer);
         put(Verb.GOSSIP_DIGEST_ACK, GossipDigestAck.serializer);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 487a14c..16a3e6e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -99,7 +99,7 @@ public abstract class AbstractReadExecutor
                 traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
             logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
             if (message == null)
-                message = readCommand.createMessage();
+                message = readCommand.createMessage(MessagingService.instance().getVersion(endpoint));
             MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
         }
 
@@ -277,7 +277,8 @@ public abstract class AbstractReadExecutor
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
-                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
+                int version = MessagingService.instance().getVersion(extraReplica);
+                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(version), extraReplica, handler);
                 speculated = true;
 
                 cfs.metric.speculativeRetries.inc();
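
Both hunks above make the same fix: the read command is now serialized per
endpoint, using that endpoint's negotiated messaging version, rather than once
for all replicas. In a mixed 2.1/2.2/3.0 cluster the same logical command can
therefore go out in different wire formats. Condensed from the diff above:

    for (InetAddress endpoint : endpoints)
    {
        // Each peer may speak a different protocol version, so the message
        // (and thus the serialization format) is built per destination.
        int version = MessagingService.instance().getVersion(endpoint);
        MessageOut<ReadCommand> message = readCommand.createMessage(version);
        MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
    }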

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index a1b5c96..6bfe94a 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         ReadResponse response = responses.iterator().next().payload;
-        return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata()), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata(), command), command.nowInSec());
     }
 
     public PartitionIterator resolve()
@@ -62,7 +62,7 @@ public class DataResolver extends ResponseResolver
         for (int i = 0; i < count; i++)
         {
             MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command.metadata()));
+            iters.add(msg.payload.makeIterator(command.metadata(), command));
             sources[i] = msg.from;
         }
 
@@ -406,12 +406,12 @@ public class DataResolver extends ResponseResolver
                 if (StorageProxy.canDoLocalRequest(source))
                     StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else
-                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
+                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 
                 // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();
                 assert resolver.responses.size() == 1;
-                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata()), retryCommand);
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata(), command), retryCommand);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
index 42aee04..db8adf3 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         assert isDataPresent();
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
     }
 
     /*
@@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver
         {
             ReadResponse response = message.payload;
 
-            ByteBuffer newDigest = response.digest(command.metadata());
+            ByteBuffer newDigest = response.digest(command.metadata(), command);
             if (digest == null)
                 digest = newDigest;
             else if (!digest.equals(newDigest))
@@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver
         if (logger.isDebugEnabled())
             logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
-        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata()), command.nowInSec());
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec());
     }
 
     public boolean isDataPresent()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index d548019..8b1ef32 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -238,9 +238,11 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                 final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
-                MessageOut<ReadCommand> message = command.createMessage();
                 for (InetAddress endpoint : endpoints)
+                {
+                    MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
                     MessagingService.instance().sendRR(message, endpoint, repairHandler);
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 2c3c018..1e1f847 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1621,9 +1621,9 @@ public class StorageProxy implements StorageProxyMBean
                                                  keyspace,
                                                  executor.handler.endpoints);
 
-                MessageOut<ReadCommand> message = command.createMessage();
                 for (InetAddress endpoint : executor.getContactedReplicas())
                 {
+                    MessageOut<ReadCommand> message = command.createMessage(MessagingService.instance().getVersion(endpoint));
                     Tracing.trace("Enqueuing full data read to {}", endpoint);
                     MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
                 }
@@ -1974,9 +1974,9 @@ public class StorageProxy implements StorageProxyMBean
             }
             else
             {
-                MessageOut<ReadCommand> message = rangeCommand.createMessage();
                 for (InetAddress endpoint : toQuery.filteredEndpoints)
                 {
+                    MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
                     Tracing.trace("Enqueuing request to {}", endpoint);
                     MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
                 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a8ac8bf..959f7e3 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -296,8 +296,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new ReadCommandVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 169944b..2e57a8b 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -47,7 +47,9 @@ public class RangeSliceQueryPager extends AbstractQueryPager
         if (state != null)
         {
             lastReturnedKey = command.metadata().decorateKey(state.partitionKey);
-            lastReturnedClustering = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+            lastReturnedClustering = state.cellName.hasRemaining()
+                                   ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
+                                   : null;
             restoreState(lastReturnedKey, state.remaining, state.remainingInPartition);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index bb223b8..28c5206 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -24,11 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.service.ClientState;
 
 /**
  * Common interface to single partition queries (by slice and by name).
@@ -50,7 +46,9 @@ public class SinglePartitionPager extends AbstractQueryPager
 
         if (state != null)
         {
-            lastReturned = LegacyLayout.decodeClustering(command.metadata(), state.cellName);
+            lastReturned = state.cellName.hasRemaining()
+                         ? LegacyLayout.decodeClustering(command.metadata(), state.cellName)
+                         : null;
             restoreState(command.partitionKey(), state.remaining, state.remainingInPartition);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 9353d16..733067e 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -237,7 +237,7 @@ public class CassandraServer implements Cassandra.Iface
         if (partition.isEmpty())
             return EMPTY_COLUMNS;
 
-        Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.fromRowIterator(partition);
+        Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.fromRowIterator(partition).right;
         List<ColumnOrSuperColumn> result;
         if (partition.metadata().isSuper())
         {
@@ -932,7 +932,7 @@ public class CassandraServer implements Cassandra.Iface
             {
                 return result == null
                      ? new CASResult(true)
-                     : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(metadata, LegacyLayout.fromRowIterator(result)));
+                     : new CASResult(false).setCurrent_values(thriftifyColumnsAsColumns(metadata, LegacyLayout.fromRowIterator(result).right));
             }
         }
         catch (UnknownColumnException e)
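
The two CassandraServer changes above track a signature change in LegacyLayout:
fromRowIterator() now returns a Pair whose left element is the
LegacyRangeTombstoneList accumulated while flattening the rows and whose right
element is the cell iterator (the three-argument overload is visible in the
LegacyLayout hunk near the end of this diff). The Thrift result paths only need
the cells, hence the added .right. Illustrative shape, assuming the
single-argument overload returns the same pair type:

    Pair<LegacyLayout.LegacyRangeTombstoneList, Iterator<LegacyLayout.LegacyCell>> pair =
        LegacyLayout.fromRowIterator(partition);
    Iterator<LegacyLayout.LegacyCell> cells = pair.right; // tombstones in pair.left are not needed here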


[4/4] cassandra git commit: Merge branch 'cassandra-3.0' into trunk

Posted by ty...@apache.org.
Merge branch 'cassandra-3.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/288f2cf4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/288f2cf4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/288f2cf4

Branch: refs/heads/trunk
Commit: 288f2cf4fb8ed90c45511dc0e35b1bdbfbd5a41d
Parents: ed9343e 8c64cef
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Fri Aug 7 17:44:28 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Fri Aug 7 17:44:28 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/ModificationStatement.java  |   2 +-
 .../org/apache/cassandra/db/Clustering.java     |   8 +-
 src/java/org/apache/cassandra/db/DataRange.java |  28 +-
 .../org/apache/cassandra/db/LegacyLayout.java   | 922 +++++++++++++++--
 .../apache/cassandra/db/PartitionColumns.java   |   6 +
 .../cassandra/db/PartitionRangeReadCommand.java |  11 +
 .../cassandra/db/RangeSliceVerbHandler.java     |  29 +
 .../org/apache/cassandra/db/ReadCommand.java    | 995 ++++++++++++++++++-
 .../cassandra/db/ReadCommandVerbHandler.java    |   9 +-
 .../org/apache/cassandra/db/ReadResponse.java   | 250 ++++-
 .../db/SinglePartitionReadCommand.java          |  11 +-
 src/java/org/apache/cassandra/db/Slice.java     |  48 +-
 .../filter/AbstractClusteringIndexFilter.java   |  20 -
 .../db/filter/ClusteringIndexFilter.java        |  20 +
 .../db/filter/ClusteringIndexNamesFilter.java   |   4 +-
 .../db/filter/ClusteringIndexSliceFilter.java   |   4 +-
 .../cassandra/db/filter/ColumnFilter.java       |   3 +
 .../apache/cassandra/db/filter/DataLimits.java  |  12 +-
 .../db/marshal/AbstractCompositeType.java       |  32 -
 .../cassandra/db/marshal/CompositeType.java     |  26 +
 .../AbstractThreadUnsafePartition.java          |   2 +-
 .../db/partitions/PartitionUpdate.java          |  93 +-
 .../UnfilteredPartitionIterators.java           |  13 +-
 .../cassandra/db/rows/BTreeBackedRow.java       |  62 ++
 src/java/org/apache/cassandra/db/rows/Row.java  |  12 +
 .../apache/cassandra/net/MessagingService.java  |   4 +-
 .../cassandra/service/AbstractReadExecutor.java |   5 +-
 .../apache/cassandra/service/DataResolver.java  |   8 +-
 .../cassandra/service/DigestResolver.java       |   6 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   4 +-
 .../cassandra/service/StorageService.java       |   4 +-
 .../service/pager/RangeSliceQueryPager.java     |   4 +-
 .../service/pager/SinglePartitionPager.java     |   8 +-
 .../cassandra/thrift/CassandraServer.java       |   4 +-
 36 files changed, 2353 insertions(+), 321 deletions(-)
----------------------------------------------------------------------



[3/4] cassandra git commit: On-wire backward compatibility for 3.0

Posted by ty...@apache.org.
On-wire backward compatibility for 3.0

This adds support for mixed-version clusters with Cassandra 2.1
and 2.2.

Patch by Tyler Hobbs and Sylvain Lebresne for CASSANDRA-9704


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8c64cefd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8c64cefd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8c64cefd

Branch: refs/heads/trunk
Commit: 8c64cefd19d706003d4b33b333274dbf17c9cb34
Parents: 69f0b89
Author: Tyler Hobbs <ty...@gmail.com>
Authored: Fri Aug 7 17:42:18 2015 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Fri Aug 7 17:42:18 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/ModificationStatement.java  |   2 +-
 .../org/apache/cassandra/db/Clustering.java     |   8 +-
 src/java/org/apache/cassandra/db/DataRange.java |  28 +-
 .../org/apache/cassandra/db/LegacyLayout.java   | 922 +++++++++++++++--
 .../apache/cassandra/db/PartitionColumns.java   |   6 +
 .../cassandra/db/PartitionRangeReadCommand.java |  11 +
 .../cassandra/db/RangeSliceVerbHandler.java     |  29 +
 .../org/apache/cassandra/db/ReadCommand.java    | 995 ++++++++++++++++++-
 .../cassandra/db/ReadCommandVerbHandler.java    |   9 +-
 .../org/apache/cassandra/db/ReadResponse.java   | 250 ++++-
 .../db/SinglePartitionReadCommand.java          |  11 +-
 src/java/org/apache/cassandra/db/Slice.java     |  48 +-
 .../filter/AbstractClusteringIndexFilter.java   |  20 -
 .../db/filter/ClusteringIndexFilter.java        |  20 +
 .../db/filter/ClusteringIndexNamesFilter.java   |   4 +-
 .../db/filter/ClusteringIndexSliceFilter.java   |   4 +-
 .../cassandra/db/filter/ColumnFilter.java       |   3 +
 .../apache/cassandra/db/filter/DataLimits.java  |  12 +-
 .../db/marshal/AbstractCompositeType.java       |  32 -
 .../cassandra/db/marshal/CompositeType.java     |  26 +
 .../AbstractThreadUnsafePartition.java          |   2 +-
 .../db/partitions/PartitionUpdate.java          |  93 +-
 .../UnfilteredPartitionIterators.java           |  13 +-
 .../cassandra/db/rows/BTreeBackedRow.java       |  62 ++
 src/java/org/apache/cassandra/db/rows/Row.java  |  12 +
 .../apache/cassandra/net/MessagingService.java  |   4 +-
 .../cassandra/service/AbstractReadExecutor.java |   5 +-
 .../apache/cassandra/service/DataResolver.java  |   8 +-
 .../cassandra/service/DigestResolver.java       |   6 +-
 .../apache/cassandra/service/ReadCallback.java  |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   4 +-
 .../cassandra/service/StorageService.java       |   4 +-
 .../service/pager/RangeSliceQueryPager.java     |   4 +-
 .../service/pager/SinglePartitionPager.java     |   8 +-
 .../cassandra/thrift/CassandraServer.java       |   4 +-
 36 files changed, 2353 insertions(+), 321 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 216d3f7..0ba7b4e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-beta1
+ * Support mixed-version clusters with Cassandra 2.1 and 2.2 (CASSANDRA-9704)
  * Fix multiple slices on RowSearchers (CASSANDRA-10002)
  * Fix bug in merging of collections (CASSANDRA-10001)
  * Optimize batchlog replay to avoid full scans (CASSANDRA-7237)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 9f2c952..5fa1842 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -544,7 +544,7 @@ public abstract class ModificationStatement implements CQLStatement
                                                          key,
                                                          new ClusteringIndexNamesFilter(clusterings, false)));
 
-        Map<DecoratedKey, Partition> map = new HashMap();
+        Map<DecoratedKey, Partition> map = new HashMap<>();
 
         SinglePartitionReadCommand.Group group = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index 7754182..a29ce65 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -57,10 +57,16 @@ public class Clustering extends AbstractClusteringPrefix
         }
 
         @Override
-        public String toString(CFMetaData metadata)
+        public String toString()
         {
             return "STATIC";
         }
+
+        @Override
+        public String toString(CFMetaData metadata)
+        {
+            return toString();
+        }
     };
 
     /** Empty clustering for tables having no clustering columns. */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 79b2448..ffe041e 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -149,6 +149,16 @@ public class DataRange
     }
 
     /**
+     * Whether the data range is for a paged request or not.
+     *
+     * @return true if for paging, false otherwise
+     */
+    public boolean isPaging()
+    {
+        return false;
+    }
+
+    /**
      * Whether the range queried by this {@code DataRange} actually wraps around.
      *
      * @return whether the range queried by this {@code DataRange} actually wraps around.
@@ -307,7 +317,7 @@ public class DataRange
      * first queried partition (the one for that last result) so it only fetches results that follow that
      * last result. In other words, this makes sure paging resumes where we left off.
      */
-    private static class Paging extends DataRange
+    public static class Paging extends DataRange
     {
         private final ClusteringComparator comparator;
         private final Clustering lastReturned;
@@ -349,6 +359,20 @@ public class DataRange
                  : new DataRange(range, clusteringIndexFilter);
         }
 
+        /**
+         * @return the last Clustering that was returned (in the previous page)
+         */
+        public Clustering getLastReturned()
+        {
+            return lastReturned;
+        }
+
+        @Override
+        public boolean isPaging()
+        {
+            return true;
+        }
+
         @Override
         public boolean isUnrestricted()
         {
@@ -358,7 +382,7 @@ public class DataRange
         @Override
         public String toString(CFMetaData metadata)
         {
-            return String.format("range=%s pfilter=%s lastReturned=%s (%s)",
+            return String.format("range=%s (paging) pfilter=%s lastReturned=%s (%s)",
                                  keyRange.getString(metadata.getKeyValidator()),
                                  clusteringIndexFilter.toString(metadata),
                                  lastReturned.toString(metadata),
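
The Paging subclass is made public and gains isPaging()/getLastReturned() so
that backward-compatibility code can recover the pre-3.0 notion of a paged
range's "start column" from a 3.0 DataRange. A hypothetical consumer
(illustrative only; the real callers are presumably the legacy
RANGE_SLICE/PAGED_RANGE serializers in ReadCommand, not shown in this excerpt):

    if (dataRange.isPaging())
    {
        Clustering last = ((DataRange.Paging) dataRange).getLastReturned();
        // encodeClustering (see the LegacyLayout changes below) renders a
        // clustering prefix in the old composite cell-name encoding.
        ByteBuffer startColumn = LegacyLayout.encodeClustering(metadata, last);
        // ... startColumn would then be written in the legacy wire format
    }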

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 696c1c9..50e5d04 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -25,10 +25,9 @@ import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.collect.PeekingIterator;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.filter.ColumnFilter;
@@ -38,6 +37,7 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.*;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
@@ -47,14 +47,14 @@ import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
  */
 public abstract class LegacyLayout
 {
-    private static final Logger logger = LoggerFactory.getLogger(LegacyLayout.class);
-
     public final static int MAX_CELL_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
 
-    private final static int DELETION_MASK        = 0x01;
-    private final static int EXPIRATION_MASK      = 0x02;
-    private final static int COUNTER_MASK         = 0x04;
-    private final static int COUNTER_UPDATE_MASK  = 0x08;
+    public final static int STATIC_PREFIX = 0xFFFF;
+
+    public final static int DELETION_MASK        = 0x01;
+    public final static int EXPIRATION_MASK      = 0x02;
+    public final static int COUNTER_MASK         = 0x04;
+    public final static int COUNTER_UPDATE_MASK  = 0x08;
     private final static int RANGE_TOMBSTONE_MASK = 0x10;
 
     private LegacyLayout() {}
@@ -177,25 +177,69 @@ public abstract class LegacyLayout
         if (!bound.hasRemaining())
             return isStart ? LegacyBound.BOTTOM : LegacyBound.TOP;
 
-        List<ByteBuffer> components = metadata.isCompound()
-                                    ? CompositeType.splitName(bound)
-                                    : Collections.singletonList(bound);
+        List<CompositeType.CompositeComponent> components = metadata.isCompound()
+                                                          ? CompositeType.deconstruct(bound)
+                                                          : Collections.singletonList(new CompositeType.CompositeComponent(bound, (byte) 0));
 
         // Either it's a prefix of the clustering, or it's the bound of a collection range tombstone (and thus has
         // the collection column name)
         assert components.size() <= metadata.comparator.size() || (!metadata.isCompactTable() && components.size() == metadata.comparator.size() + 1);
 
-        List<ByteBuffer> prefix = components.size() <= metadata.comparator.size() ? components : components.subList(0, metadata.comparator.size());
-        Slice.Bound sb = Slice.Bound.create(isStart ? Slice.Bound.Kind.INCL_START_BOUND : Slice.Bound.Kind.INCL_END_BOUND,
-                                            prefix.toArray(new ByteBuffer[prefix.size()]));
+        List<CompositeType.CompositeComponent> prefix = components.size() <= metadata.comparator.size()
+                                                      ? components
+                                                      : components.subList(0, metadata.comparator.size());
+        Slice.Bound.Kind boundKind;
+        if (isStart)
+        {
+            if (components.get(components.size() - 1).eoc > 0)
+                boundKind = Slice.Bound.Kind.EXCL_START_BOUND;
+            else
+                boundKind = Slice.Bound.Kind.INCL_START_BOUND;
+        }
+        else
+        {
+            if (components.get(components.size() - 1).eoc < 0)
+                boundKind = Slice.Bound.Kind.EXCL_END_BOUND;
+            else
+                boundKind = Slice.Bound.Kind.INCL_END_BOUND;
+        }
+
+        ByteBuffer[] prefixValues = new ByteBuffer[prefix.size()];
+        for (int i = 0; i < prefix.size(); i++)
+            prefixValues[i] = prefix.get(i).value;
+        Slice.Bound sb = Slice.Bound.create(boundKind, prefixValues);
 
         ColumnDefinition collectionName = components.size() == metadata.comparator.size() + 1
-                                        ? metadata.getColumnDefinition(components.get(metadata.comparator.size()))
+                                        ? metadata.getColumnDefinition(components.get(metadata.comparator.size()).value)
                                         : null;
         return new LegacyBound(sb, metadata.isCompound() && CompositeType.isStaticName(bound), collectionName);
     }
 
-    public static ByteBuffer encodeCellName(CFMetaData metadata, Clustering clustering, ByteBuffer columnName, ByteBuffer collectionElement)
+    public static ByteBuffer encodeBound(CFMetaData metadata, Slice.Bound bound, boolean isStart)
+    {
+        if (bound == Slice.Bound.BOTTOM || bound == Slice.Bound.TOP || metadata.comparator.size() == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        ClusteringPrefix clustering = bound.clustering();
+
+        if (!metadata.isCompound())
+        {
+            assert clustering.size() == 1;
+            return clustering.get(0);
+        }
+
+        CompositeType ctype = CompositeType.getInstance(metadata.comparator.subtypes());
+        CompositeType.Builder builder = ctype.builder();
+        for (int i = 0; i < clustering.size(); i++)
+            builder.add(clustering.get(i));
+
+        if (isStart)
+            return bound.isInclusive() ? builder.build() : builder.buildAsEndOfRange();
+        else
+            return bound.isInclusive() ? builder.buildAsEndOfRange() : builder.build();
+    }
+
+    public static ByteBuffer encodeCellName(CFMetaData metadata, ClusteringPrefix clustering, ByteBuffer columnName, ByteBuffer collectionElement)
     {
         boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
 
@@ -204,7 +248,7 @@ public abstract class LegacyLayout
             if (isStatic)
                 return columnName;
 
-            assert clustering.size() == 1;
+            assert clustering.size() == 1 : "Expected clustering size to be 1, but was " + clustering.size();
             return clustering.get(0);
         }
 
@@ -253,8 +297,11 @@ public abstract class LegacyLayout
         return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
     }
 
-    public static ByteBuffer encodeClustering(CFMetaData metadata, Clustering clustering)
+    public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering)
     {
+        if (clustering.size() == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
         if (!metadata.isCompound())
         {
             assert clustering.size() == 1;
@@ -268,14 +315,151 @@ public abstract class LegacyLayout
     }
 
     // For serializing to old wire format
-    public static Pair<DeletionInfo, Iterator<LegacyCell>> fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
+    public static LegacyUnfilteredPartition fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
     {
         // we need to extract the range tombstones, so we materialize the partition. Since this is
         // used for the on-wire format, this is no worse than it used to be.
         final ArrayBackedPartition partition = ArrayBackedPartition.create(iterator);
         DeletionInfo info = partition.deletionInfo();
-        Iterator<LegacyCell> cells = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow());
-        return Pair.create(info, cells);
+        Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> pair = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow());
+
+        LegacyLayout.LegacyRangeTombstoneList rtl = pair.left;
+
+        // Processing the cell iterator results in the LegacyRangeTombstoneList being populated, so we do this
+        // before we use the LegacyRangeTombstoneList at all
+        List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right);
+
+        // The LegacyRangeTombstoneList already has range tombstones for the single-row deletions and complex
+        // deletions.  Go through our normal range tombstones and add them to the LegacyRTL so that the range
+        // tombstones all get merged and sorted properly.
+        if (info.hasRanges())
+        {
+            Iterator<RangeTombstone> rangeTombstoneIterator = info.rangeIterator(false);
+            while (rangeTombstoneIterator.hasNext())
+            {
+                RangeTombstone rt = rangeTombstoneIterator.next();
+                Slice slice = rt.deletedSlice();
+                LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(slice.start(), false, null);
+                LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(slice.end(), false, null);
+                rtl.add(start, end, rt.deletionTime().markedForDeleteAt(), rt.deletionTime().localDeletionTime());
+            }
+        }
+
+        return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, cells);
+    }
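
A side note on the ordering constraint above: the cell iterator returned by fromRowIterator() populates the LegacyRangeTombstoneList lazily, as a side effect of iteration, so it must be fully drained before the list is read. A toy sketch of the same pattern (hypothetical names, plain Java, not Cassandra code):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Toy version of the side-effecting iterator pattern used above: next()
    // appends to a shared list, so the list is only complete once the iterator
    // has been drained (here via explicit materialization, like Lists.newArrayList).
    public class DrainBeforeRead
    {
        public static void main(String[] args)
        {
            final List<String> tombstones = new ArrayList<>();
            Iterator<Integer> cells = new Iterator<Integer>()
            {
                private int i = 0;
                public boolean hasNext() { return i < 3; }
                public Integer next() { tombstones.add("rt-" + i); return i++; }
                public void remove() { throw new UnsupportedOperationException(); }
            };

            List<Integer> materialized = new ArrayList<>();
            while (cells.hasNext())
                materialized.add(cells.next()); // drain first ...
            System.out.println(tombstones);     // ... then tombstones is complete: [rt-0, rt-1, rt-2]
        }
    }
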
+
+    public static void serializeAsLegacyPartition(UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException
+    {
+        assert version < MessagingService.VERSION_30;
+
+        out.writeBoolean(true);
+
+        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+
+        UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, version);
+        DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, out);
+
+        legacyPartition.rangeTombstones.serialize(out, partition.metadata());
+
+        // begin cell serialization
+        out.writeInt(legacyPartition.cells.size());
+        for (LegacyLayout.LegacyCell cell : legacyPartition.cells)
+        {
+            ByteBufferUtil.writeWithShortLength(cell.name.encode(partition.metadata()), out);
+            if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING)
+            {
+                out.writeByte(LegacyLayout.EXPIRATION_MASK);  // serialization flags
+                out.writeInt(cell.ttl);
+                out.writeInt(cell.localDeletionTime);
+            }
+            else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED)
+            {
+                out.writeByte(LegacyLayout.DELETION_MASK);  // serialization flags
+                out.writeLong(cell.timestamp);
+                out.writeInt(TypeSizes.sizeof(cell.localDeletionTime));
+                out.writeInt(cell.localDeletionTime);
+                continue;
+            }
+            else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER)
+            {
+                out.writeByte(LegacyLayout.COUNTER_MASK);  // serialization flags
+                out.writeLong(Long.MIN_VALUE);  // timestampOfLastDelete (not used, and MIN_VALUE is the default)
+            }
+            else
+            {
+                // normal cell
+                out.writeByte(0);  // serialization flags
+            }
+
+            out.writeLong(cell.timestamp);
+            ByteBufferUtil.writeWithLength(cell.value, out);
+        }
+    }
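
The single flags byte written above drives the rest of the legacy cell layout. Here is a small sketch of decoding it; the mask values are assumed from the 2.x on-disk/on-wire format (deletion 0x01, expiration 0x02, counter 0x04), and the authoritative constants live in LegacyLayout:

    // Sketch of reading back the serialization-flags byte written above
    // (mask values assumed from the 2.x format; see LegacyLayout for the
    // actual DELETION_MASK/EXPIRATION_MASK/COUNTER_MASK constants).
    public class LegacyCellFlags
    {
        static final int DELETION_MASK   = 0x01;
        static final int EXPIRATION_MASK = 0x02;
        static final int COUNTER_MASK    = 0x04;

        static String kind(int flags)
        {
            if ((flags & COUNTER_MASK) != 0)
                return "COUNTER";
            if ((flags & EXPIRATION_MASK) != 0)
                return "EXPIRING";
            if ((flags & DELETION_MASK) != 0)
                return "DELETED";
            return "REGULAR";
        }

        public static void main(String[] args)
        {
            System.out.println(kind(0x00)); // REGULAR
            System.out.println(kind(0x02)); // EXPIRING
        }
    }
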
+
+    // For the old wire format
+    // Note: this can return null if an empty partition is serialized!
+    public static UnfilteredRowIterator deserializeLegacyPartition(DataInputPlus in, int version, SerializationHelper.Flag flag, ByteBuffer key) throws IOException
+    {
+        assert version < MessagingService.VERSION_30;
+
+        // This is only used for mutations, and mutations have never allowed "null" column families
+        boolean present = in.readBoolean();
+        if (!present)
+            return null;
+
+        CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
+        LegacyDeletionInfo info = LegacyDeletionInfo.deserialize(metadata, in);
+        int size = in.readInt();
+        Iterator<LegacyCell> cells = deserializeCells(metadata, in, flag, size);
+        SerializationHelper helper = new SerializationHelper(metadata, version, flag);
+        return onWireCellstoUnfilteredRowIterator(metadata, metadata.partitioner.decorateKey(key), info, cells, false, helper);
+    }
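
A note on the null contract above: a false "present" boolean on the wire stands for an empty partition, and the deserializer maps it to null, which callers must tolerate. A self-contained sketch of that handshake (plain java.io instead of the Cassandra DataInputPlus types):

    import java.io.*;

    // Minimal round-trip of the "present" flag: false means the peer serialized
    // an empty partition and deserialization yields null.
    public class PresentFlagSketch
    {
        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            new DataOutputStream(buf).writeBoolean(false); // empty partition

            DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
            Object partition = in.readBoolean() ? new Object() : null;
            System.out.println(partition == null ? "empty partition -> null" : "partition present");
        }
    }
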
+
+    // For the old wire format
+    public static long serializedSizeAsLegacyPartition(UnfilteredRowIterator partition, int version)
+    {
+        assert version < MessagingService.VERSION_30;
+
+        if (partition.isEmpty())
+            return TypeSizes.sizeof(false);
+
+        long size = TypeSizes.sizeof(true);
+
+        LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition);
+
+        size += UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version);
+        size += DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion);
+        size += legacyPartition.rangeTombstones.serializedSize(partition.metadata());
+
+        // begin cell serialization
+        size += TypeSizes.sizeof(legacyPartition.cells.size());
+        for (LegacyLayout.LegacyCell cell : legacyPartition.cells)
+        {
+            size += ByteBufferUtil.serializedSizeWithShortLength(cell.name.encode(partition.metadata()));
+            size += 1;  // serialization flags
+            if (cell.kind == LegacyLayout.LegacyCell.Kind.EXPIRING)
+            {
+                size += TypeSizes.sizeof(cell.ttl);
+                size += TypeSizes.sizeof(cell.localDeletionTime);
+            }
+            else if (cell.kind == LegacyLayout.LegacyCell.Kind.DELETED)
+            {
+                size += TypeSizes.sizeof(cell.timestamp);
+                // localDeletionTime replaces cell.value as the body
+                size += TypeSizes.sizeof(TypeSizes.sizeof(cell.localDeletionTime));
+                size += TypeSizes.sizeof(cell.localDeletionTime);
+                continue;
+            }
+            else if (cell.kind == LegacyLayout.LegacyCell.Kind.COUNTER)
+            {
+                size += TypeSizes.sizeof(Long.MIN_VALUE);  // timestampOfLastDelete
+            }
+
+            size += TypeSizes.sizeof(cell.timestamp);
+            size += ByteBufferUtil.serializedSizeWithLength(cell.value);
+        }
+
+        return size;
     }
 
     // For thrift sake
@@ -296,6 +480,7 @@ public abstract class LegacyLayout
                                                                            boolean reversed,
                                                                            SerializationHelper helper)
     {
+
         // If the table is a static compact table, the "column_metadata" are now internally encoded as
         // static. This has already been recognized by decodeCellName, but it means the cells
         // provided are not in the expected order (the "static" cells are not necessarily at the front).
@@ -441,18 +626,27 @@ public abstract class LegacyLayout
         };
     }
 
-    public static Iterator<LegacyCell> fromRowIterator(final RowIterator iterator)
+    public static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final RowIterator iterator)
     {
         return fromRowIterator(iterator.metadata(), iterator, iterator.staticRow());
     }
 
-    public static Iterator<LegacyCell> fromRowIterator(final CFMetaData metadata, final Iterator<Row> iterator, final Row staticRow)
+    private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRowIterator(final CFMetaData metadata, final Iterator<Row> iterator, final Row staticRow)
     {
-        return new AbstractIterator<LegacyCell>()
+        LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10);
+        Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
         {
-            private Iterator<LegacyCell> currentRow = staticRow.isEmpty()
-                                                    ? Collections.<LegacyLayout.LegacyCell>emptyIterator()
-                                                    : fromRow(metadata, staticRow);
+            private Iterator<LegacyCell> currentRow = initializeRow();
+
+            private Iterator<LegacyCell> initializeRow()
+            {
+                if (staticRow == null || staticRow.isEmpty())
+                    return Collections.<LegacyLayout.LegacyCell>emptyIterator();
+
+                Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, staticRow);
+                deletions.addAll(row.left);
+                return row.right;
+            }
 
             protected LegacyCell computeNext()
             {
@@ -462,17 +656,58 @@ public abstract class LegacyLayout
                 if (!iterator.hasNext())
                     return endOfData();
 
-                currentRow = fromRow(metadata, iterator.next());
+                Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> row = fromRow(metadata, iterator.next());
+                deletions.addAll(row.left);
+                currentRow = row.right;
                 return computeNext();
             }
         };
+
+        return Pair.create(deletions, cells);
     }
 
-    private static Iterator<LegacyCell> fromRow(final CFMetaData metadata, final Row row)
+    private static Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> fromRow(final CFMetaData metadata, final Row row)
     {
-        return new AbstractIterator<LegacyCell>()
+        // convert any complex deletions or row deletions into normal range tombstones so that we can build and send a proper RangeTombstoneList
+        // to legacy nodes
+        LegacyRangeTombstoneList deletions = new LegacyRangeTombstoneList(new LegacyBoundComparator(metadata.comparator), 10);
+
+        if (!row.deletion().isLive())
         {
-            private final Iterator<Cell> cells = row.cells().iterator();
+            Clustering clustering = row.clustering();
+            Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering);
+            Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering);
+
+            LegacyBound start = new LegacyLayout.LegacyBound(startBound, false, null);
+            LegacyBound end = new LegacyLayout.LegacyBound(endBound, false, null);
+
+            deletions.add(start, end, row.deletion().markedForDeleteAt(), row.deletion().localDeletionTime());
+        }
+
+        for (ColumnData cd : row)
+        {
+            ColumnDefinition col = cd.column();
+            if (col.isSimple())
+                continue;
+
+            DeletionTime delTime = ((ComplexColumnData)cd).complexDeletion();
+            if (!delTime.isLive())
+            {
+                Clustering clustering = row.clustering();
+
+                Slice.Bound startBound = Slice.Bound.inclusiveStartOf(clustering);
+                Slice.Bound endBound = Slice.Bound.inclusiveEndOf(clustering);
+
+                LegacyLayout.LegacyBound start = new LegacyLayout.LegacyBound(startBound, col.isStatic(), col);
+                LegacyLayout.LegacyBound end = new LegacyLayout.LegacyBound(endBound, col.isStatic(), col);
+
+                deletions.add(start, end, delTime.markedForDeleteAt(), delTime.localDeletionTime());
+            }
+        }
+
+        Iterator<LegacyCell> cells = new AbstractIterator<LegacyCell>()
+        {
+            private final Iterator<Cell> cells = row.cellsInLegacyOrder(metadata).iterator();
             // we don't have (and shouldn't have) row markers for compact tables.
             private boolean hasReturnedRowMarker = metadata.isCompactTable();
 
@@ -481,18 +716,24 @@ public abstract class LegacyLayout
                 if (!hasReturnedRowMarker)
                 {
                     hasReturnedRowMarker = true;
-                    LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null);
-                    LivenessInfo info = row.primaryKeyLivenessInfo();
-                    return new LegacyCell(LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl());
+
+                    // don't include a row marker if there's no timestamp on the primary key; the primary key
+                    // liveness info is the 3.0+ equivalent of a row marker
+                    if (!row.primaryKeyLivenessInfo().isEmpty())
+                    {
+                        LegacyCellName cellName = new LegacyCellName(row.clustering(), null, null);
+                        LivenessInfo info = row.primaryKeyLivenessInfo();
+                        return new LegacyCell(info.isExpiring() ? LegacyCell.Kind.EXPIRING : LegacyCell.Kind.REGULAR, cellName, ByteBufferUtil.EMPTY_BYTE_BUFFER, info.timestamp(), info.localExpirationTime(), info.ttl());
+                    }
                 }
 
                 if (!cells.hasNext())
                     return endOfData();
 
-                Cell cell = cells.next();
-                return makeLegacyCell(row.clustering(), cell);
+                return makeLegacyCell(row.clustering(), cells.next());
             }
         };
+        return Pair.create(deletions, cells);
     }
 
     private static LegacyCell makeLegacyCell(Clustering clustering, Cell cell)
@@ -554,6 +795,9 @@ public abstract class LegacyLayout
         };
     }
 
+    // Note that this doesn't exactly compare cells as they were compared pre-3.0, because within a row it sorts
+    // columns like 3.0 does, that is, with simple columns before complex columns. In other words, this comparator
+    // makes sure cells are in the proper order to convert them to actual 3.0 rows.
     public static Comparator<LegacyCellName> legacyCellNameComparator(final CFMetaData metadata, final boolean reversed)
     {
         return new Comparator<LegacyCellName>()
@@ -591,13 +835,9 @@ public abstract class LegacyLayout
 
                     assert c1.column.isRegular() || c1.column.isStatic();
                     assert c2.column.isRegular() || c2.column.isStatic();
-                    if (c1.column.kind != c2.column.kind)
-                        return c1.column.isStatic() ? -1 : 1;
-
-                    AbstractType<?> cmp = metadata.getColumnDefinitionNameComparator(c1.column.kind);
-                    int c = cmp.compare(c1.column.name.bytes, c2.column.name.bytes);
-                    if (c != 0)
-                        return c;
+                    int cmp = c1.column.compareTo(c2.column);
+                    if (cmp != 0)
+                        return cmp;
                 }
 
                 assert (c1.collectionElement == null) == (c2.collectionElement == null);
@@ -748,13 +988,6 @@ public abstract class LegacyLayout
         }
     }
 
-    public static LegacyRangeTombstone readLegacyRangeTombstone(CFMetaData metadata, DataInputPlus in) throws IOException
-    {
-        ByteBuffer boundname = ByteBufferUtil.readWithShortLength(in);
-        in.readUnsignedByte();
-        return readLegacyRangeTombstoneBody(metadata, in, boundname);
-    }
-
     public static LegacyRangeTombstone readLegacyRangeTombstoneBody(CFMetaData metadata, DataInputPlus in, ByteBuffer boundname) throws IOException
     {
         LegacyBound min = decodeBound(metadata, boundname, true);
@@ -806,7 +1039,7 @@ public abstract class LegacyLayout
         public final CFMetaData metadata;
         private final boolean isStatic;
         private final SerializationHelper helper;
-        private Row.Builder builder;
+        private final Row.Builder builder;
         private Clustering clustering;
 
         private LegacyRangeTombstone rowDeletion;
@@ -822,7 +1055,11 @@ public abstract class LegacyLayout
             this.metadata = metadata;
             this.isStatic = isStatic;
             this.helper = helper;
-            this.builder = BTreeBackedRow.sortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars);
+            // We cannot use a sorted builder because we don't have exactly the same ordering in 3.0 and pre-3.0. More precisely, within a row,
+            // 3.0 stores all simple columns before the complex ones, whereas pre-3.0 sorted everything by column name. Note however
+            // that the unsorted builder won't have to reconcile cells, so the exact value we pass for nowInSec doesn't matter.
+            this.builder = BTreeBackedRow.unsortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars, FBUtilities.nowInSeconds());
+
         }
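
To make the ordering mismatch mentioned in the constructor comment concrete, here is a toy comparison (hypothetical names, plain Java) of the two sort orders: pre-3.0 sorts cells purely by column name, while 3.0 rows put simple columns before complex ones:

    import java.util.*;

    // Toy illustration of why a sorted Row.Builder can't be fed legacy-ordered
    // cells: legacy order is purely by name; 3.0 order is simple-before-complex,
    // then by name.
    public class OrderMismatchSketch
    {
        static class Col
        {
            final String name;
            final boolean complex;
            Col(String name, boolean complex) { this.name = name; this.complex = complex; }
            public String toString() { return name + (complex ? "(complex)" : "(simple)"); }
        }

        public static void main(String[] args)
        {
            List<Col> cols = new ArrayList<>(Arrays.asList(
                new Col("b", true), new Col("a", false), new Col("c", false)));

            Collections.sort(cols, new Comparator<Col>()
            {
                public int compare(Col x, Col y) { return x.name.compareTo(y.name); }
            });
            System.out.println("legacy order: " + cols); // [a(simple), b(complex), c(simple)]

            Collections.sort(cols, new Comparator<Col>()
            {
                public int compare(Col x, Col y)
                {
                    if (x.complex != y.complex)
                        return x.complex ? 1 : -1; // simple columns first in 3.0
                    return x.name.compareTo(y.name);
                }
            });
            System.out.println("3.0 order: " + cols);    // [a(simple), c(simple), b(complex)]
        }
    }
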
 
         public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper)
@@ -939,6 +1176,21 @@ public abstract class LegacyLayout
         }
     }
 
+    public static class LegacyUnfilteredPartition
+    {
+        public final DeletionTime partitionDeletion;
+        public final LegacyRangeTombstoneList rangeTombstones;
+        public final List<LegacyCell> cells;
+
+        private LegacyUnfilteredPartition(DeletionTime partitionDeletion, LegacyRangeTombstoneList rangeTombstones, List<LegacyCell> cells)
+        {
+            this.partitionDeletion = partitionDeletion;
+            this.rangeTombstones = rangeTombstones;
+            this.cells = cells;
+        }
+    }
+
     public static class LegacyCellName
     {
         public final Clustering clustering;
@@ -987,7 +1239,7 @@ public abstract class LegacyLayout
         public final boolean isStatic;
         public final ColumnDefinition collectionName;
 
-        private LegacyBound(Slice.Bound bound, boolean isStatic, ColumnDefinition collectionName)
+        public LegacyBound(Slice.Bound bound, boolean isStatic, ColumnDefinition collectionName)
         {
             this.bound = bound;
             this.isStatic = isStatic;
@@ -1131,10 +1383,7 @@ public abstract class LegacyLayout
             if (isTombstone())
                 return false;
 
-            if (isExpiring())
-                return nowInSec < localDeletionTime;
-
-            return true;
+            return !isExpiring() || nowInSec < localDeletionTime;
         }
 
         @Override
@@ -1240,10 +1489,8 @@ public abstract class LegacyLayout
 
     public static class LegacyDeletionInfo
     {
-        public static final Serializer serializer = new Serializer();
-
         public final DeletionInfo deletionInfo;
-        private final List<LegacyRangeTombstone> inRowTombstones;
+        public final List<LegacyRangeTombstone> inRowTombstones;
 
         private LegacyDeletionInfo(DeletionInfo deletionInfo, List<LegacyRangeTombstone> inRowTombstones)
         {
@@ -1253,7 +1500,17 @@ public abstract class LegacyLayout
 
         public static LegacyDeletionInfo from(DeletionInfo info)
         {
-            return new LegacyDeletionInfo(info, Collections.<LegacyRangeTombstone>emptyList());
+            List<LegacyRangeTombstone> rangeTombstones = new ArrayList<>(info.rangeCount());
+            Iterator<RangeTombstone> iterator = info.rangeIterator(false);
+            while (iterator.hasNext())
+            {
+                RangeTombstone rt = iterator.next();
+                Slice slice = rt.deletedSlice();
+                rangeTombstones.add(new LegacyRangeTombstone(new LegacyBound(slice.start(), false, null),
+                                                             new LegacyBound(slice.end(), false, null),
+                                                             rt.deletionTime()));
+            }
+            return new LegacyDeletionInfo(info, rangeTombstones);
         }
 
         public static LegacyDeletionInfo live()
@@ -1266,47 +1523,536 @@ public abstract class LegacyLayout
             return inRowTombstones.iterator();
         }
 
-        public static class Serializer
+        public static LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in) throws IOException
         {
-            public void serialize(CFMetaData metadata, LegacyDeletionInfo info, DataOutputPlus out, int version) throws IOException
+            DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
+
+            int rangeCount = in.readInt();
+            if (rangeCount == 0)
+                return from(new MutableDeletionInfo(topLevel));
+
+            RangeTombstoneList ranges = new RangeTombstoneList(metadata.comparator, rangeCount);
+            List<LegacyRangeTombstone> inRowTombstones = new ArrayList<>();
+            for (int i = 0; i < rangeCount; i++)
             {
-                throw new UnsupportedOperationException();
-                //DeletionTime.serializer.serialize(info.topLevel, out);
-                //rtlSerializer.serialize(info.ranges, out, version);
+                LegacyBound start = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
+                LegacyBound end = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false);
+                int delTime =  in.readInt();
+                long markedAt = in.readLong();
+
+                LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime));
+                if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata))
+                    inRowTombstones.add(tombstone);
+                else
+                    ranges.add(start.bound, end.bound, markedAt, delTime);
+            }
+            return new LegacyDeletionInfo(new MutableDeletionInfo(topLevel, ranges), inRowTombstones);
+        }
+    }
+
+    /**
+     * A helper class for LegacyRangeTombstoneList.  This replaces the Comparator<Composite> that RTL used before 3.0.
+     */
+    private static class LegacyBoundComparator implements Comparator<LegacyBound>
+    {
+        ClusteringComparator clusteringComparator;
+
+        public LegacyBoundComparator(ClusteringComparator clusteringComparator)
+        {
+            this.clusteringComparator = clusteringComparator;
+        }
+
+        public int compare(LegacyBound a, LegacyBound b)
+        {
+            int result = this.clusteringComparator.compare(a.bound, b.bound);
+            if (result != 0)
+                return result;
+
+            // collectionName may be null (e.g. for bounds converted from normal range
+            // tombstones), so guard against NPEs and sort null collection names first
+            if (a.collectionName == null)
+                return b.collectionName == null ? 0 : -1;
+            if (b.collectionName == null)
+                return 1;
+
+            return UTF8Type.instance.compare(a.collectionName.name.bytes, b.collectionName.name.bytes);
+        }
+    }
+
+    /**
+     * Almost an entire copy of RangeTombstoneList from C* 2.1.  The main difference is that LegacyBoundComparator
+     * is used in place of Comparator<Composite> (because Composite doesn't exist any more).
+     *
+     * This class is needed to allow us to convert single-row deletions and complex deletions into range tombstones
+     * and properly merge them into the normal set of range tombstones.
+     */
+    public static class LegacyRangeTombstoneList
+    {
+        private final LegacyBoundComparator comparator;
+
+        // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could
+        // use a List for starts and ends, but having arrays everywhere is simpler overall.
+        private LegacyBound[] starts;
+        private LegacyBound[] ends;
+        private long[] markedAts;
+        private int[] delTimes;
+
+        private int size;
+
+        private LegacyRangeTombstoneList(LegacyBoundComparator comparator, LegacyBound[] starts, LegacyBound[] ends, long[] markedAts, int[] delTimes, int size)
+        {
+            assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length;
+            this.comparator = comparator;
+            this.starts = starts;
+            this.ends = ends;
+            this.markedAts = markedAts;
+            this.delTimes = delTimes;
+            this.size = size;
+        }
+
+        public LegacyRangeTombstoneList(LegacyBoundComparator comparator, int capacity)
+        {
+            this(comparator, new LegacyBound[capacity], new LegacyBound[capacity], new long[capacity], new int[capacity], 0);
+        }
+
+        public boolean isEmpty()
+        {
+            return size == 0;
+        }
+
+        public int size()
+        {
+            return size;
+        }
+
+        /**
+         * Adds a new range tombstone.
+         *
+         * This method will be faster if the new tombstone sorts after all the currently existing ones (this is a common use case),
+         * but it doesn't assume it.
+         */
+        public void add(LegacyBound start, LegacyBound end, long markedAt, int delTime)
+        {
+            if (isEmpty())
+            {
+                addInternal(0, start, end, markedAt, delTime);
+                return;
             }
 
-            public LegacyDeletionInfo deserialize(CFMetaData metadata, DataInputPlus in, int version) throws IOException
+            int c = comparator.compare(ends[size-1], start);
+
+            // Fast path if we add in sorted order
+            if (c <= 0)
+            {
+                addInternal(size, start, end, markedAt, delTime);
+            }
+            else
             {
-                DeletionTime topLevel = DeletionTime.serializer.deserialize(in);
+                // Note: insertFrom expects i to be the insertion point in terms of interval ends
+                int pos = Arrays.binarySearch(ends, 0, size, start, comparator);
+                insertFrom((pos >= 0 ? pos : -pos-1), start, end, markedAt, delTime);
+            }
+        }
 
-                int rangeCount = in.readInt();
-                if (rangeCount == 0)
-                    return from(new MutableDeletionInfo(topLevel));
+        /*
+         * Inserts a new element starting at index i. This method assumes that:
+         *    ends[i-1] <= start <= ends[i]
+         *
+         * A RangeTombstoneList is a list of ranges [s_0, e_0]...[s_n, e_n] such that:
+         *   - s_i <= e_i
+         *   - e_i <= s_i+1
+         *   - if s_i == e_i and e_i == s_i+1 then s_i+1 < e_i+1
+         * Basically, the ranges are in order and non-overlapping except at their bounds. And while
+         * we allow ranges with the same value for the start and end, we don't allow repeating
+         * such a range (so we can't have [0, 0][0, 0] even though it would respect the first 2
+         * conditions).
+         *
+         */
+
+        /**
+         * Adds all the range tombstones of {@code tombstones} to this RangeTombstoneList.
+         */
+        public void addAll(LegacyRangeTombstoneList tombstones)
+        {
+            if (tombstones.isEmpty())
+                return;
+
+            if (isEmpty())
+            {
+                copyArrays(tombstones, this);
+                return;
+            }
 
-                RangeTombstoneList ranges = new RangeTombstoneList(metadata.comparator, rangeCount);
-                List<LegacyRangeTombstone> inRowTombsones = new ArrayList<>();
-                for (int i = 0; i < rangeCount; i++)
+            /*
+             * We basically have 2 techniques we can use here: either we repeatedly call add() on tombstones' values,
+             * or we do a merge of both (sorted) lists. If this list is sufficiently bigger than the one we add,
+             * calling add() will be faster; otherwise merging will be faster.
+             *
+             * Note that during memtable updates, it is not uncommon for a new update to have only a few range
+             * tombstones while the CF we're adding it to (the one in the memtable) has many. In that case, using add() is
+             * likely going to be faster.
+             *
+             * In other cases however, like when diffing responses from multiple nodes, the tombstone lists we "merge" will
+             * likely be of similar size, so using add() might be a bit inefficient.
+             *
+             * Roughly speaking (this ignores the fact that updating an element is not exactly constant, but that's not a big
+             * deal), if n is the size of this list and m is tombstones' size, merging is O(n+m) while using add() is O(m*log(n)).
+             * For instance, with n = 10000 and m = 10, add() does on the order of 10 * log2(10000), roughly 130 comparisons,
+             * versus roughly 10010 for a merge.
+             *
+             * But let's not crank up a logarithm computation for that. Long story short, merging will be a bad choice only
+             * if this list is a lot bigger than the other one, so let's keep it simple.
+             */
+            if (size > 10 * tombstones.size)
+            {
+                for (int i = 0; i < tombstones.size; i++)
+                    add(tombstones.starts[i], tombstones.ends[i], tombstones.markedAts[i], tombstones.delTimes[i]);
+            }
+            else
+            {
+                int i = 0;
+                int j = 0;
+                while (i < size && j < tombstones.size)
                 {
-                    LegacyBound start = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
-                    LegacyBound end = decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), false);
-                    int delTime =  in.readInt();
-                    long markedAt = in.readLong();
-
-                    LegacyRangeTombstone tombstone = new LegacyRangeTombstone(start, end, new DeletionTime(markedAt, delTime));
-                    if (tombstone.isCollectionTombstone() || tombstone.isRowDeletion(metadata))
-                        inRowTombsones.add(tombstone);
+                    if (comparator.compare(tombstones.starts[j], ends[i]) <= 0)
+                    {
+                        insertFrom(i, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
+                        j++;
+                    }
                     else
-                        ranges.add(start.bound, end.bound, markedAt, delTime);
+                    {
+                        i++;
+                    }
                 }
-                return new LegacyDeletionInfo(new MutableDeletionInfo(topLevel, ranges), inRowTombsones);
+                // Adds the remaining ones from tombstones, if any (note that addInternal will increment size as needed).
+                for (; j < tombstones.size; j++)
+                    addInternal(size, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
             }
+        }
 
-            public long serializedSize(CFMetaData metadata, LegacyDeletionInfo info, TypeSizes typeSizes, int version)
+        private static void copyArrays(LegacyRangeTombstoneList src, LegacyRangeTombstoneList dst)
+        {
+            dst.grow(src.size);
+            System.arraycopy(src.starts, 0, dst.starts, 0, src.size);
+            System.arraycopy(src.ends, 0, dst.ends, 0, src.size);
+            System.arraycopy(src.markedAts, 0, dst.markedAts, 0, src.size);
+            System.arraycopy(src.delTimes, 0, dst.delTimes, 0, src.size);
+            dst.size = src.size;
+        }
+
+        private void insertFrom(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime)
+        {
+            while (i < size)
             {
-                throw new UnsupportedOperationException();
-                //long size = DeletionTime.serializer.serializedSize(info.topLevel, typeSizes);
-                //return size + rtlSerializer.serializedSize(info.ranges, typeSizes, version);
+                assert i == 0 || comparator.compare(ends[i-1], start) <= 0;
+
+                int c = comparator.compare(start, ends[i]);
+                assert c <= 0;
+                if (c == 0)
+                {
+                    // If start == ends[i], then we can insert from the next one (basically the new element
+                    // really starts at the next element), except for the case where starts[i] == ends[i].
+                    // In this latter case, if we were to move to the next element, we could end up with ...[x, x][x, x]...
+                    if (comparator.compare(starts[i], ends[i]) == 0)
+                    {
+                        // The current element covers a single value which is equal to the start of the inserted
+                        // element. If the inserted element overwrites the current one, just remove the current
+                        // (it's included in what we insert) and proceed with the insert.
+                        if (markedAt > markedAts[i])
+                        {
+                            removeInternal(i);
+                            continue;
+                        }
+
+                        // Otherwise (the current singleton interval overrides the new one), we want to leave the
+                        // current element and move to the next, unless start == end since that means the new element
+                        // is in fact fully covered by the current one (so we're done)
+                        if (comparator.compare(start, end) == 0)
+                            return;
+                    }
+                    i++;
+                    continue;
+                }
+
+                // Do we overwrite the current element?
+                if (markedAt > markedAts[i])
+                {
+                    // We do overwrite.
+
+                    // First deal with what might come before the newly added one.
+                    if (comparator.compare(starts[i], start) < 0)
+                    {
+                        addInternal(i, starts[i], start, markedAts[i], delTimes[i]);
+                        i++;
+                        // We don't need to do the following line, but in spirit that's what we want to do
+                        // setInternal(i, start, ends[i], markedAts[i], delTimes[i])
+                    }
+
+                    // now, start <= starts[i]
+
+                    // Does the new element stop before/at the current one?
+                    int endCmp = comparator.compare(end, starts[i]);
+                    if (endCmp <= 0)
+                    {
+                        // Here start <= starts[i] and end <= starts[i]
+                        // This means the new element is before the current one. However, one special
+                        // case is if end == starts[i] and starts[i] == ends[i]. In that case,
+                        // the new element entirely overwrites the current one and we can just overwrite it
+                        if (endCmp == 0 && comparator.compare(starts[i], ends[i]) == 0)
+                            setInternal(i, start, end, markedAt, delTime);
+                        else
+                            addInternal(i, start, end, markedAt, delTime);
+                        return;
+                    }
+
+                    // Do we overwrite the current element fully?
+                    int cmp = comparator.compare(ends[i], end);
+                    if (cmp <= 0)
+                    {
+                        // We do overwrite fully:
+                        // update the current element until its end and continue
+                        // on with the next element (with the new inserted start == current end).
+
+                        // If we're on the last element, we can optimize
+                        if (i == size-1)
+                        {
+                            setInternal(i, start, end, markedAt, delTime);
+                            return;
+                        }
+
+                        setInternal(i, start, ends[i], markedAt, delTime);
+                        if (cmp == 0)
+                            return;
+
+                        start = ends[i];
+                        i++;
+                    }
+                    else
+                    {
+                        // We don't overwrite fully. Insert the new interval, and then update the now-next
+                        // one to reflect the not overwritten parts. We're then done.
+                        addInternal(i, start, end, markedAt, delTime);
+                        i++;
+                        setInternal(i, end, ends[i], markedAts[i], delTimes[i]);
+                        return;
+                    }
+                }
+                else
+                {
+                    // we don't overwrite the current element
+
+                    // If the new interval starts before the current one, insert that new interval
+                    if (comparator.compare(start, starts[i]) < 0)
+                    {
+                        // If we stop before the start of the current element, just insert the new
+                        // interval and we're done; otherwise insert until the beginning of the
+                        // current element
+                        if (comparator.compare(end, starts[i]) <= 0)
+                        {
+                            addInternal(i, start, end, markedAt, delTime);
+                            return;
+                        }
+                        addInternal(i, start, starts[i], markedAt, delTime);
+                        i++;
+                    }
+
+                    // After that, we're overwritten for the span of the current element but might have
+                    // some residual parts after it ...
+
+                    // ... unless we don't extend beyond it.
+                    if (comparator.compare(end, ends[i]) <= 0)
+                        return;
+
+                    start = ends[i];
+                    i++;
+                }
+            }
+
+            // If we got there, then just insert the remainder at the end
+            addInternal(i, start, end, markedAt, delTime);
+        }
+
+        private int capacity()
+        {
+            return starts.length;
+        }
+
+        private void addInternal(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime)
+        {
+            assert i >= 0;
+
+            if (size == capacity())
+                growToFree(i);
+            else if (i < size)
+                moveElements(i);
+
+            setInternal(i, start, end, markedAt, delTime);
+            size++;
+        }
+
+        private void removeInternal(int i)
+        {
+            assert i >= 0;
+
+            System.arraycopy(starts, i+1, starts, i, size - i - 1);
+            System.arraycopy(ends, i+1, ends, i, size - i - 1);
+            System.arraycopy(markedAts, i+1, markedAts, i, size - i - 1);
+            System.arraycopy(delTimes, i+1, delTimes, i, size - i - 1);
+
+            --size;
+            starts[size] = null;
+            ends[size] = null;
+        }
+
+        /*
+         * Grow the arrays, leaving index i "free" in the process.
+         */
+        private void growToFree(int i)
+        {
+            int newLength = (capacity() * 3) / 2 + 1;
+            grow(i, newLength);
+        }
+
+        /*
+         * Grow the arrays to match newLength capacity.
+         */
+        private void grow(int newLength)
+        {
+            if (capacity() < newLength)
+                grow(-1, newLength);
+        }
+
+        private void grow(int i, int newLength)
+        {
+            starts = grow(starts, size, newLength, i);
+            ends = grow(ends, size, newLength, i);
+            markedAts = grow(markedAts, size, newLength, i);
+            delTimes = grow(delTimes, size, newLength, i);
+        }
+
+        private static LegacyBound[] grow(LegacyBound[] a, int size, int newLength, int i)
+        {
+            if (i < 0 || i >= size)
+                return Arrays.copyOf(a, newLength);
+
+            LegacyBound[] newA = new LegacyBound[newLength];
+            System.arraycopy(a, 0, newA, 0, i);
+            System.arraycopy(a, i, newA, i+1, size - i);
+            return newA;
+        }
+
+        private static long[] grow(long[] a, int size, int newLength, int i)
+        {
+            if (i < 0 || i >= size)
+                return Arrays.copyOf(a, newLength);
+
+            long[] newA = new long[newLength];
+            System.arraycopy(a, 0, newA, 0, i);
+            System.arraycopy(a, i, newA, i+1, size - i);
+            return newA;
+        }
+
+        private static int[] grow(int[] a, int size, int newLength, int i)
+        {
+            if (i < 0 || i >= size)
+                return Arrays.copyOf(a, newLength);
+
+            int[] newA = new int[newLength];
+            System.arraycopy(a, 0, newA, 0, i);
+            System.arraycopy(a, i, newA, i+1, size - i);
+            return newA;
+        }
+
+        /*
+         * Move elements so that index i is "free", assuming the arrays have at least one free slot at the end.
+         */
+        private void moveElements(int i)
+        {
+            if (i >= size)
+                return;
+
+            System.arraycopy(starts, i, starts, i+1, size - i);
+            System.arraycopy(ends, i, ends, i+1, size - i);
+            System.arraycopy(markedAts, i, markedAts, i+1, size - i);
+            System.arraycopy(delTimes, i, delTimes, i+1, size - i);
+            // we set starts[i] to null to indicate the position is now empty
+            starts[i] = null;
+        }
+
+        private void setInternal(int i, LegacyBound start, LegacyBound end, long markedAt, int delTime)
+        {
+            starts[i] = start;
+            ends[i] = end;
+            markedAts[i] = markedAt;
+            delTimes[i] = delTime;
+        }
+
+        public void serialize(DataOutputPlus out, CFMetaData metadata) throws IOException
+        {
+            out.writeInt(size);
+            if (size == 0)
+                return;
+
+            List<AbstractType<?>> types = new ArrayList<>(comparator.clusteringComparator.subtypes());
+            if (!metadata.isDense())
+                types.add(UTF8Type.instance);
+            CompositeType type = CompositeType.getInstance(types);
+
+            for (int i = 0; i < size; i++)
+            {
+                LegacyBound start = starts[i];
+                LegacyBound end = ends[i];
+
+                CompositeType.Builder startBuilder = type.builder();
+                CompositeType.Builder endBuilder = type.builder();
+                for (int j = 0; j < start.bound.clustering().size(); j++)
+                {
+                    startBuilder.add(start.bound.get(j));
+                    endBuilder.add(end.bound.get(j));
+                }
+
+                if (start.collectionName != null)
+                    startBuilder.add(start.collectionName.name.bytes);
+                if (end.collectionName != null)
+                    endBuilder.add(end.collectionName.name.bytes);
+
+                ByteBufferUtil.writeWithShortLength(startBuilder.build(), out);
+                ByteBufferUtil.writeWithShortLength(endBuilder.buildAsEndOfRange(), out);
+
+                out.writeInt(delTimes[i]);
+                out.writeLong(markedAts[i]);
+            }
+        }
+
+        public long serializedSize(CFMetaData metadata)
+        {
+            long size = 0;
+            size += TypeSizes.sizeof(this.size);
+
+            if (this.size == 0)
+                return size;
+
+            List<AbstractType<?>> types = new ArrayList<>(comparator.clusteringComparator.subtypes());
+            if (!metadata.isDense())
+                types.add(UTF8Type.instance);
+            CompositeType type = CompositeType.getInstance(types);
+
+            for (int i = 0; i < this.size; i++)
+            {
+                LegacyBound start = starts[i];
+                LegacyBound end = ends[i];
+
+                CompositeType.Builder startBuilder = type.builder();
+                CompositeType.Builder endBuilder = type.builder();
+                for (int j = 0; j < start.bound.clustering().size(); j++)
+                {
+                    startBuilder.add(start.bound.get(j));
+                    endBuilder.add(end.bound.get(j));
+                }
+
+                if (start.collectionName != null)
+                    startBuilder.add(start.collectionName.name.bytes);
+                if (end.collectionName != null)
+                    endBuilder.add(end.collectionName.name.bytes);
+
+                size += ByteBufferUtil.serializedSizeWithShortLength(startBuilder.build());
+                size += ByteBufferUtil.serializedSizeWithShortLength(endBuilder.buildAsEndOfRange());
+
+                size += TypeSizes.sizeof(delTimes[i]);
+                size += TypeSizes.sizeof(markedAts[i]);
             }
+            return size;
         }
     }
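
For a concrete feel of the list invariants documented above insertFrom(), here is a toy, int-bound checker (hypothetical and much simpler than LegacyBound, but the same shape of invariant):

    // Toy, int-bound version of the RangeTombstoneList invariant: ranges are in
    // order and overlap at most at a bound (starts[i] <= ends[i] <= starts[i+1]).
    public class RtlInvariantSketch
    {
        static boolean valid(int[] starts, int[] ends)
        {
            for (int i = 0; i < starts.length; i++)
            {
                if (starts[i] > ends[i])
                    return false;
                if (i > 0 && ends[i - 1] > starts[i])
                    return false;
            }
            return true;
        }

        public static void main(String[] args)
        {
            System.out.println(valid(new int[]{ 0, 5 }, new int[]{ 5, 9 }));  // true: ranges touch at 5
            System.out.println(valid(new int[]{ 0, 4 }, new int[]{ 6, 9 }));  // false: [0,6] overlaps [4,9]
        }
    }
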
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
index 5f1da8a..aa60198 100644
--- a/src/java/org/apache/cassandra/db/PartitionColumns.java
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -91,6 +91,12 @@ public class PartitionColumns implements Iterable<ColumnDefinition>
         return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator());
     }
 
+    /** Returns the total number of static and regular columns. */
+    public int size()
+    {
+        return regulars.columnCount() + statics.columnCount();
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 18b6950..2219a84 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.thrift.ThriftResultsMerger;
@@ -226,6 +228,15 @@ public class PartitionRangeReadCommand extends ReadCommand
         };
     }
 
+    @SuppressWarnings("deprecation")
+    protected MessageOut<ReadCommand> createLegacyMessage()
+    {
+        if (this.dataRange.isPaging())
+            return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, legacyPagedRangeCommandSerializer);
+        else
+            return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, legacyRangeSliceCommandSerializer);
+    }
+
     protected void appendCQLWhereClause(StringBuilder sb)
     {
         if (dataRange.isUnrestricted() && rowFilter().isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
new file mode 100644
index 0000000..3f1d660
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+
+public class RangeSliceVerbHandler extends ReadCommandVerbHandler
+{
+    @Override
+    protected IVersionedSerializer<ReadResponse> serializer()
+    {
+        return ReadResponse.legacyRangeSliceReplySerializer;
+    }
+}


[2/4] cassandra git commit: On-wire backward compatibility for 3.0

Posted by ty...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index c3f036a..913a1de 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -18,18 +18,24 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.util.Iterator;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.Operator;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -37,7 +43,10 @@ import org.apache.cassandra.metrics.TableMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * General interface for storage-engine read commands (common to both range and
@@ -51,6 +60,10 @@ public abstract class ReadCommand implements ReadQuery
 
     public static final IVersionedSerializer<ReadCommand> serializer = new Serializer();
 
+    public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
+    public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
+    public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer();
+
     private final Kind kind;
     private final CFMetaData metadata;
     private final int nowInSec;
@@ -72,9 +85,9 @@ public abstract class ReadCommand implements ReadQuery
         SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer),
         PARTITION_RANGE  (PartitionRangeReadCommand.selectionDeserializer);
 
-        private SelectionDeserializer selectionDeserializer;
+        private final SelectionDeserializer selectionDeserializer;
 
-        private Kind(SelectionDeserializer selectionDeserializer)
+        Kind(SelectionDeserializer selectionDeserializer)
         {
             this.selectionDeserializer = selectionDeserializer;
         }
@@ -251,8 +264,6 @@ public abstract class ReadCommand implements ReadQuery
     /**
      * Executes this command on the local host.
      *
-     * @param cfs the store for the table queried by this command.
-     *
      * @return an iterator over the result of executing this command locally.
      */
     @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
@@ -281,7 +292,7 @@ public abstract class ReadCommand implements ReadQuery
             // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
             // would be more efficient (the sooner we discard stuff we know we don't care, the less useless
             // processing we do on it).
-            return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec());
+            return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
         }
         catch (RuntimeException | Error e)
         {
@@ -389,7 +400,7 @@ public abstract class ReadCommand implements ReadQuery
                         logger.warn(msg);
                     }
 
-                    Tracing.trace("Read {} live and {} tombstone cells{}", new Object[]{ liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "") });
+                    Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
                 }
             }
         };
@@ -398,12 +409,16 @@ public abstract class ReadCommand implements ReadQuery
     /**
      * Creates a message for this command.
      */
-    public MessageOut<ReadCommand> createMessage()
+    public MessageOut<ReadCommand> createMessage(int version)
     {
-        // TODO: we should use different verbs for old message (RANGE_SLICE, PAGED_RANGE)
-        return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
+        if (version >= MessagingService.VERSION_30)
+            return new MessageOut<>(MessagingService.Verb.READ, this, serializer);
+
+        return createLegacyMessage();
     }
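
Usage-wise, the version passed to createMessage() is expected to be the messaging version of the target node, so pre-3.0 peers get the legacy verbs. A plausible call site, sketched here for illustration only (the endpoint variable is hypothetical; MessagingService#getVersion is the usual source of a peer's version):

    // Hypothetical call site: pick the serialization path based on the peer's
    // messaging version; pre-3.0 nodes fall back to createLegacyMessage().
    int targetVersion = MessagingService.instance().getVersion(endpoint);
    MessageOut<ReadCommand> message = command.createMessage(targetVersion);
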
 
+    protected abstract MessageOut<ReadCommand> createLegacyMessage();
+
     protected abstract void appendCQLWhereClause(StringBuilder sb);
 
     // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
@@ -433,11 +448,11 @@ public abstract class ReadCommand implements ReadQuery
     {
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT ").append(columnFilter());
-        sb.append(" FROM ").append(metadata().ksName).append(".").append(metadata.cfName);
+        sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
         appendCQLWhereClause(sb);
 
         if (limits() != DataLimits.NONE)
-            sb.append(" ").append(limits());
+            sb.append(' ').append(limits());
         return sb.toString();
     }
 
@@ -465,8 +480,8 @@ public abstract class ReadCommand implements ReadQuery
 
         public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
         {
-            if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException();
+            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
+            assert version >= MessagingService.VERSION_30;
 
             out.writeByte(command.kind.ordinal());
             out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()));
@@ -482,7 +497,7 @@ public abstract class ReadCommand implements ReadQuery
         public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
         {
             if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException();
+                return legacyReadCommandSerializer.deserialize(in, version);
 
             Kind kind = Kind.values()[in.readByte()];
             int flags = in.readByte();
@@ -499,8 +514,8 @@ public abstract class ReadCommand implements ReadQuery
 
         public long serializedSize(ReadCommand command, int version)
         {
-            if (version < MessagingService.VERSION_30)
-                throw new UnsupportedOperationException();
+            // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly
+            assert version >= MessagingService.VERSION_30;
 
             return 2 // kind + flags
                  + CFMetaData.serializer.serializedSize(command.metadata(), version)
@@ -511,4 +526,950 @@ public abstract class ReadCommand implements ReadQuery
                  + command.selectionSerializedSize(version);
         }
     }
+
+    private enum LegacyType
+    {
+        GET_BY_NAMES((byte)1),
+        GET_SLICES((byte)2);
+
+        public final byte serializedValue;
+
+        LegacyType(byte b)
+        {
+            this.serializedValue = b;
+        }
+
+        public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
+        {
+            return kind == ClusteringIndexFilter.Kind.SLICE
+                   ? GET_SLICES
+                   : GET_BY_NAMES;
+        }
+
+        public static LegacyType fromSerializedValue(byte b)
+        {
+            return b == 1 ? GET_BY_NAMES : GET_SLICES;
+        }
+    }
+
+    /**
+     * Serializer for pre-3.0 RangeSliceCommands.
+     */
+    private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
+    {
+        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+
+            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+            assert !rangeCommand.dataRange().isPaging();
+
+            // convert pre-3.0 incompatible names filters to slice filters
+            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+
+            CFMetaData metadata = rangeCommand.metadata();
+
+            out.writeUTF(metadata.ksName);
+            out.writeUTF(metadata.cfName);
+            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
+
+            // begin DiskAtomFilterSerializer.serialize()
+            if (rangeCommand.isNamesQuery())
+            {
+                out.writeByte(1);  // 0 for slices, 1 for names
+                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
+            }
+            else
+            {
+                out.writeByte(0);  // 0 for slices, 1 for names
+
+                // slice filter serialization
+                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+                LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
+
+                out.writeBoolean(filter.isReversed());
+
+                // limit
+                DataLimits.Kind kind = rangeCommand.limits().kind();
+                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
+                if (isDistinct)
+                    out.writeInt(1);
+                else
+                    out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
+
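+                // compositesToGroup tells the pre-3.0 node how to count results; roughly: -1 counts
+                // raw cells (thrift/dense tables), -2 counts partitions (DISTINCT, see CASSANDRA-8490),
+                // and a non-negative value is the number of leading composite components that
+                // identify a single CQL row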
+                int compositesToGroup;
+                boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+                if (kind == DataLimits.Kind.THRIFT_LIMIT)
+                    compositesToGroup = -1;
+                else if (isDistinct && !selectsStatics)
+                    compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
+                else
+                    compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
+
+                out.writeInt(compositesToGroup);
+            }
+
+            serializeRowFilter(out, rangeCommand.rowFilter());
+            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
+
+            // maxResults
+            out.writeInt(rangeCommand.limits().count());
+
+            // countCQL3Rows
+            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
+                out.writeBoolean(false);
+            else
+                out.writeBoolean(true);
+
+            // isPaging
+            out.writeBoolean(false);
+        }
+
+        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+
+            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+            if (metadata == null)
+            {
+                String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
+                throw new UnknownColumnFamilyException(message, null);
+            }
+
+            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
+
+            ClusteringIndexFilter filter;
+            ColumnFilter selection;
+            int compositesToGroup = 0;
+            int perPartitionLimit = -1;
+            byte readType = in.readByte();  // 0 for slices, 1 for names
+            if (readType == 1)
+            {
+                Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
+                selection = selectionAndFilter.left;
+                filter = selectionAndFilter.right;
+            }
+            else
+            {
+                filter = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
+                perPartitionLimit = in.readInt();
+                compositesToGroup = in.readInt();
+                selection = getColumnSelectionForSlice((ClusteringIndexSliceFilter) filter, compositesToGroup, metadata);
+            }
+
+            RowFilter rowFilter = deserializeRowFilter(in, metadata);
+
+            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
+            int maxResults = in.readInt();
+
+            in.readBoolean();  // countCQL3Rows (not needed)
+            in.readBoolean();  // isPaging (not needed)
+
+            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
+            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+            DataLimits limits;
+            if (isDistinct)
+                limits = DataLimits.distinctLimits(maxResults);
+            else if (compositesToGroup == -1)
+                limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
+            else
+                limits = DataLimits.cqlLimits(maxResults);
+
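+            // isDigestQuery is always false for legacy range commands, and isForThrift is true
+            // because the pre-3.0 requester expects results in the legacy (thrift-compatible) format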
+            return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter));
+        }
+
+        static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
+        {
+            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
+            out.writeInt(indexExpressions.size());
+            for (RowFilter.Expression expression : indexExpressions)
+            {
+                ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
+                expression.operator().writeTo(out);
+                ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
+            }
+        }
+
+        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+        {
+            int numRowFilters = in.readInt();
+            if (numRowFilters == 0)
+                return RowFilter.NONE;
+
+            RowFilter rowFilter = RowFilter.create(numRowFilters);
+            for (int i = 0; i < numRowFilters; i++)
+            {
+                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
+                ColumnDefinition column = metadata.getColumnDefinition(columnName);
+                Operator op = Operator.readFrom(in);
+                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
+                rowFilter.add(column, op, indexValue);
+            }
+            return rowFilter;
+        }
+
+        static long serializedRowFilterSize(RowFilter rowFilter)
+        {
+            long size = TypeSizes.sizeof(0);  // rowFilterCount
+            for (RowFilter.Expression expression : rowFilter)
+            {
+                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
+                size += TypeSizes.sizeof(0);  // operator int value
+                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
+            }
+            return size;
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            assert version < MessagingService.VERSION_30;
+            assert command.kind == Kind.PARTITION_RANGE;
+
+            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
+            CFMetaData metadata = rangeCommand.metadata();
+
+            long size = TypeSizes.sizeof(metadata.ksName);
+            size += TypeSizes.sizeof(metadata.cfName);
+            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+            size += 1;  // single byte flag: 0 for slices, 1 for names
+            if (rangeCommand.isNamesQuery())
+            {
+                PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
+                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
+                size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
+            }
+            else
+            {
+                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+                size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+                size += TypeSizes.sizeof(filter.isReversed());
+                size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
+                size += TypeSizes.sizeof(0); // compositesToGroup
+            }
+
+            size += serializedRowFilterSize(rangeCommand.rowFilter());
+
+            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+            size += TypeSizes.sizeof(rangeCommand.limits().count());  // maxResults
+            size += TypeSizes.sizeof(!rangeCommand.isForThrift());  // countCQL3Rows
+            return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());  // isPaging
+        }
+
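+        // Pre-3.0 nodes cannot handle certain names filters (see shouldConvertNamesToSlice()),
+        // so we replace them with an equivalent slice filter before serializing.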
+        static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
+        {
+            if (!command.dataRange().isNamesQuery())
+                return command;
+
+            CFMetaData metadata = command.metadata();
+            if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
+                return command;
+
+            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
+            ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
+            DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
+            return new PartitionRangeReadCommand(
+                    command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(),
+                    command.columnFilter(), command.rowFilter(), command.limits(), newRange);
+        }
+
+        static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata)
+        {
+            // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
+            if (compositesToGroup == -2)
+                return ColumnFilter.selection(PartitionColumns.NONE);
+
+            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
+            PartitionColumns columns = filter.selects(Clustering.STATIC_CLUSTERING)
+                                     ? metadata.partitionColumns()
+                                     : metadata.partitionColumns().withoutStatics();
+            return new ColumnFilter.Builder(metadata).addAll(columns).build();
+        }
+    }
+
+    /**
+     * Serializer for pre-3.0 PagedRangeCommands.
+     */
+    private static class LegacyPagedRangeCommandSerializer implements IVersionedSerializer<ReadCommand>
+    {
+        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+
+            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+            assert rangeCommand.dataRange().isPaging();
+
+            CFMetaData metadata = rangeCommand.metadata();
+
+            out.writeUTF(metadata.ksName);
+            out.writeUTF(metadata.cfName);
+            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
+
+            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
+
+            // pre-3.0 nodes don't accept names filters for paged range commands
+            ClusteringIndexSliceFilter filter;
+            if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES)
+                filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata);
+            else
+                filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+            // slice filter
+            boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+            LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
+            out.writeBoolean(filter.isReversed());
+
+            // slice filter's count
+            DataLimits.Kind kind = rangeCommand.limits().kind();
+            boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1;
+            if (isDistinct)
+                out.writeInt(1);
+            else
+                out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().perPartitionCount(), filter.requestedSlices()));
+
+            // compositesToGroup
+            boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+            int compositesToGroup;
+            if (kind == DataLimits.Kind.THRIFT_LIMIT)
+                compositesToGroup = -1;
+            else if (isDistinct && !selectsStatics)
+                compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
+            else
+                compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
+
+            out.writeInt(compositesToGroup);
+
+            // command-level "start" and "stop" composites.  The start is the last-returned cell name if there is one,
+            // otherwise it's the same as the slice filter's start.  The stop appears to always be the same as the
+            // slice filter's stop.
+            DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange();
+            Clustering lastReturned = pagingRange.getLastReturned();
+            Slice.Bound newStart = Slice.Bound.exclusiveStartOf(lastReturned);
+            Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1);
+            ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeBound(metadata, newStart, true), out);
+            ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()), out);
+
+            LegacyRangeSliceCommandSerializer.serializeRowFilter(out, rangeCommand.rowFilter());
+
+            // command-level limit
+            // Pre-3.0 we would always request one more row than we actually needed and the command-level "start" would
+            // be the last-returned cell name, so the response would always include it.  When dealing with compound comparators,
+            // we can pass an exclusive start and use the normal limit.  However, when dealing with non-compound comparators,
+            // pre-3.0 nodes cannot perform exclusive slices, so we need to request one extra row.
+            int maxResults = rangeCommand.limits().count() + (metadata.isCompound() ? 0 : 1);
+            out.writeInt(maxResults);
+
+            // countCQL3Rows
+            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // for Thrift or DISTINCT
+                out.writeBoolean(false);
+            else
+                out.writeBoolean(true);
+        }
+
+        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+
+            String keyspace = in.readUTF();
+            String columnFamily = in.readUTF();
+
+            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
+            if (metadata == null)
+            {
+                String message = String.format("Got legacy paged range command for nonexistent table %s.%s.", keyspace, columnFamily);
+                throw new UnknownColumnFamilyException(message, null);
+            }
+
+            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
+            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
+
+            ClusteringIndexSliceFilter filter = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
+            int perPartitionLimit = in.readInt();
+            int compositesToGroup = in.readInt();
+
+            // command-level Composite "start" and "stop"
+            LegacyLayout.LegacyBound startBound = LegacyLayout.decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true);
+            ByteBufferUtil.readWithShortLength(in);  // the composite "stop", which isn't actually needed
+
+            // pre-3.0 nodes will sometimes use a clustering prefix for the Command-level start and stop, but in all
+            // cases this should also be represented by the ClusteringIndexFilter, so we can ignore them
+            Clustering startClustering;
+            if (startBound == LegacyLayout.LegacyBound.BOTTOM || startBound.bound.size() < metadata.comparator.size())
+                startClustering = Clustering.EMPTY;
+            else
+                startClustering = startBound.getAsClustering(metadata);
+
+            ColumnFilter selection = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(filter, compositesToGroup, metadata);
+
+            RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata);
+            int maxResults = in.readInt();
+            in.readBoolean(); // countCQL3Rows
+
+            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
+            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics);
+            DataLimits limits;
+            if (isDistinct)
+                limits = DataLimits.distinctLimits(maxResults);
+            else if (compositesToGroup == -1)
+                limits = DataLimits.thriftLimits(1, perPartitionLimit); // we only use paging w/ thrift for get_count(), so partition limit must be 1
+            else
+                limits = DataLimits.cqlLimits(maxResults);
+
+            limits = limits.forPaging(maxResults);
+
+            // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it
+            // missing without any problems, so we can safely always set "inclusive" to false in the data range
+            DataRange dataRange = new DataRange(keyRange, filter).forPaging(keyRange, metadata.comparator, startClustering, false);
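+            // as in the other legacy deserializers, isForThrift is true so that results are
+            // produced in the format the pre-3.0 requester expects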
+            return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, dataRange);
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            assert version < MessagingService.VERSION_30;
+            assert command.kind == Kind.PARTITION_RANGE;
+
+            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
+            CFMetaData metadata = rangeCommand.metadata();
+            assert rangeCommand.dataRange().isPaging();
+
+            long size = TypeSizes.sizeof(metadata.ksName);
+            size += TypeSizes.sizeof(metadata.cfName);
+            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
+
+            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
+
+            // pre-3.0 nodes only accept slice filters for paged range commands
+            ClusteringIndexSliceFilter filter;
+            if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES)
+                filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata);
+            else
+                filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
+
+            // slice filter
+            boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
+            size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
+            size += TypeSizes.sizeof(filter.isReversed());
+
+            // slice filter's count
+            size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
+
+            // compositesToGroup
+            size += TypeSizes.sizeof(0);
+
+            // command-level Composite "start" and "stop"
+            DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange();
+            Clustering lastReturned = pagingRange.getLastReturned();
+            Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1);
+            size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastReturned));
+            size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()));
+
+            size += LegacyRangeSliceCommandSerializer.serializedRowFilterSize(rangeCommand.rowFilter());
+
+            // command-level limit
+            size += TypeSizes.sizeof(rangeCommand.limits().count());
+
+            // countCQL3Rows
+            return size + TypeSizes.sizeof(true);
+        }
+    }
+
+    /**
+     * Serializer for pre-3.0 ReadCommands.
+     */
+    static class LegacyReadCommandSerializer implements IVersionedSerializer<ReadCommand>
+    {
+        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+            assert command.kind == Kind.SINGLE_PARTITION;
+
+            SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command;
+            singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
+
+            CFMetaData metadata = singleReadCommand.metadata();
+
+            out.writeByte(LegacyType.fromPartitionFilterKind(singleReadCommand.clusteringIndexFilter().kind()).serializedValue);
+
+            out.writeBoolean(singleReadCommand.isDigestQuery());
+            out.writeUTF(metadata.ksName);
+            ByteBufferUtil.writeWithShortLength(singleReadCommand.partitionKey().getKey(), out);
+            out.writeUTF(metadata.cfName);
+            out.writeLong(singleReadCommand.nowInSec() * 1000L);  // convert from seconds to millis
+
+            if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
+                serializeSliceCommand((SinglePartitionSliceCommand) singleReadCommand, out);
+            else
+                serializeNamesCommand((SinglePartitionNamesCommand) singleReadCommand, out);
+        }
+
+        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
+        {
+            assert version < MessagingService.VERSION_30;
+            LegacyType msgType = LegacyType.fromSerializedValue(in.readByte());
+
+            boolean isDigest = in.readBoolean();
+            String keyspaceName = in.readUTF();
+            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+            String cfName = in.readUTF();
+            long nowInMillis = in.readLong();
+            int nowInSeconds = (int) (nowInMillis / 1000);  // convert from millis to seconds
+            CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
+            if (metadata == null)
+            {
+                String message = String.format("Got legacy read command for nonexistent table %s.%s.", keyspaceName, cfName);
+                throw new UnknownColumnFamilyException(message, null);
+            }
+            DecoratedKey dk = metadata.partitioner.decorateKey(key);
+
+            switch (msgType)
+            {
+                case GET_BY_NAMES:
+                    return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds);
+                case GET_SLICES:
+                    return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds);
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public long serializedSize(ReadCommand command, int version)
+        {
+            assert version < MessagingService.VERSION_30;
+            assert command.kind == Kind.SINGLE_PARTITION;
+            SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command;
+            singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
+
+            int keySize = singleReadCommand.partitionKey().getKey().remaining();
+
+            CFMetaData metadata = singleReadCommand.metadata();
+
+            long size = 1;  // message type (single byte)
+            size += TypeSizes.sizeof(command.isDigestQuery());
+            size += TypeSizes.sizeof(metadata.ksName);
+            size += TypeSizes.sizeof((short) keySize) + keySize;
+            size += TypeSizes.sizeof(metadata.cfName);
+            size += TypeSizes.sizeof((long) command.nowInSec());
+
+            if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE)
+                return size + serializedSliceCommandSize((SinglePartitionSliceCommand) singleReadCommand);
+            else
+                return size + serializedNamesCommandSize((SinglePartitionNamesCommand) singleReadCommand);
+        }
+
+        private void serializeNamesCommand(SinglePartitionNamesCommand command, DataOutputPlus out) throws IOException
+        {
+            serializeNamesFilter(command, command.clusteringIndexFilter(), out);
+        }
+
+        private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException
+        {
+            PartitionColumns columns = command.columnFilter().fetchedColumns();
+            CFMetaData metadata = command.metadata();
+            SortedSet<Clustering> requestedRows = filter.requestedRows();
+
+            if (requestedRows.isEmpty())
+            {
+                // only static columns are requested
+                out.writeInt(columns.size());
+                for (ColumnDefinition column : columns)
+                    ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
+            }
+            else
+            {
+                out.writeInt(requestedRows.size() * columns.size());
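+                // pre-3.0 names filters are flat lists of cell names, so we write one encoded
+                // cell name per (clustering, column) pair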
+                for (Clustering clustering : requestedRows)
+                {
+                    for (ColumnDefinition column : columns)
+                        ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null), out);
+                }
+            }
+
+            // countCql3Rows should be true unless this is a Thrift query or a DISTINCT query
+            if (command.isForThrift() || (command.limits().kind() == DataLimits.Kind.CQL_LIMIT && command.limits().perPartitionCount() == 1))
+                out.writeBoolean(false);  // for Thrift or a DISTINCT query
+            else
+                out.writeBoolean(true);
+        }
+
+        static long serializedNamesFilterSize(ClusteringIndexNamesFilter filter, CFMetaData metadata, PartitionColumns fetchedColumns)
+        {
+            SortedSet<Clustering> requestedRows = filter.requestedRows();
+
+            long size = 0;
+            if (requestedRows.isEmpty())
+            {
+                // only static columns are requested
+                size += TypeSizes.sizeof(fetchedColumns.size());
+                for (ColumnDefinition column : fetchedColumns)
+                    size += ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes);
+            }
+            else
+            {
+                size += TypeSizes.sizeof(requestedRows.size() * fetchedColumns.size());
+                for (Clustering clustering : requestedRows)
+                {
+                    for (ColumnDefinition column : fetchedColumns)
+                        size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null));
+                }
+            }
+
+            return size + TypeSizes.sizeof(true);  // countCql3Rows
+        }
+
+        private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException
+        {
+            Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata);
+
+            // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+            return new SinglePartitionNamesCommand(
+                    isDigest, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
+                    key, selectionAndFilter.right);
+        }
+
+        static Pair<ColumnFilter, ClusteringIndexNamesFilter> deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+        {
+            int numCellNames = in.readInt();
+
+            // The names filter could include either a) static columns or b) normal columns with the clustering columns
+            // fully specified.  We need to handle those cases differently in 3.0.
+            NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator);
+
+            ColumnFilter.Builder selectionBuilder = new ColumnFilter.Builder(metadata);
+            for (int i = 0; i < numCellNames; i++)
+            {
+                ByteBuffer buffer = ByteBufferUtil.readWithShortLength(in);
+                LegacyLayout.LegacyCellName cellName;
+                try
+                {
+                    cellName = LegacyLayout.decodeCellName(metadata, buffer);
+                }
+                catch (UnknownColumnException exc)
+                {
+                    // TODO this probably needs a new exception class that shares a parent with UnknownColumnFamilyException
+                    throw new UnknownColumnFamilyException(
+                            "Received legacy read command with names filter for unrecognized column name. " +
+                                    "Full cell name in filter (hex): " + ByteBufferUtil.bytesToHex(buffer), metadata.cfId);
+                }
+
+                if (!cellName.clustering.equals(Clustering.STATIC_CLUSTERING))
+                    clusterings.add(cellName.clustering);
+
+                selectionBuilder.add(cellName.column);
+            }
+
+            in.readBoolean();  // countCql3Rows
+
+            // clusterings cannot include STATIC_CLUSTERING, so if the names filter is for static columns, clusterings
+            // will be empty.  However, by requesting the static columns in our ColumnFilter, this will still work.
+            ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false);
+            return Pair.create(selectionBuilder.build(), filter);
+        }
+
+        private long serializedNamesCommandSize(SinglePartitionNamesCommand command)
+        {
+            ClusteringIndexNamesFilter filter = command.clusteringIndexFilter();
+            PartitionColumns columns = command.columnFilter().fetchedColumns();
+            return serializedNamesFilterSize(filter, command.metadata(), columns);
+        }
+
+        private void serializeSliceCommand(SinglePartitionSliceCommand command, DataOutputPlus out) throws IOException
+        {
+            CFMetaData metadata = command.metadata();
+            ClusteringIndexSliceFilter filter = command.clusteringIndexFilter();
+
+            Slices slices = filter.requestedSlices();
+            boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING);
+            serializeSlices(out, slices, filter.isReversed(), makeStaticSlice, metadata);
+
+            out.writeBoolean(filter.isReversed());
+
+            boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING);
+            DataLimits.Kind kind = command.limits().kind();
+            boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && command.limits().perPartitionCount() == 1;
+            if (isDistinct)
+                out.writeInt(1);  // the limit is always 1 for DISTINCT queries
+            else
+                out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices()));
+
+            int compositesToGroup;
+            if (kind == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
+                compositesToGroup = -1;
+            else if (isDistinct && !selectsStatics)
+                compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
+            else
+                compositesToGroup = metadata.clusteringColumns().size();
+
+            out.writeInt(compositesToGroup);
+        }
+
+        private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException
+        {
+            ClusteringIndexSliceFilter filter = deserializeSlicePartitionFilter(in, metadata);
+            int count = in.readInt();
+            int compositesToGroup = in.readInt();
+
+            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
+            boolean selectsStatics = filter.selects(Clustering.STATIC_CLUSTERING);
+            PartitionColumns columns = selectsStatics
+                                     ? metadata.partitionColumns()
+                                     : metadata.partitionColumns().withoutStatics();
+            ColumnFilter columnFilter = new ColumnFilter.Builder(metadata).addAll(columns).build();
+
+            boolean isDistinct = compositesToGroup == -2 || (count == 1 && selectsStatics);
+            DataLimits limits;
+            if (isDistinct)
+                limits = DataLimits.distinctLimits(count);  // See CASSANDRA-8490 for the explanation of this value
+            else if (compositesToGroup == -1)
+                limits = DataLimits.thriftLimits(1, count);
+            else
+                limits = DataLimits.cqlLimits(count);
+
+            // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+            return new SinglePartitionSliceCommand(isDigest, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
+        }
+
+        private long serializedSliceCommandSize(SinglePartitionSliceCommand command)
+        {
+            CFMetaData metadata = command.metadata();
+            ClusteringIndexSliceFilter filter = command.clusteringIndexFilter();
+
+            Slices slices = filter.requestedSlices();
+            boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING);
+
+            long size = serializedSlicesSize(slices, makeStaticSlice, metadata);
+            size += TypeSizes.sizeof(command.clusteringIndexFilter().isReversed());
+            size += TypeSizes.sizeof(command.limits().count());
+            return size + TypeSizes.sizeof(0);  // compositesToGroup
+        }
+
+        static void serializeSlices(DataOutputPlus out, Slices slices, boolean isReversed, boolean makeStaticSlice, CFMetaData metadata) throws IOException
+        {
+            out.writeInt(slices.size() + (makeStaticSlice ? 1 : 0));
+
+            // In 3.0 we always store the slices in normal comparator order.  Pre-3.0 nodes expect the slices to
+            // be in reversed order if the query is reversed, so we handle that here.
+            if (isReversed)
+            {
+                for (int i = slices.size() - 1; i >= 0; i--)
+                    serializeSlice(out, slices.get(i), true, metadata);
+                if (makeStaticSlice)
+                    serializeStaticSlice(out, true, metadata);
+            }
+            else
+            {
+                if (makeStaticSlice)
+                    serializeStaticSlice(out, false, metadata);
+                for (Slice slice : slices)
+                    serializeSlice(out, slice, false, metadata);
+            }
+        }
+
+        static long serializedSlicesSize(Slices slices, boolean makeStaticSlice, CFMetaData metadata)
+        {
+            long size = TypeSizes.sizeof(slices.size());
+
+            for (Slice slice : slices)
+            {
+                ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, slice.start(), true);
+                size += ByteBufferUtil.serializedSizeWithShortLength(sliceStart);
+                ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, slice.end(), false);
+                size += ByteBufferUtil.serializedSizeWithShortLength(sliceEnd);
+            }
+
+            if (makeStaticSlice)
+                size += serializedStaticSliceSize(metadata);
+
+            return size;
+        }
+
+        static long serializedStaticSliceSize(CFMetaData metadata)
+        {
+            // unlike serializeStaticSlice(), we don't care about reversal for size calculations
+            ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false);
+            long size = ByteBufferUtil.serializedSizeWithShortLength(sliceStart);
+
+            size += TypeSizes.sizeof((short) (metadata.comparator.size() * 3 + 2));
+            size += TypeSizes.sizeof((short) LegacyLayout.STATIC_PREFIX);
+            for (int i = 0; i < metadata.comparator.size(); i++)
+            {
+                size += ByteBufferUtil.serializedSizeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER);
+                size += 1;  // EOC
+            }
+            return size;
+        }
+
+        private static void serializeSlice(DataOutputPlus out, Slice slice, boolean isReversed, CFMetaData metadata) throws IOException
+        {
+            ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, isReversed ? slice.end() : slice.start(), !isReversed);
+            ByteBufferUtil.writeWithShortLength(sliceStart, out);
+
+            ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, isReversed ? slice.start() : slice.end(), isReversed);
+            ByteBufferUtil.writeWithShortLength(sliceEnd, out);
+        }
+
+        private static void serializeStaticSlice(DataOutputPlus out, boolean isReversed, CFMetaData metadata) throws IOException
+        {
+            // if not reversed, write an empty bound for the slice start; if reversed, write out an empty bound for
+            // the slice finish after we've written the static slice start
+            if (!isReversed)
+            {
+                ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false);
+                ByteBufferUtil.writeWithShortLength(sliceStart, out);
+            }
+
+            // write out the length of the composite
+            out.writeShort(2 + metadata.comparator.size() * 3);  // two bytes for the static prefix, plus a two-byte length and an EOC byte per component
+            out.writeShort(LegacyLayout.STATIC_PREFIX);
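+            // each component is written as an empty value (a zero short length) followed by an
+            // EOC byte, e.g. for two clustering components: 00 00 00 | 00 00 01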
+            for (int i = 0; i < metadata.comparator.size(); i++)
+            {
+                ByteBufferUtil.writeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
+                // write the EOC, using an inclusive end if we're on the final component
+                out.writeByte(i == metadata.comparator.size() - 1 ? 1 : 0);
+            }
+
+            if (isReversed)
+            {
+                ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false);
+                ByteBufferUtil.writeWithShortLength(sliceStart, out);
+            }
+        }
+
+        static ClusteringIndexSliceFilter deserializeSlicePartitionFilter(DataInputPlus in, CFMetaData metadata) throws IOException
+        {
+            int numSlices = in.readInt();
+            ByteBuffer[] startBuffers = new ByteBuffer[numSlices];
+            ByteBuffer[] finishBuffers = new ByteBuffer[numSlices];
+            for (int i = 0; i < numSlices; i++)
+            {
+                startBuffers[i] = ByteBufferUtil.readWithShortLength(in);
+                finishBuffers[i] = ByteBufferUtil.readWithShortLength(in);
+            }
+
+            // we have to know if the query is reversed before we can correctly build the slices
+            boolean reversed = in.readBoolean();
+
+            Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator);
+            for (int i = 0; i < numSlices; i++)
+            {
+                Slice.Bound start, finish;
+                if (!reversed)
+                {
+                    start = LegacyLayout.decodeBound(metadata, startBuffers[i], true).bound;
+                    finish = LegacyLayout.decodeBound(metadata, finishBuffers[i], false).bound;
+                }
+                else
+                {
+                    // pre-3.0, reversed query slices put the greater element at the start of the slice
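+                    // e.g. a reversed slice arrives as (start=c, finish=a) and is rebuilt here as
+                    // the normally-ordered 3.0 slice [a, c]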
+                    finish = LegacyLayout.decodeBound(metadata, startBuffers[i], false).bound;
+                    start = LegacyLayout.decodeBound(metadata, finishBuffers[i], true).bound;
+                }
+                slicesBuilder.add(Slice.make(start, finish));
+            }
+
+            return new ClusteringIndexSliceFilter(slicesBuilder.build(), reversed);
+        }
+
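+        // Rebuilds a single-partition names command around an equivalent slice filter when
+        // pre-3.0 nodes cannot handle the names filter (see shouldConvertNamesToSlice()).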
+        private static SinglePartitionReadCommand maybeConvertNamesToSlice(SinglePartitionReadCommand command)
+        {
+            if (command.clusteringIndexFilter().kind() != ClusteringIndexFilter.Kind.NAMES)
+                return command;
+
+            CFMetaData metadata = command.metadata();
+
+            if (!shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
+                return command;
+
+            ClusteringIndexNamesFilter filter = ((SinglePartitionNamesCommand) command).clusteringIndexFilter();
+            ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata);
+            return new SinglePartitionSliceCommand(
+                    command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(),
+                    command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter);
+        }
+
+        /**
+         * Returns true if a names filter on the given table and column selection should be converted to a slice
+         * filter for compatibility with pre-3.0 nodes, false otherwise.
+         */
+        static boolean shouldConvertNamesToSlice(CFMetaData metadata, PartitionColumns columns)
+        {
+            // On pre-3.0 nodes, due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite).
+            if (!metadata.isDense() && metadata.isCompound())
+                return true;
+
+            // pre-3.0 nodes don't support names filters for reading collections, so if we're requesting any of those,
+            // we need to convert this to a slice filter
+            for (ColumnDefinition column : columns)
+            {
+                if (column.type.isMultiCell())
+                    return true;
+            }
+            return false;
+        }
+
+        /**
+         * Converts a names filter that is incompatible with pre-3.0 nodes to a slice filter that is compatible.
+         */
+        private static ClusteringIndexSliceFilter convertNamesFilterToSliceFilter(ClusteringIndexNamesFilter filter, CFMetaData metadata)
+        {
+            SortedSet<Clustering> requestedRows = filter.requestedRows();
+            Slices slices;
+            if (requestedRows.isEmpty() || (requestedRows.size() == 1 && requestedRows.first().size() == 0))
+            {
+                slices = Slices.ALL;
+            }
+            else
+            {
+                Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator);
+                for (Clustering clustering : requestedRows)
+                    slicesBuilder.add(Slice.Bound.inclusiveStartOf(clustering), Slice.Bound.inclusiveEndOf(clustering));
+                slices = slicesBuilder.build();
+            }
+
+            return new ClusteringIndexSliceFilter(slices, filter.isReversed());
+        }
+
+        /**
+         * Potentially increases the existing query limit to account for the lack of exclusive bounds in pre-3.0 nodes.
+         * @param limit the existing query limit
+         * @param slices the requested slices
+         * @return the updated limit
+         */
+        static int updateLimitForQuery(int limit, Slices slices)
+        {
+            // Pre-3.0 nodes don't support exclusive bounds for slices. Instead, we query one more element if necessary
+            // and filter it later (in LegacyRemoteDataResponse)
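+            // e.g. a single slice with both bounds exclusive and a limit of 10 is sent with a
+            // limit of 12; any extra rows are trimmed on the coordinator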
+            if (!slices.hasLowerBound() && !slices.hasUpperBound())
+                return limit;
+
+            for (Slice slice : slices)
+            {
+                if (limit == Integer.MAX_VALUE)
+                    return limit;
+
+                if (!slice.start().isInclusive())
+                    limit++;
+                if (!slice.end().isInclusive())
+                    limit++;
+            }
+            return limit;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index f85d406..9cde8dc 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -17,8 +17,8 @@
  */
 package org.apache.cassandra.db;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -28,6 +28,11 @@ import org.apache.cassandra.tracing.Tracing;
 
 public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
 {
+    protected IVersionedSerializer<ReadResponse> serializer()
+    {
+        return ReadResponse.serializer;
+    }
+
     public void doVerb(MessageIn<ReadCommand> message, int id)
     {
         if (StorageService.instance.isBootstrapMode())
@@ -42,7 +47,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand>
             response = command.createResponse(iterator);
         }
 
-        MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, ReadResponse.serializer);
+        MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());
 
         Tracing.trace("Enqueuing response to {}", message.from);
         MessagingService.instance().sendReply(reply, id, message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java
index 90bd21d..b3cc725 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -20,8 +20,12 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -30,6 +34,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -57,8 +62,10 @@ public abstract class ReadResponse
         return new DigestResponse(makeDigest(data));
     }
 
-    public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata);
-    public abstract ByteBuffer digest(CFMetaData metadata);
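+    // the command is passed down so that legacy (pre-3.0) responses can re-apply the reversal
+    // and exclusive-bound filtering that the remote node could not perform itself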
+    public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command);
+
+    public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command);
+
     public abstract boolean isDigestQuery();
 
     protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator)
@@ -79,12 +86,12 @@ public abstract class ReadResponse
             this.digest = digest;
         }
 
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
+        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
         {
             throw new UnsupportedOperationException();
         }
 
-        public ByteBuffer digest(CFMetaData metadata)
+        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
         {
             return digest;
         }
@@ -124,7 +131,7 @@ public abstract class ReadResponse
             }
         }
 
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata)
+        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command)
         {
             try
             {
@@ -138,9 +145,76 @@ public abstract class ReadResponse
             }
         }
 
-        public ByteBuffer digest(CFMetaData metadata)
+        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
         {
-            try (UnfilteredPartitionIterator iterator = makeIterator(metadata))
+            try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
+            {
+                return makeDigest(iterator);
+            }
+        }
+
+        public boolean isDigestQuery()
+        {
+            return false;
+        }
+    }
+
+    /**
+     * A remote response from a pre-3.0 node.  This needs a separate class in order to cleanly handle trimming and
+     * reversal of results when the read command calls for it.  Pre-3.0 nodes always return results in the normal
+ * sorted order, even if the query asks for reversed results.  Additionally, pre-3.0 nodes do not have a notion of
+     * exclusive slices on non-composite tables, so extra rows may need to be trimmed.
+     */
+    private static class LegacyRemoteDataResponse extends ReadResponse
+    {
+        private final List<ArrayBackedPartition> partitions;
+
+        private LegacyRemoteDataResponse(List<ArrayBackedPartition> partitions)
+        {
+            super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the metadata
+            this.partitions = partitions;
+        }
+
+        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, final ReadCommand command)
+        {
+            return new AbstractUnfilteredPartitionIterator()
+            {
+                private int idx;
+
+                public boolean isForThrift()
+                {
+                    return true;
+                }
+
+                public CFMetaData metadata()
+                {
+                    return metadata;
+                }
+
+                public boolean hasNext()
+                {
+                    return idx < partitions.size();
+                }
+
+                public UnfilteredRowIterator next()
+                {
+                    ArrayBackedPartition partition = partitions.get(idx++);
+
+                    ClusteringIndexFilter filter = command.clusteringIndexFilter(partition.partitionKey());
+
+                    // Pre-3.0, we didn't have a way to express exclusivity for non-composite comparators, so all slices were
+                    // inclusive on both ends. If we have exclusive slice ends, we need to filter the results here.
+                    if (!command.metadata().isCompound())
+                        return filter.filter(partition.sliceableUnfilteredIterator(command.columnFilter(), filter.isReversed()));
+
+                    return partition.unfilteredIterator(command.columnFilter(), Slices.ALL, filter.isReversed());
+                }
+            };
+        }
+
+        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+        {
+            try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command))
             {
                 return makeDigest(iterator);
             }
@@ -156,14 +230,32 @@ public abstract class ReadResponse
     {
         public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
         {
+            boolean isDigest = response instanceof DigestResponse;
+            ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
             if (version < MessagingService.VERSION_30)
             {
-                // TODO
-                throw new UnsupportedOperationException();
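+                // legacy response layout: <digest length><digest bytes><isDigest flag>, then for
+                // data responses a single partition as <key><legacy-serialized partition>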
+                out.writeInt(digest.remaining());
+                out.write(digest);
+                out.writeBoolean(isDigest);
+                if (!isDigest)
+                {
+                    assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
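+                    // passing a null command is safe here: only LegacyRemoteDataResponse.makeIterator()
+                    // actually uses the command argument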
+                    try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+                    {
+                        assert iter.hasNext();
+                        try (UnfilteredRowIterator partition = iter.next())
+                        {
+                            ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
+                            LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+                        }
+                        assert !iter.hasNext();
+                    }
+                }
+                return;
             }
 
-            boolean isDigest = response.isDigestQuery();
-            ByteBufferUtil.writeWithVIntLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
+            ByteBufferUtil.writeWithVIntLength(digest, out);
             if (!isDigest)
             {
                 // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
@@ -178,8 +270,35 @@ public abstract class ReadResponse
         {
             if (version < MessagingService.VERSION_30)
             {
-                // TODO
-                throw new UnsupportedOperationException();
+                byte[] digest = null;
+                int digestSize = in.readInt();
+                if (digestSize > 0)
+                {
+                    digest = new byte[digestSize];
+                    in.readFully(digest, 0, digestSize);
+                }
+                boolean isDigest = in.readBoolean();
+                assert isDigest == (digestSize > 0);
+                if (isDigest)
+                {
+                    assert digest != null;
+                    return new DigestResponse(ByteBuffer.wrap(digest));
+                }
+
+                // ReadResponses from older versions are always single-partition (ranges are handled by RangeSliceReply)
+                ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+                UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key);
+                if (rowIterator == null)
+                    return new LegacyRemoteDataResponse(Collections.emptyList());
+
+                try
+                {
+                    return new LegacyRemoteDataResponse(Collections.singletonList(ArrayBackedPartition.create(rowIterator)));
+                }
+                finally
+                {
+                    rowIterator.close();
+                }
             }
 
             ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in);
@@ -193,14 +312,32 @@ public abstract class ReadResponse
 
         public long serializedSize(ReadResponse response, int version)
         {
+            boolean isDigest = response instanceof DigestResponse;
+            ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
             if (version < MessagingService.VERSION_30)
             {
-                // TODO
-                throw new UnsupportedOperationException();
+                long size = TypeSizes.sizeof(digest.remaining())
+                        + digest.remaining()
+                        + TypeSizes.sizeof(isDigest);
+                if (!isDigest)
+                {
+                    assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
+                    try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null))
+                    {
+                        assert iter.hasNext();
+                        try (UnfilteredRowIterator partition = iter.next())
+                        {
+                            size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
+                            size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+                        }
+                        assert !iter.hasNext();
+                    }
+                }
+                return size;
             }
 
-            boolean isDigest = response.isDigestQuery();
-            long size = ByteBufferUtil.serializedSizeWithVIntLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER);
+            long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
             if (!isDigest)
             {
                 // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the
@@ -217,32 +354,75 @@ public abstract class ReadResponse
     {
         public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException
         {
-            // TODO
-            throw new UnsupportedOperationException();
-            //        out.writeInt(rsr.rows.size());
-            //        for (Row row : rsr.rows)
-            //            Row.serializer.serialize(row, out, version);
+            assert version < MessagingService.VERSION_30;
+
+            // determine the number of partitions upfront for serialization
+            int numPartitions = 0;
+            assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
+            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+            {
+                while (iterator.hasNext())
+                {
+                    try (UnfilteredRowIterator atomIterator = iterator.next())
+                    {
+                        numPartitions++;
+
+                        // we have to fully exhaust the subiterator
+                        while (atomIterator.hasNext())
+                            atomIterator.next();
+                    }
+                }
+            }
+
+            out.writeInt(numPartitions);
+
+            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+            {
+                while (iterator.hasNext())
+                {
+                    try (UnfilteredRowIterator partition = iterator.next())
+                    {
+                        ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
+                        LegacyLayout.serializeAsLegacyPartition(partition, out, version);
+                    }
+                }
+            }
         }
 
         public ReadResponse deserialize(DataInputPlus in, int version) throws IOException
         {
-            // TODO
-            throw new UnsupportedOperationException();
-            //        int rowCount = in.readInt();
-            //        List<Row> rows = new ArrayList<Row>(rowCount);
-            //        for (int i = 0; i < rowCount; i++)
-            //            rows.add(Row.serializer.deserialize(in, version));
-            //        return new RangeSliceReply(rows);
+            // Unlike serialize, which must count the partitions up front, here we simply read the partition count.
+            int partitionCount = in.readInt();
+            ArrayList<ArrayBackedPartition> partitions = new ArrayList<>(partitionCount);
+            for (int i = 0; i < partitionCount; i++)
+            {
+                ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+                try (UnfilteredRowIterator partition = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key))
+                {
+                    partitions.add(ArrayBackedPartition.create(partition));
+                }
+            }
+            return new LegacyRemoteDataResponse(partitions);
         }
 
         public long serializedSize(ReadResponse response, int version)
         {
-            // TODO
-            throw new UnsupportedOperationException();
-            //        int size = TypeSizes.sizeof(rsr.rows.size());
-            //        for (Row row : rsr.rows)
-            //            size += Row.serializer.serializedSize(row, version);
-            //        return size;
+            assert version < MessagingService.VERSION_30;
+            long size = TypeSizes.sizeof(0);  // number of partitions
+
+            assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side
+            try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null))
+            {
+                while (iterator.hasNext())
+                {
+                    try (UnfilteredRowIterator partition = iterator.next())
+                    {
+                        size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
+                        size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+                    }
+                }
+            }
+            return size;
         }
     }
 }

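For reference, a minimal sketch of the pre-3.0 single-partition response framing implemented by the
serializer above: digest responses carry a length-prefixed digest plus a boolean that is redundant with
the length, while data responses carry a short-length-prefixed partition key followed by the legacy
partition payload. The LegacyHeader holder type below is hypothetical, and the payload decoding
(LegacyLayout.deserializeLegacyPartition) is elided:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;

    final class LegacyHeader
    {
        final ByteBuffer digest; // non-null only for digest responses
        final ByteBuffer key;    // non-null only for data responses

        private LegacyHeader(ByteBuffer digest, ByteBuffer key)
        {
            this.digest = digest;
            this.key = key;
        }

        static LegacyHeader read(DataInputStream in) throws IOException
        {
            int digestSize = in.readInt();       // 0 for data responses
            byte[] digest = new byte[digestSize];
            in.readFully(digest);
            boolean isDigest = in.readBoolean(); // redundant with digestSize > 0
            if (isDigest)
                return new LegacyHeader(ByteBuffer.wrap(digest), null);

            int keyLength = in.readUnsignedShort(); // the "with short length" framing
            byte[] key = new byte[keyLength];
            in.readFully(key);
            // the legacy partition payload follows here (see LegacyLayout)
            return new LegacyHeader(null, ByteBuffer.wrap(key));
        }
    }
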
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index bb184e8..1b688c9 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -32,6 +32,8 @@ import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -257,7 +259,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
     private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp)
     {
         assert !cfs.isIndex(); // CASSANDRA-5732
-        assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [" + cfs.name + "]");
+        assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
 
         UUID cfId = metadata().cfId;
         RowCacheKey key = new RowCacheKey(cfId, partitionKey());
@@ -393,6 +395,11 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
                              nowInSec());
     }
 
+    protected MessageOut<ReadCommand> createLegacyMessage()
+    {
+        return new MessageOut<>(MessagingService.Verb.READ, this, legacyReadCommandSerializer);
+    }
+
     protected void appendCQLWhereClause(StringBuilder sb)
     {
         sb.append(" WHERE ");
@@ -509,5 +516,5 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter
             else
                 return new SinglePartitionSliceCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter);
         }
-    };
+    }
 }

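A note on the String.format fix above: the old code concatenated the table name into the string and then
handed the result to String.format as the format string itself, so a name containing a conversion such as
"%s" would throw instead of producing the assertion message. A minimal repro sketch (the table name here
is made up):

    String name = "weird%stable"; // hypothetical table name containing a conversion
    // old form: the name becomes part of the format string and "%s" has no argument,
    // so this throws java.util.MissingFormatArgumentException:
    // String.format("Row cache is not enabled on table [" + name + "]");
    // fixed form: the name is passed as an argument and never interpreted:
    String msg = String.format("Row cache is not enabled on table [%s]", name);
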
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 2ffb91e..7fde45e 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -19,15 +19,12 @@ package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
 import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
 
 /**
  * A slice represents the selection of a range of rows.
@@ -83,11 +80,10 @@ public class Slice
     public static Slice make(ClusteringComparator comparator, Object... values)
     {
         CBuilder builder = CBuilder.create(comparator);
-        for (int i = 0; i < values.length; i++)
+        for (Object val : values)
         {
-            Object val = values[i];
             if (val instanceof ByteBuffer)
-                builder.add((ByteBuffer)val);
+                builder.add((ByteBuffer) val);
             else
                 builder.add(val);
         }
@@ -208,6 +204,9 @@ public class Slice
      */
     public Slice forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed)
     {
+        if (lastReturned == null)
+            return this;
+
         if (reversed)
         {
             int cmp = comparator.compare(lastReturned, start);
@@ -286,14 +285,14 @@ public class Slice
         for (int i = 0; i < start.size(); i++)
         {
             if (i > 0)
-                sb.append(":");
+                sb.append(':');
             sb.append(comparator.subtype(i).getString(start.get(i)));
         }
         sb.append(", ");
         for (int i = 0; i < end.size(); i++)
         {
             if (i > 0)
-                sb.append(":");
+                sb.append(':');
             sb.append(comparator.subtype(i).getString(end.get(i)));
         }
         sb.append(end.isInclusive() ? "]" : ")");
@@ -394,14 +393,37 @@ public class Slice
             return create(Kind.EXCL_END_BOUND, values);
         }
 
+        public static Bound inclusiveStartOf(ClusteringPrefix prefix)
+        {
+            ByteBuffer[] values = new ByteBuffer[prefix.size()];
+            for (int i = 0; i < prefix.size(); i++)
+                values[i] = prefix.get(i);
+            return inclusiveStartOf(values);
+        }
+
+        public static Bound exclusiveStartOf(ClusteringPrefix prefix)
+        {
+            ByteBuffer[] values = new ByteBuffer[prefix.size()];
+            for (int i = 0; i < prefix.size(); i++)
+                values[i] = prefix.get(i);
+            return exclusiveStartOf(values);
+        }
+
+        public static Bound inclusiveEndOf(ClusteringPrefix prefix)
+        {
+            ByteBuffer[] values = new ByteBuffer[prefix.size()];
+            for (int i = 0; i < prefix.size(); i++)
+                values[i] = prefix.get(i);
+            return inclusiveEndOf(values);
+        }
+
         public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... values)
         {
             CBuilder builder = CBuilder.create(comparator);
-            for (int i = 0; i < values.length; i++)
+            for (Object val : values)
             {
-                Object val = values[i];
                 if (val instanceof ByteBuffer)
-                    builder.add((ByteBuffer)val);
+                    builder.add((ByteBuffer) val);
                 else
                     builder.add(val);
             }
@@ -483,14 +505,14 @@ public class Slice
         public String toString(ClusteringComparator comparator)
         {
             StringBuilder sb = new StringBuilder();
-            sb.append(kind()).append("(");
+            sb.append(kind()).append('(');
             for (int i = 0; i < size(); i++)
             {
                 if (i > 0)
                     sb.append(", ");
                 sb.append(comparator.subtype(i).getString(get(i)));
             }
-            return sb.append(")").toString();
+            return sb.append(')').toString();
         }
 
         /**

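The three new Bound factories above each inline the same prefix-to-array copy. A sketch of the shared step
they all perform (extractValues is a hypothetical helper, not part of this patch):

    private static ByteBuffer[] extractValues(ClusteringPrefix prefix)
    {
        ByteBuffer[] values = new ByteBuffer[prefix.size()];
        for (int i = 0; i < prefix.size(); i++)
            values[i] = prefix.get(i);
        return values;
    }

    // with it, each factory reduces to a one-liner, e.g.:
    public static Bound inclusiveStartOf(ClusteringPrefix prefix)
    {
        return inclusiveStartOf(extractValues(prefix));
    }
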
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
index ed7584b..51e9d8e 100644
--- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java
@@ -28,23 +28,8 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 
 public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFilter
 {
-    protected enum Kind
-    {
-        SLICE (ClusteringIndexSliceFilter.deserializer),
-        NAMES (ClusteringIndexNamesFilter.deserializer);
-
-        private final InternalDeserializer deserializer;
-
-        private Kind(InternalDeserializer deserializer)
-        {
-            this.deserializer = deserializer;
-        }
-    }
-
     static final Serializer serializer = new FilterSerializer();
 
-    abstract Kind kind();
-
     protected final boolean reversed;
 
     protected AbstractClusteringIndexFilter(boolean reversed)
@@ -101,9 +86,4 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi
                  + filter.serializedSizeInternal(version);
         }
     }
-
-    protected static abstract class InternalDeserializer
-    {
-        public abstract ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
index 33a0917..e3f824f 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java
@@ -39,6 +39,24 @@ public interface ClusteringIndexFilter
 {
     public static Serializer serializer = AbstractClusteringIndexFilter.serializer;
 
+    public enum Kind
+    {
+        SLICE (ClusteringIndexSliceFilter.deserializer),
+        NAMES (ClusteringIndexNamesFilter.deserializer);
+
+        protected final InternalDeserializer deserializer;
+
+        private Kind(InternalDeserializer deserializer)
+        {
+            this.deserializer = deserializer;
+        }
+    }
+
+    static interface InternalDeserializer
+    {
+        public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException;
+    }
+
     /**
      * Whether the filter query rows in reversed clustering order or not.
      *
@@ -140,6 +158,8 @@ public interface ClusteringIndexFilter
      */
     public boolean shouldInclude(SSTableReader sstable);
 
+    public Kind kind();
+
     public String toString(CFMetaData metadata);
     public String toCQLString(CFMetaData metadata);
 

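Moving Kind and InternalDeserializer into the public interface lets the filter serializer dispatch through
a per-kind deserializer rather than a switch. A sketch of that dispatch, assuming the surrounding serializer
class for context; the exact header layout written by FilterSerializer is not shown in this hunk, so the
one-byte kind tag and reversed flag below are assumptions:

    public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException
    {
        Kind kind = Kind.values()[in.readUnsignedByte()]; // assumed one-byte kind tag
        boolean reversed = in.readBoolean();              // assumed reversed flag
        // each Kind carries its own deserializer, so no switch over kinds is needed
        return kind.deserializer.deserialize(in, version, metadata, reversed);
    }
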
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
index f2c81a7..e0bc533 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
@@ -232,7 +232,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
         return sb.toString();
     }
 
-    Kind kind()
+    public Kind kind()
     {
         return Kind.NAMES;
     }
@@ -254,7 +254,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter
         return size;
     }
 
-    private static class NamesDeserializer extends InternalDeserializer
+    private static class NamesDeserializer implements InternalDeserializer
     {
         public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
index 4f0e4e2..b2d529c 100644
--- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java
@@ -153,7 +153,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
         return sb.toString();
     }
 
-    Kind kind()
+    public Kind kind()
     {
         return Kind.SLICE;
     }
@@ -168,7 +168,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter
         return Slices.serializer.serializedSize(slices, version);
     }
 
-    private static class SliceDeserializer extends InternalDeserializer
+    private static class SliceDeserializer implements InternalDeserializer
     {
         public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index d2cb87d..2afc785 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -313,6 +313,9 @@ public class ColumnFilter
             return "";
 
         Iterator<ColumnDefinition> defs = selection.selectOrderIterator();
+        if (!defs.hasNext())
+            return "<none>";
+
         StringBuilder sb = new StringBuilder();
         appendColumnDef(sb, defs.next());
         while (defs.hasNext())

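The guard added above matters because the selection's iterator can be empty, and the code that follows calls
defs.next() unconditionally while building the string representation. A tiny sketch of the failure mode,
using a stand-in empty iterator:

    import java.util.Collections;
    import java.util.Iterator;

    Iterator<String> defs = Collections.emptyIterator(); // stand-in for an empty selection
    // before the fix: defs.next() -> java.util.NoSuchElementException
    // after the fix: an empty selection short-circuits to the "<none>" marker
    String rendered = defs.hasNext() ? defs.next() : "<none>";
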
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 458ee30..3e608b4 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -62,7 +62,7 @@ public abstract class DataLimits
     // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
     public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true);
 
-    private enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
+    public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT }
 
     public static DataLimits cqlLimits(int cqlRowLimit)
     {
@@ -89,7 +89,7 @@ public abstract class DataLimits
         return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
     }
 
-    protected abstract Kind kind();
+    public abstract Kind kind();
 
     public abstract boolean isUnlimited();
 
@@ -199,7 +199,7 @@ public abstract class DataLimits
             return new CQLLimits(rowLimit, 1, true);
         }
 
-        protected Kind kind()
+        public Kind kind()
         {
             return Kind.CQL_LIMIT;
         }
@@ -368,7 +368,7 @@ public abstract class DataLimits
         }
 
         @Override
-        protected Kind kind()
+        public Kind kind()
         {
             return Kind.CQL_PAGING_LIMIT;
         }
@@ -432,7 +432,7 @@ public abstract class DataLimits
             this.cellPerPartitionLimit = cellPerPartitionLimit;
         }
 
-        protected Kind kind()
+        public Kind kind()
         {
             return Kind.THRIFT_LIMIT;
         }
@@ -588,7 +588,7 @@ public abstract class DataLimits
             super(partitionLimit, cellPerPartitionLimit);
         }
 
-        protected Kind kind()
+        public Kind kind()
         {
             return Kind.SUPER_COLUMN_COUNTING_LIMIT;
         }