You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:48:02 UTC

[38/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/MultiCBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java
new file mode 100644
index 0000000..36a03ba
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import static java.util.Collections.singletonList;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Builder that allows building multiple Clustering/Slice.Bound at the same time.
+ */
+public abstract class MultiCBuilder
+{
+    /**
+     * Creates a new empty {@code MultiCBuilder}.
+     */
+    public static MultiCBuilder create(ClusteringComparator comparator)
+    {
+        return new ConcreteMultiCBuilder(comparator);
+    }
+
+    /**
+     * Wraps an existing {@code CBuilder} to provide it with a MultiCBuilder interface
+     * for the sake of passing it to {@code Restriction.appendTo}. The resulting
+     * {@code MultiCBuilder} will still only be able to create a single clustering/bound
+     * and an {@code IllegalArgumentException} will be thrown if elements are added that
+     * would correspond to building multiple clusterings.
+     */
+    public static MultiCBuilder wrap(final CBuilder builder)
+    {
+        return new MultiCBuilder()
+        {
+            private boolean containsNull;
+            private boolean containsUnset;
+            private boolean hasMissingElements;
+
+            public MultiCBuilder addElementToAll(ByteBuffer value)
+            {
+                builder.add(value);
+
+                if (value == null)
+                    containsNull = true;
+                if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+                    containsUnset = true;
+
+                return this;
+            }
+
+            public MultiCBuilder addEachElementToAll(List<ByteBuffer> values)
+            {
+                if (values.isEmpty())
+                {
+                    hasMissingElements = true;
+                    return this;
+                }
+
+                if (values.size() > 1)
+                    throw new IllegalArgumentException();
+
+                return addElementToAll(values.get(0));
+            }
+
+            public MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values)
+            {
+                if (values.isEmpty())
+                {
+                    hasMissingElements = true;
+                    return this;
+                }
+
+                if (values.size() > 1)
+                    throw new IllegalArgumentException();
+
+                return addEachElementToAll(values.get(0));
+            }
+
+            public int remainingCount()
+            {
+                return builder.remainingCount();
+            }
+
+            public boolean containsNull()
+            {
+                return containsNull;
+            }
+
+            public boolean containsUnset()
+            {
+                return containsUnset;
+            }
+
+            public boolean hasMissingElements()
+            {
+                return hasMissingElements;
+            }
+
+            public NavigableSet<Clustering> build()
+            {
+                return FBUtilities.singleton(builder.build(), builder.comparator());
+            }
+
+            public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive)
+            {
+                return FBUtilities.singleton(builder.buildBound(isStart, isInclusive), builder.comparator());
+            }
+        };
+    }
+
+    /**
+     * Adds the specified element to all the clusterings.
+     * <p>
+     * If this builder contains 2 clusterings: A-B and A-C a call to this method to add D will result in the clusterings:
+     * A-B-D and A-C-D.
+     * </p>
+     *
+     * @param value the value of the next element
+     * @return this <code>MultiCBuilder</code>
+     */
+    public abstract MultiCBuilder addElementToAll(ByteBuffer value);
+
+    /**
+     * Adds individually each of the specified elements to the end of all of the existing clusterings.
+     * <p>
+     * If this builder contains 2 clusterings: A-B and A-C a call to this method to add D and E will result in the 4
+     * clusterings: A-B-D, A-B-E, A-C-D and A-C-E.
+     * </p>
+     *
+     * @param values the elements to add
+     * @return this <code>MultiCBuilder</code>
+     */
+    public abstract MultiCBuilder addEachElementToAll(List<ByteBuffer> values);
+
+    /**
+     * Adds individually each of the specified lists of elements to the end of all of the existing clusterings.
+     * <p>
+     * If this builder contains 2 clusterings: A-B and A-C a call to this method to add [[D, E], [F, G]] will result in the 4
+     * clusterings: A-B-D-E, A-B-F-G, A-C-D-E and A-C-F-G.
+     * </p>
+     *
+     * @param values the elements to add
+     * @return this <code>MultiCBuilder</code>
+     */
+    public abstract MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values);
+
+    /**
+     * Returns the number of elements that can be added to the clusterings.
+     *
+     * @return the number of elements that can be added to the clusterings.
+     */
+    public abstract int remainingCount();
+
+    /**
+     * Checks if the clusterings contain null elements.
+     *
+     * @return <code>true</code> if the clusterings contain <code>null</code> elements, <code>false</code> otherwise.
+     */
+    public abstract boolean containsNull();
+
+    /**
+     * Checks if the clusterings contain unset elements.
+     *
+     * @return <code>true</code> if the clusterings contain <code>unset</code> elements, <code>false</code> otherwise.
+     */
+    public abstract boolean containsUnset();
+
+    /**
+     * Checks if an empty list of values has been added.
+     * @return <code>true</code> if the clusterings have some missing elements, <code>false</code> otherwise.
+     */
+    public abstract boolean hasMissingElements();
+
+    /**
+     * Builds the <code>clusterings</code>.
+     *
+     * @return the clusterings
+     */
+    public abstract NavigableSet<Clustering> build();
+
+    /**
+     * Builds the bounds for the <code>clusterings</code>, as start or end bounds and inclusive or exclusive as specified.
+     *
+     * @return the bounds
+     */
+    public abstract SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive);
+
+    /**
+     * Checks if some elements can still be added to the clusterings.
+     *
+     * @return <code>true</code> if it is possible to add more elements to the clusterings, <code>false</code> otherwise.
+     */
+    public boolean hasRemaining()
+    {
+        return remainingCount() > 0;
+    }
+
+
+    private static class ConcreteMultiCBuilder extends MultiCBuilder
+    {
+        /**
+         * The table comparator.
+         */
+        private final ClusteringComparator comparator;
+
+        /**
+         * The elements of the clusterings
+         */
+        private final List<List<ByteBuffer>> elementsList = new ArrayList<>();
+
+        /**
+         * The number of elements that have been added.
+         */
+        private int size;
+
+        /**
+         * <code>true</code> if the clusterings have been built, <code>false</code> otherwise.
+         */
+        private boolean built;
+
+        /**
+         * <code>true</code> if the clusterings contain some <code>null</code> elements.
+         */
+        private boolean containsNull;
+
+        /**
+         * <code>true</code> if the clusterings contain some <code>unset</code> elements.
+         */
+        private boolean containsUnset;
+
+        /**
+         * <code>true</code> if some empty collection has been added.
+         */
+        private boolean hasMissingElements;
+
+        public ConcreteMultiCBuilder(ClusteringComparator comparator)
+        {
+            this.comparator = comparator;
+        }
+
+        public MultiCBuilder addElementToAll(ByteBuffer value)
+        {
+            checkUpdateable();
+
+            if (isEmpty())
+                elementsList.add(new ArrayList<ByteBuffer>());
+
+            for (int i = 0, m = elementsList.size(); i < m; i++)
+            {
+                if (value == null)
+                    containsNull = true;
+                if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+                    containsUnset = true;
+
+                elementsList.get(i).add(value);
+            }
+            size++;
+            return this;
+        }
+
+        public MultiCBuilder addEachElementToAll(List<ByteBuffer> values)
+        {
+            checkUpdateable();
+
+            if (isEmpty())
+                elementsList.add(new ArrayList<ByteBuffer>());
+
+            if (values.isEmpty())
+            {
+                hasMissingElements = true;
+            }
+            else
+            {
+                for (int i = 0, m = elementsList.size(); i < m; i++)
+                {
+                    List<ByteBuffer> oldComposite = elementsList.remove(0);
+
+                    for (int j = 0, n = values.size(); j < n; j++)
+                    {
+                        List<ByteBuffer> newComposite = new ArrayList<>(oldComposite);
+                        elementsList.add(newComposite);
+
+                        ByteBuffer value = values.get(j);
+
+                        if (value == null)
+                            containsNull = true;
+                        if (value == ByteBufferUtil.UNSET_BYTE_BUFFER)
+                            containsUnset = true;
+
+                        newComposite.add(values.get(j));
+                    }
+                }
+            }
+            size++;
+            return this;
+        }
+
+        public MultiCBuilder addAllElementsToAll(List<List<ByteBuffer>> values)
+        {
+            checkUpdateable();
+
+            if (isEmpty())
+                elementsList.add(new ArrayList<ByteBuffer>());
+
+            if (values.isEmpty())
+            {
+                hasMissingElements = true;
+            }
+            else
+            {
+                for (int i = 0, m = elementsList.size(); i < m; i++)
+                {
+                    List<ByteBuffer> oldComposite = elementsList.remove(0);
+
+                    for (int j = 0, n = values.size(); j < n; j++)
+                    {
+                        List<ByteBuffer> newComposite = new ArrayList<>(oldComposite);
+                        elementsList.add(newComposite);
+
+                        List<ByteBuffer> value = values.get(j);
+
+                        if (value.isEmpty())
+                            hasMissingElements = true;
+
+                        if (value.contains(null))
+                            containsNull = true;
+                        if (value.contains(ByteBufferUtil.UNSET_BYTE_BUFFER))
+                            containsUnset = true;
+
+                        newComposite.addAll(value);
+                    }
+                }
+                size += values.get(0).size();
+            }
+            return this;
+        }
+
+        public int remainingCount()
+        {
+            return comparator.size() - size;
+        }
+
+        /**
+         * Checks if this builder is empty.
+         *
+         * @return <code>true</code> if this builder is empty, <code>false</code> otherwise.
+         */
+        public boolean isEmpty()
+        {
+            return elementsList.isEmpty();
+        }
+
+        public boolean containsNull()
+        {
+            return containsNull;
+        }
+
+        public boolean containsUnset()
+        {
+            return containsUnset;
+        }
+
+        public boolean hasMissingElements()
+        {
+            return hasMissingElements;
+        }
+
+        public NavigableSet<Clustering> build()
+        {
+            built = true;
+
+            if (hasMissingElements)
+                return FBUtilities.emptySortedSet(comparator);
+
+            CBuilder builder = CBuilder.create(comparator);
+
+            if (elementsList.isEmpty())
+                return FBUtilities.singleton(builder.build(), builder.comparator());
+
+            // Use a TreeSet to sort and eliminate duplicates
+            NavigableSet<Clustering> set = new TreeSet<>(builder.comparator());
+
+            for (int i = 0, m = elementsList.size(); i < m; i++)
+            {
+                List<ByteBuffer> elements = elementsList.get(i);
+                set.add(builder.buildWith(elements));
+            }
+            return set;
+        }
+
+        public SortedSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive)
+        {
+            built = true;
+
+            if (hasMissingElements)
+                return FBUtilities.emptySortedSet(comparator);
+
+            CBuilder builder = CBuilder.create(comparator);
+
+            if (elementsList.isEmpty())
+                return FBUtilities.singleton(builder.buildBound(isStart, isInclusive), comparator);
+
+            // Use a TreeSet to sort and eliminate duplicates
+            SortedSet<Slice.Bound> set = new TreeSet<>(comparator);
+
+            for (int i = 0, m = elementsList.size(); i < m; i++)
+            {
+                List<ByteBuffer> elements = elementsList.get(i);
+                set.add(builder.buildBoundWith(elements, isStart, isInclusive));
+            }
+            return set;
+        }
+
+        private void checkUpdateable()
+        {
+            if (!hasRemaining() || built)
+                throw new IllegalStateException("this builder cannot be updated anymore");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 9dd1686..355d259 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
 
 import java.io.DataInput;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.commons.lang3.StringUtils;
@@ -27,13 +26,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,37 +52,27 @@ public class Mutation implements IMutation
     // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test
     private final String keyspaceName;
 
-    private final ByteBuffer key;
+    private final DecoratedKey key;
     // map of column family id to mutations for that column family.
-    private final Map<UUID, ColumnFamily> modifications;
-
-    public Mutation(String keyspaceName, ByteBuffer key)
-    {
-        this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
-    }
+    private final Map<UUID, PartitionUpdate> modifications;
 
-    public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
+    public Mutation(String keyspaceName, DecoratedKey key)
     {
-        this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
+        this(keyspaceName, key, new HashMap<UUID, PartitionUpdate>());
     }
 
-    public Mutation(String keyspaceName, Row row)
+    public Mutation(PartitionUpdate update)
     {
-        this(keyspaceName, row.key.getKey(), row.cf);
+        this(update.metadata().ksName, update.partitionKey(), Collections.singletonMap(update.metadata().cfId, update));
     }
 
-    protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
+    protected Mutation(String keyspaceName, DecoratedKey key, Map<UUID, PartitionUpdate> modifications)
     {
         this.keyspaceName = keyspaceName;
         this.key = key;
         this.modifications = modifications;
     }
 
-    public Mutation(ByteBuffer key, ColumnFamily cf)
-    {
-        this(cf.metadata().ksName, key, cf);
-    }
-
     public Mutation copy()
     {
         Mutation copy = new Mutation(keyspaceName, key, new HashMap<>(modifications));
@@ -98,53 +89,34 @@ public class Mutation implements IMutation
         return modifications.keySet();
     }
 
-    public ByteBuffer key()
+    public DecoratedKey key()
     {
         return key;
     }
 
-    public Collection<ColumnFamily> getColumnFamilies()
+    public Collection<PartitionUpdate> getPartitionUpdates()
     {
         return modifications.values();
     }
 
-    public ColumnFamily getColumnFamily(UUID cfId)
+    public PartitionUpdate getPartitionUpdate(UUID cfId)
     {
         return modifications.get(cfId);
     }
 
-    /*
-     * Specify a column family name and the corresponding column
-     * family object.
-     * param @ cf - column family name
-     * param @ columnFamily - the column family.
-     */
-    public void add(ColumnFamily columnFamily)
+    public Mutation add(PartitionUpdate update)
     {
-        assert columnFamily != null;
-        ColumnFamily prev = modifications.put(columnFamily.id(), columnFamily);
+        assert update != null;
+        PartitionUpdate prev = modifications.put(update.metadata().cfId, update);
         if (prev != null)
             // developer error
-            throw new IllegalArgumentException("Table " + columnFamily + " already has modifications in this mutation: " + prev);
-    }
-
-    /**
-     * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary.
-     */
-    public ColumnFamily addOrGet(String cfName)
-    {
-        return addOrGet(Schema.instance.getCFMetaData(keyspaceName, cfName));
+            throw new IllegalArgumentException("Table " + update.metadata().cfName + " already has modifications in this mutation: " + prev);
+        return this;
     }
 
-    public ColumnFamily addOrGet(CFMetaData cfm)
+    public PartitionUpdate get(CFMetaData cfm)
     {
-        ColumnFamily cf = modifications.get(cfm.cfId);
-        if (cf == null)
-        {
-            cf = ArrayBackedSortedColumns.factory.create(cfm);
-            modifications.put(cfm.cfId, cf);
-        }
-        return cf;
+        return modifications.get(cfm.cfId);
     }
 
     public boolean isEmpty()
@@ -152,56 +124,56 @@ public class Mutation implements IMutation
         return modifications.isEmpty();
     }
 
-    public void add(String cfName, CellName name, ByteBuffer value, long timestamp, int timeToLive)
-    {
-        addOrGet(cfName).addColumn(name, value, timestamp, timeToLive);
-    }
-
-    public void addCounter(String cfName, CellName name, long value)
-    {
-        addOrGet(cfName).addCounter(name, value);
-    }
-
-    public void add(String cfName, CellName name, ByteBuffer value, long timestamp)
-    {
-        add(cfName, name, value, timestamp, 0);
-    }
-
-    public void delete(String cfName, long timestamp)
+    /**
+     * Creates a new mutation that merges all the provided mutations.
+     *
+     * @param mutations the mutations to merge together. All mutations must be
+     * on the same keyspace and partition key. There should also be at least one
+     * mutation.
+     * @return a mutation that contains all the modifications contained in {@code mutations}.
+     *
+     * @throws IllegalArgumentException if not all the mutations are on the same
+     * keyspace and key.
+     */
+    public static Mutation merge(List<Mutation> mutations)
     {
-        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
-        addOrGet(cfName).delete(new DeletionInfo(timestamp, localDeleteTime));
-    }
+        assert !mutations.isEmpty();
 
-    public void delete(String cfName, CellName name, long timestamp)
-    {
-        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
-        addOrGet(cfName).addTombstone(name, localDeleteTime, timestamp);
-    }
+        if (mutations.size() == 1)
+            return mutations.get(0);
 
-    public void deleteRange(String cfName, Composite start, Composite end, long timestamp)
-    {
-        int localDeleteTime = (int) (System.currentTimeMillis() / 1000);
-        addOrGet(cfName).addAtom(new RangeTombstone(start, end, timestamp, localDeleteTime));
-    }
+        Set<UUID> updatedTables = new HashSet<>();
+        String ks = null;
+        DecoratedKey key = null;
+        for (Mutation mutation : mutations)
+        {
+            updatedTables.addAll(mutation.modifications.keySet());
+            if (ks != null && !ks.equals(mutation.keyspaceName))
+                throw new IllegalArgumentException();
+            if (key != null && !key.equals(mutation.key))
+                throw new IllegalArgumentException();
+            ks = mutation.keyspaceName;
+            key = mutation.key;
+        }
 
-    public void addAll(IMutation m)
-    {
-        if (!(m instanceof Mutation))
-            throw new IllegalArgumentException();
+        List<PartitionUpdate> updates = new ArrayList<>(mutations.size());
+        Map<UUID, PartitionUpdate> modifications = new HashMap<>(updatedTables.size());
+        for (UUID table : updatedTables)
+        {
+            for (Mutation mutation : mutations)
+            {
+                PartitionUpdate upd = mutation.modifications.get(table);
+                if (upd != null)
+                    updates.add(upd);
+            }
 
-        Mutation mutation = (Mutation)m;
-        if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key))
-            throw new IllegalArgumentException();
+            if (updates.isEmpty())
+                continue;
 
-        for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
-        {
-            // It's slighty faster to assume the key wasn't present and fix if
-            // not in the case where it wasn't there indeed.
-            ColumnFamily cf = modifications.put(entry.getKey(), entry.getValue());
-            if (cf != null)
-                entry.getValue().addAll(cf);
+            modifications.put(table, updates.size() == 1 ? updates.get(0) : PartitionUpdate.merge(updates));
+            updates.clear();
         }
+        return new Mutation(ks, key, modifications);
     }
 
     /*
@@ -243,7 +215,7 @@ public class Mutation implements IMutation
     {
         StringBuilder buff = new StringBuilder("Mutation(");
         buff.append("keyspace='").append(keyspaceName).append('\'');
-        buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
+        buff.append(", key='").append(ByteBufferUtil.bytesToHex(key.getKey())).append('\'');
         buff.append(", modifications=[");
         if (shallow)
         {
@@ -256,14 +228,16 @@ public class Mutation implements IMutation
             buff.append(StringUtils.join(cfnames, ", "));
         }
         else
-            buff.append(StringUtils.join(modifications.values(), ", "));
+        {
+            buff.append("\n  ").append(StringUtils.join(modifications.values(), "\n  ")).append("\n");
+        }
         return buff.append("])").toString();
     }
 
     public Mutation without(UUID cfId)
     {
         Mutation mutation = new Mutation(keyspaceName, key);
-        for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
+        for (Map.Entry<UUID, PartitionUpdate> entry : modifications.entrySet())
             if (!entry.getKey().equals(cfId))
                 mutation.add(entry.getValue());
         return mutation;
@@ -276,58 +250,52 @@ public class Mutation implements IMutation
             if (version < MessagingService.VERSION_20)
                 out.writeUTF(mutation.getKeyspaceName());
 
-            ByteBufferUtil.writeWithShortLength(mutation.key(), out);
+            if (version < MessagingService.VERSION_30)
+                ByteBufferUtil.writeWithShortLength(mutation.key().getKey(), out);
 
             /* serialize the modifications in the mutation */
             int size = mutation.modifications.size();
             out.writeInt(size);
             assert size > 0;
-            for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
-                ColumnFamily.serializer.serialize(entry.getValue(), out, version);
+            for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
+                PartitionUpdate.serializer.serialize(entry.getValue(), out, version);
         }
 
-        public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
+        public Mutation deserialize(DataInput in, int version, SerializationHelper.Flag flag) throws IOException
         {
             String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
             if (version < MessagingService.VERSION_20)
                 keyspaceName = in.readUTF();
 
-            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
+            DecoratedKey key = null;
+            if (version < MessagingService.VERSION_30)
+                key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
+
             int size = in.readInt();
             assert size > 0;
 
-            Map<UUID, ColumnFamily> modifications;
             if (size == 1)
+                return new Mutation(PartitionUpdate.serializer.deserialize(in, version, flag, key));
+
+            Map<UUID, PartitionUpdate> modifications = new HashMap<>(size);
+            PartitionUpdate update = null;
+            for (int i = 0; i < size; ++i)
             {
-                ColumnFamily cf = deserializeOneCf(in, version, flag);
-                modifications = Collections.singletonMap(cf.id(), cf);
-                keyspaceName = cf.metadata().ksName;
-            }
-            else
-            {
-                modifications = new HashMap<UUID, ColumnFamily>(size);
-                for (int i = 0; i < size; ++i)
-                {
-                    ColumnFamily cf = deserializeOneCf(in, version, flag);
-                    modifications.put(cf.id(), cf);
-                    keyspaceName = cf.metadata().ksName;
-                }
+                update = PartitionUpdate.serializer.deserialize(in, version, flag, key);
+                modifications.put(update.metadata().cfId, update);
             }
 
-            return new Mutation(keyspaceName, key, modifications);
-        }
+            if (keyspaceName == null)
+                keyspaceName = update.metadata().ksName;
+            if (key == null)
+                key = update.partitionKey();
 
-        private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
-        {
-            ColumnFamily cf = ColumnFamily.serializer.deserialize(in, ArrayBackedSortedColumns.factory, flag, version);
-            // We don't allow Mutation with null column family, so we should never get null back.
-            assert cf != null;
-            return cf;
+            return new Mutation(keyspaceName, key, modifications);
         }
 
         public Mutation deserialize(DataInput in, int version) throws IOException
         {
-            return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
+            return deserialize(in, version, SerializationHelper.Flag.FROM_REMOTE);
         }
 
         public long serializedSize(Mutation mutation, int version)
@@ -338,12 +306,15 @@ public class Mutation implements IMutation
             if (version < MessagingService.VERSION_20)
                 size += sizes.sizeof(mutation.getKeyspaceName());
 
-            int keySize = mutation.key().remaining();
-            size += sizes.sizeof((short) keySize) + keySize;
+            if (version < MessagingService.VERSION_30)
+            {
+                int keySize = mutation.key().getKey().remaining();
+                size += sizes.sizeof((short) keySize) + keySize;
+            }
 
             size += sizes.sizeof(mutation.modifications.size());
-            for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet())
-                size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version);
+            for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet())
+                size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version, sizes);
 
             return size;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCell.java b/src/java/org/apache/cassandra/db/NativeCell.java
deleted file mode 100644
index dac5674..0000000
--- a/src/java/org/apache/cassandra/db/NativeCell.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeCell extends AbstractNativeCell
-{
-    private static final long SIZE = ObjectSizes.measure(new NativeCell());
-
-    NativeCell()
-    {}
-
-    public NativeCell(NativeAllocator allocator, OpOrder.Group writeOp, Cell copyOf)
-    {
-        super(allocator, writeOp, copyOf);
-    }
-
-    @Override
-    public CellName name()
-    {
-        return this;
-    }
-
-    @Override
-    public long timestamp()
-    {
-        return getLong(TIMESTAMP_OFFSET);
-    }
-
-    @Override
-    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferCell(copy(metadata, allocator), allocator.clone(value()), timestamp());
-    }
-
-    @Override
-    public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        updateWithName(digest);  // name
-        updateWithValue(digest); // value
-
-        FBUtilities.updateWithLong(digest, timestamp());
-        FBUtilities.updateWithByte(digest, serializationFlags());
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return SIZE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeCounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCounterCell.java b/src/java/org/apache/cassandra/db/NativeCounterCell.java
deleted file mode 100644
index c16cc44..0000000
--- a/src/java/org/apache/cassandra/db/NativeCounterCell.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeCounterCell extends NativeCell implements CounterCell
-{
-    private static final long SIZE = ObjectSizes.measure(new NativeCounterCell());
-
-    private NativeCounterCell()
-    {}
-
-    public NativeCounterCell(NativeAllocator allocator, OpOrder.Group writeOp, CounterCell copyOf)
-    {
-        super(allocator, writeOp, copyOf);
-    }
-
-    @Override
-    protected void construct(Cell from)
-    {
-        super.construct(from);
-        setLong(internalSize() - 8, ((CounterCell) from).timestampOfLastDelete());
-    }
-
-    @Override
-    protected int postfixSize()
-    {
-        return 8;
-    }
-
-    @Override
-    protected int sizeOf(Cell cell)
-    {
-        return 8 + super.sizeOf(cell);
-    }
-
-    @Override
-    public long timestampOfLastDelete()
-    {
-        return getLong(internalSize() - 8);
-    }
-
-    @Override
-    public long total()
-    {
-        return contextManager.total(value());
-    }
-
-    @Override
-    public boolean hasLegacyShards()
-    {
-        return contextManager.hasLegacyShards(value());
-    }
-
-    @Override
-    public Cell markLocalToBeCleared()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Cell diff(Cell cell)
-    {
-        return diffCounter(cell);
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        return reconcileCounter(cell);
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_MASK;
-    }
-
-    @Override
-    public int cellDataSize()
-    {
-        // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
-        return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete());
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete());
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
-        // which is not the internal representation of counters
-        contextManager.validateContext(value());
-    }
-
-    /*
-     * We have to special case digest creation for counter column because
-     * we don't want to include the information about which shard of the
-     * context is a delta or not, since this information differs from node to
-     * node.
-     */
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        updateWithName(digest);
-
-        // We don't take the deltas into account in a digest
-        contextManager.updateDigest(digest, value());
-
-        FBUtilities.updateWithLong(digest, timestamp());
-        FBUtilities.updateWithByte(digest, serializationFlags());
-        FBUtilities.updateWithLong(digest, timestampOfLastDelete());
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s(%s:false:%s@%d!%d)",
-                             getClass().getSimpleName(),
-                             comparator.getString(name()),
-                             contextManager.toString(value()),
-                             timestamp(),
-                             timestampOfLastDelete());
-    }
-
-    @Override
-    public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferCounterCell(copy(metadata, allocator), allocator.clone(value()), timestamp(), timestampOfLastDelete());
-    }
-
-    @Override
-    public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public boolean equals(Cell cell)
-    {
-        return super.equals(cell) && timestampOfLastDelete() == ((CounterCell) cell).timestampOfLastDelete();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeDeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDeletedCell.java b/src/java/org/apache/cassandra/db/NativeDeletedCell.java
deleted file mode 100644
index 6bdef43..0000000
--- a/src/java/org/apache/cassandra/db/NativeDeletedCell.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemoryUtil;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeDeletedCell extends NativeCell implements DeletedCell
-{
-    private static final long SIZE = ObjectSizes.measure(new NativeDeletedCell());
-
-    private NativeDeletedCell()
-    {}
-
-    public NativeDeletedCell(NativeAllocator allocator, OpOrder.Group writeOp, DeletedCell copyOf)
-    {
-        super(allocator, writeOp, copyOf);
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        if (cell instanceof DeletedCell)
-            return super.reconcile(cell);
-        return cell.reconcile(this);
-    }
-
-    @Override
-    public boolean isLive()
-    {
-        return false;
-    }
-
-    @Override
-    public boolean isLive(long now)
-    {
-        return false;
-    }
-
-    @Override
-    public int getLocalDeletionTime()
-    {
-        int v = getInt(valueStartOffset());
-        return MemoryUtil.INVERTED_ORDER ? Integer.reverseBytes(v) : v;
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.DELETION_MASK;
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-
-        if ((int) (internalSize() - valueStartOffset()) != 4)
-            throw new MarshalException("A tombstone value should be 4 bytes long");
-        if (getLocalDeletionTime() < 0)
-            throw new MarshalException("The local deletion time should not be negative");
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        updateWithName(digest);
-        FBUtilities.updateWithLong(digest, timestamp());
-        FBUtilities.updateWithByte(digest, serializationFlags());
-    }
-
-    @Override
-    public DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferDeletedCell(copy(metadata, allocator), allocator.clone(value()), timestamp());
-    }
-
-    @Override
-    public DeletedCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return SIZE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/NativeExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeExpiringCell.java b/src/java/org/apache/cassandra/db/NativeExpiringCell.java
deleted file mode 100644
index 6369536..0000000
--- a/src/java/org/apache/cassandra/db/NativeExpiringCell.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativeAllocator;
-
-public class NativeExpiringCell extends NativeCell implements ExpiringCell
-{
-    private static final long SIZE = ObjectSizes.measure(new NativeExpiringCell());
-
-    private NativeExpiringCell()
-    {}
-
-    public NativeExpiringCell(NativeAllocator allocator, OpOrder.Group writeOp, ExpiringCell copyOf)
-    {
-        super(allocator, writeOp, copyOf);
-    }
-
-    @Override
-    protected int sizeOf(Cell cell)
-    {
-        return super.sizeOf(cell) + 8;
-    }
-
-    @Override
-    protected void construct(Cell from)
-    {
-        ExpiringCell expiring = (ExpiringCell) from;
-
-        setInt(internalSize() - 4, expiring.getTimeToLive());
-        setInt(internalSize() - 8, expiring.getLocalDeletionTime());
-        super.construct(from);
-    }
-
-    @Override
-    protected int postfixSize()
-    {
-        return 8;
-    }
-
-    @Override
-    public int getTimeToLive()
-    {
-        return getInt(internalSize() - 4);
-    }
-
-    @Override
-    public int getLocalDeletionTime()
-    {
-        return getInt(internalSize() - 8);
-    }
-
-    @Override
-    public boolean isLive()
-    {
-        return isLive(System.currentTimeMillis());
-    }
-
-    @Override
-    public boolean isLive(long now)
-    {
-        return (int) (now / 1000) < getLocalDeletionTime();
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.EXPIRATION_MASK;
-    }
-
-    @Override
-    public int cellDataSize()
-    {
-        return super.cellDataSize() + TypeSizes.NATIVE.sizeof(getLocalDeletionTime()) + TypeSizes.NATIVE.sizeof(getTimeToLive());
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        /*
-         * An expired column adds to a Cell :
-         *    4 bytes for the localExpirationTime
-         *  + 4 bytes for the timeToLive
-        */
-        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(getLocalDeletionTime()) + typeSizes.sizeof(getTimeToLive());
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        super.validateFields(metadata);
-
-        if (getTimeToLive() <= 0)
-            throw new MarshalException("A column TTL should be > 0");
-        if (getLocalDeletionTime() < 0)
-            throw new MarshalException("The local expiration time should not be negative");
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        super.updateDigest(digest);
-        FBUtilities.updateWithInt(digest, getTimeToLive());
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        long ts1 = timestamp(), ts2 = cell.timestamp();
-        if (ts1 != ts2)
-            return ts1 < ts2 ? cell : this;
-        // we should prefer tombstones
-        if (cell instanceof DeletedCell)
-            return cell;
-        int c = value().compareTo(cell.value());
-        if (c != 0)
-            return c < 0 ? cell : this;
-        // If we have same timestamp and value, prefer the longest ttl
-        if (cell instanceof ExpiringCell)
-        {
-            int let1 = getLocalDeletionTime(), let2 = cell.getLocalDeletionTime();
-            if (let1 < let2)
-                return cell;
-        }
-        return this;
-    }
-
-    public boolean equals(Cell cell)
-    {
-        if (!super.equals(cell))
-            return false;
-        ExpiringCell that = (ExpiringCell) cell;
-        return getLocalDeletionTime() == that.getLocalDeletionTime() && getTimeToLive() == that.getTimeToLive();
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s(%s!%d)", getClass().getSimpleName(), super.getString(comparator), getTimeToLive());
-    }
-
-    @Override
-    public ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferExpiringCell(name().copy(metadata, allocator), allocator.clone(value()), timestamp(), getTimeToLive(), getLocalDeletionTime());
-    }
-
-    @Override
-    public ExpiringCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return SIZE;
-    }
-
-    @Override
-    public long unsharedHeapSize()
-    {
-        return SIZE;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/OnDiskAtom.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/OnDiskAtom.java b/src/java/org/apache/cassandra/db/OnDiskAtom.java
deleted file mode 100644
index f5eddb9..0000000
--- a/src/java/org/apache/cassandra/db/OnDiskAtom.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.*;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.ISSTableSerializer;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.serializers.MarshalException;
-
-public interface OnDiskAtom
-{
-    public Composite name();
-
-    /**
-     * For a standard column, this is the same as timestamp().
-     * For a super column, this is the min/max column timestamp of the sub columns.
-     */
-    public long timestamp();
-    public int getLocalDeletionTime(); // for tombstone GC, so int is sufficient granularity
-
-    public void validateFields(CFMetaData metadata) throws MarshalException;
-    public void updateDigest(MessageDigest digest);
-
-    public static class Serializer implements ISSTableSerializer<OnDiskAtom>
-    {
-        private final CellNameType type;
-
-        public Serializer(CellNameType type)
-        {
-            this.type = type;
-        }
-
-        public void serializeForSSTable(OnDiskAtom atom, DataOutputPlus out) throws IOException
-        {
-            if (atom instanceof Cell)
-            {
-                type.columnSerializer().serialize((Cell)atom, out);
-            }
-            else
-            {
-                assert atom instanceof RangeTombstone;
-                type.rangeTombstoneSerializer().serializeForSSTable((RangeTombstone)atom, out);
-            }
-        }
-
-        public OnDiskAtom deserializeFromSSTable(DataInput in, Version version) throws IOException
-        {
-            return deserializeFromSSTable(in, ColumnSerializer.Flag.LOCAL, Integer.MIN_VALUE, version);
-        }
-
-        public OnDiskAtom deserializeFromSSTable(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Version version) throws IOException
-        {
-            Composite name = type.serializer().deserialize(in);
-            if (name.isEmpty())
-            {
-                // SSTableWriter.END_OF_ROW
-                return null;
-            }
-
-            int b = in.readUnsignedByte();
-            if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
-                return type.rangeTombstoneSerializer().deserializeBody(in, name, version);
-            else
-                return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore);
-        }
-
-        public long serializedSizeForSSTable(OnDiskAtom atom)
-        {
-            if (atom instanceof Cell)
-            {
-                return type.columnSerializer().serializedSize((Cell)atom, TypeSizes.NATIVE);
-            }
-            else
-            {
-                assert atom instanceof RangeTombstone;
-                return type.rangeTombstoneSerializer().serializedSizeForSSTable((RangeTombstone)atom);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PagedRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PagedRangeCommand.java b/src/java/org/apache/cassandra/db/PagedRangeCommand.java
deleted file mode 100644
index 40ef88e..0000000
--- a/src/java/org/apache/cassandra/db/PagedRangeCommand.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.net.MessagingService;
-
-public class PagedRangeCommand extends AbstractRangeCommand
-{
-    public static final IVersionedSerializer<PagedRangeCommand> serializer = new Serializer();
-
-    public final Composite start;
-    public final Composite stop;
-    public final int limit;
-    private final boolean countCQL3Rows;
-
-    public PagedRangeCommand(String keyspace,
-                             String columnFamily,
-                             long timestamp,
-                             AbstractBounds<RowPosition> keyRange,
-                             SliceQueryFilter predicate,
-                             Composite start,
-                             Composite stop,
-                             List<IndexExpression> rowFilter,
-                             int limit,
-                             boolean countCQL3Rows)
-    {
-        super(keyspace, columnFamily, timestamp, keyRange, predicate, rowFilter);
-        this.start = start;
-        this.stop = stop;
-        this.limit = limit;
-        this.countCQL3Rows = countCQL3Rows;
-    }
-
-    public MessageOut<PagedRangeCommand> createMessage()
-    {
-        return new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, serializer);
-    }
-
-    public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange)
-    {
-        Composite newStart = subRange.left.equals(keyRange.left) ? start : ((SliceQueryFilter)predicate).start();
-        Composite newStop = subRange.right.equals(keyRange.right) ? stop : ((SliceQueryFilter)predicate).finish();
-        return new PagedRangeCommand(keyspace,
-                                     columnFamily,
-                                     timestamp,
-                                     subRange,
-                                     ((SliceQueryFilter) predicate).cloneShallow(),
-                                     newStart,
-                                     newStop,
-                                     rowFilter,
-                                     limit,
-                                     countCQL3Rows);
-    }
-
-    public AbstractRangeCommand withUpdatedLimit(int newLimit)
-    {
-        return new PagedRangeCommand(keyspace,
-                                     columnFamily,
-                                     timestamp,
-                                     keyRange,
-                                     ((SliceQueryFilter) predicate).cloneShallow(),
-                                     start,
-                                     stop,
-                                     rowFilter,
-                                     newLimit,
-                                     countCQL3Rows);
-    }
-
-    public int limit()
-    {
-        return limit;
-    }
-
-    public boolean countCQL3Rows()
-    {
-        return countCQL3Rows;
-    }
-
-    public List<Row> executeLocally()
-    {
-        ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily);
-
-        ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, (SliceQueryFilter)predicate, start, stop, rowFilter, limit, countCQL3Rows(), timestamp);
-        if (cfs.indexManager.hasIndexFor(rowFilter))
-            return cfs.search(exFilter);
-        else
-            return cfs.getRangeSlice(exFilter);
-    }
-
-    @Override
-    public String toString()
-    {
-        return String.format("PagedRange(%s, %s, %d, %s, %s, %s, %s, %s, %d)", keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit);
-    }
-
-    private static class Serializer implements IVersionedSerializer<PagedRangeCommand>
-    {
-        public void serialize(PagedRangeCommand cmd, DataOutputPlus out, int version) throws IOException
-        {
-            out.writeUTF(cmd.keyspace);
-            out.writeUTF(cmd.columnFamily);
-            out.writeLong(cmd.timestamp);
-
-            MessagingService.validatePartitioner(cmd.keyRange);
-            AbstractBounds.rowPositionSerializer.serialize(cmd.keyRange, out, version);
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
-
-            // SliceQueryFilter (the count is not used)
-            SliceQueryFilter filter = (SliceQueryFilter)cmd.predicate;
-            metadata.comparator.sliceQueryFilterSerializer().serialize(filter, out, version);
-
-            // The start and stop of the page
-            metadata.comparator.serializer().serialize(cmd.start, out);
-            metadata.comparator.serializer().serialize(cmd.stop, out);
-
-            out.writeInt(cmd.rowFilter.size());
-            for (IndexExpression expr : cmd.rowFilter)
-            {
-                expr.writeTo(out);;
-            }
-
-            out.writeInt(cmd.limit);
-            if (version >= MessagingService.VERSION_21)
-                out.writeBoolean(cmd.countCQL3Rows);
-        }
-
-        public PagedRangeCommand deserialize(DataInput in, int version) throws IOException
-        {
-            String keyspace = in.readUTF();
-            String columnFamily = in.readUTF();
-            long timestamp = in.readLong();
-
-            AbstractBounds<RowPosition> keyRange =
-                    AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version);
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
-            if (metadata == null)
-            {
-                String message = String.format("Got paged range command for nonexistent table %s.%s.  If the table was just " +
-                        "created, this is likely due to the schema not being fully propagated.  Please wait for schema " +
-                        "agreement on table creation." , keyspace, columnFamily);
-                throw new UnknownColumnFamilyException(message, null);
-            }
-
-            SliceQueryFilter predicate = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
-
-            Composite start = metadata.comparator.serializer().deserialize(in);
-            Composite stop =  metadata.comparator.serializer().deserialize(in);
-
-            int filterCount = in.readInt();
-            List<IndexExpression> rowFilter = new ArrayList<IndexExpression>(filterCount);
-            for (int i = 0; i < filterCount; i++)
-            {
-                rowFilter.add(IndexExpression.readFrom(in));
-            }
-
-            int limit = in.readInt();
-            boolean countCQL3Rows = version >= MessagingService.VERSION_21
-                                  ? in.readBoolean()
-                                  : predicate.compositesToGroup >= 0 || predicate.count != 1; // See #6857
-            return new PagedRangeCommand(keyspace, columnFamily, timestamp, keyRange, predicate, start, stop, rowFilter, limit, countCQL3Rows);
-        }
-
-        public long serializedSize(PagedRangeCommand cmd, int version)
-        {
-            long size = 0;
-
-            size += TypeSizes.NATIVE.sizeof(cmd.keyspace);
-            size += TypeSizes.NATIVE.sizeof(cmd.columnFamily);
-            size += TypeSizes.NATIVE.sizeof(cmd.timestamp);
-
-            size += AbstractBounds.rowPositionSerializer.serializedSize(cmd.keyRange, version);
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(cmd.keyspace, cmd.columnFamily);
-
-            size += metadata.comparator.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter)cmd.predicate, version);
-
-            size += metadata.comparator.serializer().serializedSize(cmd.start, TypeSizes.NATIVE);
-            size += metadata.comparator.serializer().serializedSize(cmd.stop, TypeSizes.NATIVE);
-
-            size += TypeSizes.NATIVE.sizeof(cmd.rowFilter.size());
-            for (IndexExpression expr : cmd.rowFilter)
-            {
-                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column);
-                size += TypeSizes.NATIVE.sizeof(expr.operator.ordinal());
-                size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value);
-            }
-
-            size += TypeSizes.NATIVE.sizeof(cmd.limit);
-            if (version >= MessagingService.VERSION_21)
-                size += TypeSizes.NATIVE.sizeof(cmd.countCQL3Rows);
-            return size;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionColumns.java b/src/java/org/apache/cassandra/db/PartitionColumns.java
new file mode 100644
index 0000000..a1b1d00
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionColumns.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+import java.security.MessageDigest;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.config.ColumnDefinition;
+
+/**
+ * Columns (or a subset of the columns) that a partition contains.
+ * This mainly groups both static and regular columns for convenience.
+ */
+public class PartitionColumns implements Iterable<ColumnDefinition>
+{
+    /**
+     * Shared instance holding neither static nor regular columns.
+     * Declared {@code final} so that the shared constant cannot be reassigned.
+     */
+    public static final PartitionColumns NONE = new PartitionColumns(Columns.NONE, Columns.NONE);
+
+    /** The static columns of this group. */
+    public final Columns statics;
+    /** The regular (non-static) columns of this group. */
+    public final Columns regulars;
+
+    /**
+     * Groups the provided static and regular columns.
+     *
+     * @param statics the static columns.
+     * @param regulars the regular columns.
+     */
+    public PartitionColumns(Columns statics, Columns regulars)
+    {
+        this.statics = statics;
+        this.regulars = regulars;
+    }
+
+    /**
+     * Creates a {@code PartitionColumns} containing only the provided column, placed
+     * in the static or the regular group depending on {@code column.isStatic()}.
+     *
+     * @param column the single column to include.
+     * @return a newly created {@code PartitionColumns} holding only {@code column}.
+     */
+    public static PartitionColumns of(ColumnDefinition column)
+    {
+        return new PartitionColumns(column.isStatic() ? Columns.of(column) : Columns.NONE,
+                                    column.isStatic() ? Columns.NONE : Columns.of(column));
+    }
+
+    /**
+     * Returns a new {@code PartitionColumns} in which {@code column} has been removed
+     * from the group (static or regular) it belongs to; the other group is reused as is.
+     *
+     * @param column the column to remove.
+     * @return a new {@code PartitionColumns} without {@code column}.
+     */
+    public PartitionColumns without(ColumnDefinition column)
+    {
+        return new PartitionColumns(column.isStatic() ? statics.without(column) : statics,
+                                    column.isStatic() ? regulars : regulars.without(column));
+    }
+
+    /**
+     * Returns these columns with the static ones dropped.
+     *
+     * @return {@code this} if there are no static columns, otherwise a new instance
+     * holding only the regular columns.
+     */
+    public PartitionColumns withoutStatics()
+    {
+        return statics.isEmpty() ? this : new PartitionColumns(Columns.NONE, regulars);
+    }
+
+    /**
+     * @return whether both the static and the regular groups are empty.
+     */
+    public boolean isEmpty()
+    {
+        return statics.isEmpty() && regulars.isEmpty();
+    }
+
+    /**
+     * Checks whether the provided column is part of these columns. Only the group
+     * matching {@code column.isStatic()} is consulted.
+     *
+     * @param column the column to look for.
+     * @return whether {@code column} is contained in its corresponding group.
+     */
+    public boolean contains(ColumnDefinition column)
+    {
+        return column.isStatic() ? statics.contains(column) : regulars.contains(column);
+    }
+
+    /**
+     * Checks whether these columns include all of the provided ones, i.e. whether both
+     * the static and the regular groups contain their counterpart in {@code columns}.
+     *
+     * @param columns the columns to test.
+     * @return whether both groups of {@code columns} are contained in this object's groups.
+     */
+    public boolean includes(PartitionColumns columns)
+    {
+        return statics.contains(columns.statics) && regulars.contains(columns.regulars);
+    }
+
+    /**
+     * @return an iterator over the static columns followed by the regular columns.
+     */
+    public Iterator<ColumnDefinition> iterator()
+    {
+        return Iterators.concat(statics.iterator(), regulars.iterator());
+    }
+
+    /**
+     * @return the concatenation of the "select order" iterators of the static columns
+     * and then of the regular columns.
+     */
+    public Iterator<ColumnDefinition> selectOrderIterator()
+    {
+        return Iterators.concat(statics.selectOrderIterator(), regulars.selectOrderIterator());
+    }
+
+    /**
+     * @return a representation of the form {@code [<statics> | <regulars>]}.
+     */
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[").append(statics).append(" | ").append(regulars).append("]");
+        return sb.toString();
+    }
+
+    /**
+     * Two {@code PartitionColumns} are equal when both their static and their regular
+     * groups are equal.
+     */
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof PartitionColumns))
+            return false;
+
+        PartitionColumns that = (PartitionColumns)other;
+        return this.statics.equals(that.statics)
+            && this.regulars.equals(that.regulars);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(statics, regulars);
+    }
+
+    /**
+     * Feeds these columns to the provided digest: the regular columns first, then the
+     * static ones.
+     *
+     * @param digest the digest to update.
+     */
+    public void digest(MessageDigest digest)
+    {
+        // NOTE(review): regulars are digested before statics; presumably this order must
+        // be kept stable for digests to remain comparable -- confirm before reordering.
+        regulars.digest(digest);
+        statics.digest(digest);
+    }
+
+    /**
+     * @return a new, empty {@link Builder}.
+     */
+    public static Builder builder()
+    {
+        return new Builder();
+    }
+
+    /**
+     * Accumulates column definitions and splits them into static and regular groups
+     * to {@link #build} a {@code PartitionColumns}.
+     */
+    public static class Builder
+    {
+        // Note that we do want to use sorted sets because we want the column definitions to be compared
+        // through compareTo, not equals. The former basically check it's the same column name, while the latter
+        // check it's the same object, including the same type.
+        // Both sets are allocated lazily: null means that no column of that kind has been added yet.
+        private SortedSet<ColumnDefinition> regularColumns;
+        private SortedSet<ColumnDefinition> staticColumns;
+
+        /**
+         * Adds a column to this builder. The column must be either static or regular
+         * (the latter is checked by an assertion).
+         *
+         * @param c the column to add.
+         * @return this builder.
+         */
+        public Builder add(ColumnDefinition c)
+        {
+            if (c.isStatic())
+            {
+                if (staticColumns == null)
+                    staticColumns = new TreeSet<>();
+                staticColumns.add(c);
+            }
+            else
+            {
+                assert c.isRegular();
+                if (regularColumns == null)
+                    regularColumns = new TreeSet<>();
+                regularColumns.add(c);
+            }
+            return this;
+        }
+
+        /**
+         * @return the number of distinct columns (as per the sorted sets) added so far,
+         * counting both regular and static ones.
+         */
+        public int added()
+        {
+            return (regularColumns == null ? 0 : regularColumns.size())
+                 + (staticColumns == null ? 0 : staticColumns.size());
+        }
+
+        /**
+         * Adds every column of the provided iterable, see {@link #add}.
+         *
+         * @param columns the columns to add.
+         * @return this builder.
+         */
+        public Builder addAll(Iterable<ColumnDefinition> columns)
+        {
+            for (ColumnDefinition c : columns)
+                add(c);
+            return this;
+        }
+
+        /**
+         * Adds all the regular and static columns of the provided {@code PartitionColumns}.
+         *
+         * @param columns the columns to add.
+         * @return this builder.
+         */
+        public Builder addAll(PartitionColumns columns)
+        {
+            // Each set is only allocated when there is something to put in it; when the source
+            // group is empty the loop below does not execute, so a null set is never dereferenced.
+            if (regularColumns == null && !columns.regulars.isEmpty())
+                regularColumns = new TreeSet<>();
+
+            for (ColumnDefinition c : columns.regulars)
+                regularColumns.add(c);
+
+            if (staticColumns == null && !columns.statics.isEmpty())
+                staticColumns = new TreeSet<>();
+
+            for (ColumnDefinition c : columns.statics)
+                staticColumns.add(c);
+
+            return this;
+        }
+
+        /**
+         * Builds the {@code PartitionColumns} for the columns added so far. A group for
+         * which no column was added is represented by {@code Columns.NONE}.
+         *
+         * @return the newly built {@code PartitionColumns}.
+         */
+        public PartitionColumns build()
+        {
+            return new PartitionColumns(staticColumns == null ? Columns.NONE : Columns.from(staticColumns),
+                                        regularColumns == null ? Columns.NONE : Columns.from(regularColumns));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionPosition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionPosition.java b/src/java/org/apache/cassandra/db/PartitionPosition.java
new file mode 100644
index 0000000..1dc940e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionPosition.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public interface PartitionPosition extends RingPosition<PartitionPosition>
+{
+    /**
+     * The kind of a position: an actual key, or the minimum/maximum bound of a token.
+     */
+    public static enum Kind
+    {
+        // Only add new values to the end of the enum, the ordinal is used
+        // during serialization
+        ROW_KEY, MIN_BOUND, MAX_BOUND;
+
+        // Cached so that fromOrdinal does not call values() on every lookup.
+        private static final Kind[] allKinds = Kind.values();
+
+        /**
+         * Returns the kind having the provided ordinal.
+         *
+         * @param ordinal the ordinal of the kind to return.
+         * @return the corresponding kind.
+         * @throws ArrayIndexOutOfBoundsException if {@code ordinal} is not the ordinal of a kind.
+         */
+        static Kind fromOrdinal(int ordinal)
+        {
+            return allKinds[ordinal];
+        }
+    }
+
+    /**
+     * Helper to create the position corresponding to a raw partition key.
+     */
+    public static final class ForKey
+    {
+        /**
+         * Returns the position for the provided key.
+         *
+         * @param key the raw key; may be {@code null} or empty.
+         * @param p the partitioner to use.
+         * @return the min key bound of the partitioner's minimum token if {@code key}
+         * is {@code null} or has no remaining bytes, the decorated key otherwise.
+         */
+        public static PartitionPosition get(ByteBuffer key, IPartitioner p)
+        {
+            return key == null || key.remaining() == 0 ? p.getMinimumToken().minKeyBound() : p.decorateKey(key);
+        }
+    }
+
+    public static final RowPositionSerializer serializer = new RowPositionSerializer();
+
+    /**
+     * @return the kind of this position.
+     */
+    public Kind kind();
+
+    // NOTE(review): the exact meaning of "minimum" is defined by the implementations, which are
+    // not visible here -- presumably the smallest possible position for the partitioner; confirm.
+    public boolean isMinimum();
+
+    /**
+     * Serializer for {@link PartitionPosition}, see the format description on {@link #serialize}.
+     */
+    public static class RowPositionSerializer implements IPartitionerDependentSerializer<PartitionPosition>
+    {
+        /*
+         * We need to be able to serialize both Token.KeyBound and
+         * DecoratedKey. To make this compact, we first write a byte whose
+         * meaning is:
+         *   - 0: DecoratedKey
+         *   - 1: a 'minimum' Token.KeyBound
+         *   - 2: a 'maximum' Token.KeyBound
+         * In the case of the DecoratedKey, we then serialize the key (the
+         * token is recreated on the other side). In the other cases, we then
+         * serialize the token.
+         */
+        public void serialize(PartitionPosition pos, DataOutputPlus out, int version) throws IOException
+        {
+            Kind kind = pos.kind();
+            out.writeByte(kind.ordinal());
+            if (kind == Kind.ROW_KEY)
+                ByteBufferUtil.writeWithShortLength(((DecoratedKey)pos).getKey(), out);
+            else
+                Token.serializer.serialize(pos.getToken(), out, version);
+        }
+
+        /**
+         * Deserializes a position written by {@link #serialize}.
+         *
+         * @param in the input to read from.
+         * @param p the partitioner used to recreate the decorated key or the token.
+         * @param version the serialization version.
+         * @return the deserialized position.
+         * @throws IOException if reading from {@code in} fails.
+         */
+        public PartitionPosition deserialize(DataInput in, IPartitioner p, int version) throws IOException
+        {
+            Kind kind = Kind.fromOrdinal(in.readByte());
+            if (kind == Kind.ROW_KEY)
+            {
+                ByteBuffer k = ByteBufferUtil.readWithShortLength(in);
+                // Decorate with the partitioner we were handed (as the token branch below does)
+                // rather than silently ignoring it in favor of the global one.
+                return p.decorateKey(k);
+            }
+            else
+            {
+                Token t = Token.serializer.deserialize(in, p, version);
+                return kind == Kind.MIN_BOUND ? t.minKeyBound() : t.maxKeyBound();
+            }
+        }
+
+        /**
+         * Computes the size of the serialized form of a position: one byte for the kind,
+         * followed by either the short-length-prefixed key or the serialized token.
+         *
+         * @param pos the position to size.
+         * @param version the serialization version.
+         * @return the serialized size in bytes.
+         */
+        public long serializedSize(PartitionPosition pos, int version)
+        {
+            Kind kind = pos.kind();
+            int size = 1; // 1 byte for enum
+            if (kind == Kind.ROW_KEY)
+            {
+                int keySize = ((DecoratedKey)pos).getKey().remaining();
+                size += TypeSizes.NATIVE.sizeof((short) keySize) + keySize;
+            }
+            else
+            {
+                size += Token.serializer.serializedSize(pos.getToken(), version);
+            }
+            return size;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
new file mode 100644
index 0000000..c11a9be
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.index.SecondaryIndexSearcher;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A read command that selects a (part of a) range of partitions.
+ */
+public class PartitionRangeReadCommand extends ReadCommand
+{
+    protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
+
+    private final DataRange dataRange;
+
+    public PartitionRangeReadCommand(boolean isDigest,
+                                     boolean isForThrift,
+                                     CFMetaData metadata,
+                                     int nowInSec,
+                                     ColumnFilter columnFilter,
+                                     RowFilter rowFilter,
+                                     DataLimits limits,
+                                     DataRange dataRange)
+    {
+        super(Kind.PARTITION_RANGE, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
+        this.dataRange = dataRange;
+    }
+
+    public PartitionRangeReadCommand(CFMetaData metadata,
+                                     int nowInSec,
+                                     ColumnFilter columnFilter,
+                                     RowFilter rowFilter,
+                                     DataLimits limits,
+                                     DataRange dataRange)
+    {
+        this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange);
+    }
+
+    /**
+     * Creates a new read command that query all the data in the table.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use are "now" for this query.
+     *
+     * @return a newly created read command that queries everything in the table.
+     */
+    public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
+    {
+        return new PartitionRangeReadCommand(metadata,
+                                             nowInSec,
+                                             ColumnFilter.all(metadata),
+                                             RowFilter.NONE,
+                                             DataLimits.NONE,
+                                             DataRange.allData(StorageService.getPartitioner()));
+    }
+
+    public DataRange dataRange()
+    {
+        return dataRange;
+    }
+
+    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
+    {
+        return dataRange.clusteringIndexFilter(key);
+    }
+
+    public boolean isNamesQuery()
+    {
+        return dataRange.isNamesQuery();
+    }
+
+    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
+    {
+        return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range));
+    }
+
+    public PartitionRangeReadCommand copy()
+    {
+        return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange());
+    }
+
+    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
+    {
+        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange());
+    }
+
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getRangeRpcTimeout();
+    }
+
+    public boolean selects(DecoratedKey partitionKey, Clustering clustering)
+    {
+        if (!dataRange().contains(partitionKey))
+            return false;
+
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return !columnFilter().fetchedColumns().statics.isEmpty();
+
+        return dataRange().clusteringIndexFilter(partitionKey).selects(clustering);
+    }
+
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+    {
+        return StorageProxy.getRangeSlice(this, consistency);
+    }
+
+    public QueryPager getPager(PagingState pagingState)
+    {
+        if (isNamesQuery())
+            return new RangeNamesQueryPager(this, pagingState);
+        else
+            return new RangeSliceQueryPager(this, pagingState);
+    }
+
+    protected void recordLatency(ColumnFamilyMetrics metric, long latencyNanos)
+    {
+        metric.rangeLatency.addNano(latencyNanos);
+    }
+
+    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
+    {
+        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(dataRange().keyRange()));
+        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
+
+        // fetch data from current memtable, historical memtables, and SSTables in the correct order.
+        final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
+
+        try
+        {
+            for (Memtable memtable : view.memtables)
+            {
+                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
+                UnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+            }
+
+            for (SSTableReader sstable : view.sstables)
+            {
+                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
+                UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift());
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
+            }
+
+            return checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
+        }
+        catch (RuntimeException | Error e)
+        {
+            try
+            {
+                FBUtilities.closeAll(iterators);
+            }
+            catch (Exception suppressed)
+            {
+                e.addSuppressed(suppressed);
+            }
+
+            throw e;
+        }
+    }
+
+    private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
+    {
+        return new WrappingUnfilteredPartitionIterator(iter)
+        {
+            @Override
+            public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
+            {
+                // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
+                // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
+                DecoratedKey dk = iter.partitionKey();
+
+                // Check if this partition is in the rowCache and if it is, if  it covers our filter
+                CachedPartition cached = cfs.getRawCachedPartition(dk);
+                ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk);
+
+                if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec()))
+                {
+                    // We won't use 'iter' so close it now.
+                    iter.close();
+
+                    return filter.getUnfilteredRowIterator(columnFilter(), cached);
+                }
+
+                return iter;
+            }
+        };
+    }
+
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        if (dataRange.isUnrestricted() && rowFilter().isEmpty())
+            return;
+
+        sb.append(" WHERE ");
+        // We put the row filter first because the data range can end by "ORDER BY"
+        if (!rowFilter().isEmpty())
+        {
+            sb.append(rowFilter());
+            if (!dataRange.isUnrestricted())
+                sb.append(" AND ");
+        }
+        if (!dataRange.isUnrestricted())
+            sb.append(dataRange.toCQLString(metadata()));
+    }
+
+    /**
+     * Allow to post-process the result of the query after it has been reconciled on the coordinator
+     * but before it is passed to the CQL layer to return the ResultSet.
+     *
+     * See CASSANDRA-8717 for why this exists.
+     */
+    public PartitionIterator postReconciliationProcessing(PartitionIterator result)
+    {
+        ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
+        SecondaryIndexSearcher searcher = getIndexSearcher(cfs);
+        return searcher == null ? result : searcher.postReconciliationProcessing(rowFilter(), result);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)",
+                             metadata().ksName,
+                             metadata().cfName,
+                             columnFilter(),
+                             rowFilter(),
+                             limits(),
+                             dataRange().toString(metadata()));
+    }
+
+    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
+    {
+        DataRange.serializer.serialize(dataRange(), out, version, metadata());
+    }
+
+    protected long selectionSerializedSize(int version)
+    {
+        return DataRange.serializer.serializedSize(dataRange(), version, metadata());
+    }
+
+    private static class Deserializer extends SelectionDeserializer
+    {
+        public ReadCommand deserialize(DataInput in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+        throws IOException
+        {
+            DataRange range = DataRange.serializer.deserialize(in, version, metadata);
+            return new PartitionRangeReadCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range);
+        }
+    };
+}