You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2015/06/30 12:47:48 UTC

[24/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
index 1660b2e..0b00b47 100644
--- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java
@@ -18,22 +18,28 @@
 package org.apache.cassandra.db.marshal;
 
 import java.nio.ByteBuffer;
+import java.io.DataInput;
+import java.io.IOException;
 import java.util.List;
+import java.util.Iterator;
 
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.transport.Server;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnSpecification;
 import org.apache.cassandra.cql3.Lists;
 import org.apache.cassandra.cql3.Maps;
 import org.apache.cassandra.cql3.Sets;
-
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.transport.Server;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -47,6 +53,8 @@ public abstract class CollectionType<T> extends AbstractType<T>
 
     public static final int MAX_ELEMENTS = 65535;
 
+    public static CellPath.Serializer cellPathSerializer = new CollectionPathSerializer();
+
     public enum Kind
     {
         MAP
@@ -84,6 +92,8 @@ public abstract class CollectionType<T> extends AbstractType<T>
     public abstract AbstractType<?> nameComparator();
     public abstract AbstractType<?> valueComparator();
 
+    protected abstract List<ByteBuffer> serializedValues(Iterator<Cell> cells);
+
     @Override
     public abstract CollectionSerializer<T> getSerializer();
 
@@ -132,28 +142,33 @@ public abstract class CollectionType<T> extends AbstractType<T>
         return kind == Kind.MAP;
     }
 
-    public List<Cell> enforceLimit(ColumnDefinition def, List<Cell> cells, int version)
+    // Overridden by maps
+    protected int collectionSize(List<ByteBuffer> values)
+    {
+        return values.size();
+    }
+
+    protected int enforceLimit(ColumnDefinition def, List<ByteBuffer> values, int version) // returns the element count to advertise to the client
     {
         assert isMultiCell();
 
-        if (version >= Server.VERSION_3 || cells.size() <= MAX_ELEMENTS)
-            return cells;
+        int size = collectionSize(values); // logical element count; MapType halves values.size() since it stores key and value buffers
+        if (version >= Server.VERSION_3 || size <= MAX_ELEMENTS)
+            return size;
 
         logger.error("Detected collection for table {}.{} with {} elements, more than the {} limit. Only the first {}" +
                      " elements will be returned to the client. Please see " +
                      "http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.",
-                     def.ksName, def.cfName, cells.size(), MAX_ELEMENTS, MAX_ELEMENTS);
-        return cells.subList(0, MAX_ELEMENTS);
+                     def.ksName, def.cfName, size, MAX_ELEMENTS, MAX_ELEMENTS); // log the element count, not the raw buffer count
+        return MAX_ELEMENTS;
     }
 
-    public abstract List<ByteBuffer> serializedValues(List<Cell> cells);
-
-    public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, List<Cell> cells, int version)
+    public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, Iterator<Cell> cells, int version)
     {
         assert isMultiCell();
-        cells = enforceLimit(def, cells, version);
         List<ByteBuffer> values = serializedValues(cells);
-        return CollectionSerializer.pack(values, cells.size(), version);
+        int size = enforceLimit(def, values, version); // element count, capped at MAX_ELEMENTS for pre-v3 protocol versions
+        return CollectionSerializer.pack(values, size, version); // NOTE(review): values is not truncated when size is capped (the old code used subList) - confirm pack() handles the mismatch
     }
 
     @Override
@@ -217,4 +232,28 @@ public abstract class CollectionType<T> extends AbstractType<T>
     {
         return this.toString(false);
     }
+
+    private static class CollectionPathSerializer implements CellPath.Serializer
+    {
+        public void serialize(CellPath path, DataOutputPlus out) throws IOException
+        {
+            ByteBufferUtil.writeWithLength(path.get(0), out);
+        }
+
+        public CellPath deserialize(DataInput in) throws IOException
+        {
+            return CellPath.create(ByteBufferUtil.readWithLength(in));
+        }
+
+        public long serializedSize(CellPath path, TypeSizes sizes)
+        {
+            return sizes.sizeofWithLength(path.get(0));
+        }
+
+        public void skip(DataInput in) throws IOException
+        {
+            int length = in.readInt();
+            FileUtils.skipBytesFully(in, length);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
index 1d2c88c..a81d3f8 100644
--- a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java
@@ -31,6 +31,9 @@ import org.apache.cassandra.serializers.BytesSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+/*
+ * This class is deprecated and only kept for backward compatibility.
+ */
 public class ColumnToCollectionType extends AbstractType<ByteBuffer>
 {
     // interning instances

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CompositeType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
index f3c041e..01eb58f 100644
--- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java
@@ -177,6 +177,7 @@ public class CompositeType extends AbstractCompositeType
         // most names will be complete.
         ByteBuffer[] l = new ByteBuffer[types.size()];
         ByteBuffer bb = name.duplicate();
+        readStatic(bb);
         int i = 0;
         while (bb.remaining() > 0)
         {
@@ -186,6 +187,19 @@ public class CompositeType extends AbstractCompositeType
         return i == l.length ? l : Arrays.copyOfRange(l, 0, i);
     }
 
+    public static List<ByteBuffer> splitName(ByteBuffer name)
+    {
+        List<ByteBuffer> l = new ArrayList<>();
+        ByteBuffer bb = name.duplicate();
+        readStatic(bb);
+        while (bb.remaining() > 0)
+        {
+            l.add(ByteBufferUtil.readBytesWithShortLength(bb));
+            bb.get(); // skip end-of-component
+        }
+        return l;
+    }
+
     // Extract component idx from bb. Return null if there is not enough component.
     public static ByteBuffer extractComponent(ByteBuffer bb, int idx)
     {
@@ -318,11 +332,20 @@ public class CompositeType extends AbstractCompositeType
 
     public static ByteBuffer build(ByteBuffer... buffers)
     {
-        int totalLength = 0;
+        return build(false, buffers);
+    }
+
+    public static ByteBuffer build(boolean isStatic, ByteBuffer... buffers)
+    {
+        int totalLength = isStatic ? 2 : 0;
         for (ByteBuffer bb : buffers)
             totalLength += 2 + bb.remaining() + 1;
 
         ByteBuffer out = ByteBuffer.allocate(totalLength);
+
+        if (isStatic)
+            out.putShort((short)STATIC_MARKER);
+
         for (ByteBuffer bb : buffers)
         {
             ByteBufferUtil.writeShortLength(out, bb.remaining());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
index 4b3ce82..687e525 100644
--- a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
+++ b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.Term;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.CounterSerializer;
+import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class CounterColumnType extends AbstractType<Long>
@@ -59,6 +60,12 @@ public class CounterColumnType extends AbstractType<Long>
         return ByteBufferUtil.bytes(value);
     }
 
+    @Override
+    public void validateCellValue(ByteBuffer cellValue) throws MarshalException
+    {
+        CounterContext.instance().validateContext(cellValue);
+    }
+
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
         return ByteBufferUtil.compareUnsigned(o1, o2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/DateType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DateType.java b/src/java/org/apache/cassandra/db/marshal/DateType.java
index 359ce52..66da443 100644
--- a/src/java/org/apache/cassandra/db/marshal/DateType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DateType.java
@@ -124,4 +124,10 @@ public class DateType extends AbstractType<Date>
     {
         return TimestampSerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 8;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/DoubleType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/DoubleType.java b/src/java/org/apache/cassandra/db/marshal/DoubleType.java
index 661b3c9..bc160d5 100644
--- a/src/java/org/apache/cassandra/db/marshal/DoubleType.java
+++ b/src/java/org/apache/cassandra/db/marshal/DoubleType.java
@@ -97,4 +97,10 @@ public class DoubleType extends AbstractType<Double>
     {
         return DoubleSerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 8;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/EmptyType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
index f82d767..448376f 100644
--- a/src/java/org/apache/cassandra/db/marshal/EmptyType.java
+++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java
@@ -69,4 +69,10 @@ public class EmptyType extends AbstractType<Void>
     {
         return EmptySerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/FloatType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/FloatType.java b/src/java/org/apache/cassandra/db/marshal/FloatType.java
index af02cad..ceedce4 100644
--- a/src/java/org/apache/cassandra/db/marshal/FloatType.java
+++ b/src/java/org/apache/cassandra/db/marshal/FloatType.java
@@ -96,4 +96,10 @@ public class FloatType extends AbstractType<Float>
     {
         return FloatSerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 4;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/Int32Type.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/Int32Type.java b/src/java/org/apache/cassandra/db/marshal/Int32Type.java
index 67d8142..cb0c584 100644
--- a/src/java/org/apache/cassandra/db/marshal/Int32Type.java
+++ b/src/java/org/apache/cassandra/db/marshal/Int32Type.java
@@ -109,4 +109,9 @@ public class Int32Type extends AbstractType<Integer>
         return Int32Serializer.instance;
     }
 
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 4;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java b/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
index 3e00d71..174ce3a 100644
--- a/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
+++ b/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java
@@ -83,4 +83,10 @@ public class LexicalUUIDType extends AbstractType<UUID>
     {
         return UUIDSerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 16;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ListType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java
index 03f39d7..73af808 100644
--- a/src/java/org/apache/cassandra/db/marshal/ListType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ListType.java
@@ -23,14 +23,13 @@ import java.util.*;
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Lists;
 import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.ListSerializer;
 
-import org.apache.cassandra.transport.Server;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,12 +168,12 @@ public class ListType<T> extends CollectionType<List<T>>
         return sb.toString();
     }
 
-    public List<ByteBuffer> serializedValues(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(Iterator<Cell> cells)
     {
         assert isMultiCell;
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
-        for (Cell c : cells)
-            bbs.add(c.value());
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>();
+        while (cells.hasNext())
+            bbs.add(cells.next().value());
         return bbs;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
index 427598d..e02ba3c 100644
--- a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
+++ b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java
@@ -20,11 +20,12 @@ package org.apache.cassandra.db.marshal;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.cql3.Term;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.serializers.TypeSerializer;
 import org.apache.cassandra.serializers.MarshalException;
 
-import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /** for sorting columns representing row keys in the row ordering as determined by a partitioner.
@@ -38,6 +39,11 @@ public class LocalByPartionerType extends AbstractType<ByteBuffer>
         this.partitioner = partitioner;
     }
 
+    public static LocalByPartionerType getInstance(TypeParser parser)
+    {
+        return new LocalByPartionerType(StorageService.getPartitioner());
+    }
+
     @Override
     public ByteBuffer compose(ByteBuffer bytes)
     {
@@ -74,8 +80,8 @@ public class LocalByPartionerType extends AbstractType<ByteBuffer>
 
     public int compare(ByteBuffer o1, ByteBuffer o2)
     {
-        // o1 and o2 can be empty so we need to use RowPosition, not DecoratedKey
-        return RowPosition.ForKey.get(o1, partitioner).compareTo(RowPosition.ForKey.get(o2, partitioner));
+        // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey
+        return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LongType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/LongType.java b/src/java/org/apache/cassandra/db/marshal/LongType.java
index d77d7d0..9d41f4f 100644
--- a/src/java/org/apache/cassandra/db/marshal/LongType.java
+++ b/src/java/org/apache/cassandra/db/marshal/LongType.java
@@ -118,4 +118,9 @@ public class LongType extends AbstractType<Long>
         return LongSerializer.instance;
     }
 
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 8;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/MapType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java
index 983710b..28a4fd5 100644
--- a/src/java/org/apache/cassandra/db/marshal/MapType.java
+++ b/src/java/org/apache/cassandra/db/marshal/MapType.java
@@ -23,7 +23,7 @@ import java.util.*;
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Maps;
 import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.CollectionSerializer;
@@ -173,6 +173,11 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
     }
 
     @Override
+    protected int collectionSize(List<ByteBuffer> values)
+    {
+        return values.size() / 2;
+    }
+
     public String toString(boolean ignoreFreezing)
     {
         boolean includeFrozenType = !ignoreFreezing && !isMultiCell();
@@ -186,13 +191,14 @@ public class MapType<K, V> extends CollectionType<Map<K, V>>
         return sb.toString();
     }
 
-    public List<ByteBuffer> serializedValues(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(Iterator<Cell> cells)
     {
         assert isMultiCell;
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2);
-        for (Cell c : cells)
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>();
+        while (cells.hasNext())
         {
-            bbs.add(c.name().collectionElement());
+            Cell c = cells.next();
+            bbs.add(c.path().get(0));
             bbs.add(c.value());
         }
         return bbs;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ReversedType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/ReversedType.java b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
index 2181f74..cf357a8 100644
--- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java
+++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java
@@ -80,6 +80,12 @@ public class ReversedType<T> extends AbstractType<T>
         return baseType.compare(o2, o1);
     }
 
+    @Override
+    public int compareForCQL(ByteBuffer v1, ByteBuffer v2)
+    {
+        return baseType.compare(v1, v2);
+    }
+
     public String getString(ByteBuffer bytes)
     {
         return baseType.getString(bytes);
@@ -129,6 +135,12 @@ public class ReversedType<T> extends AbstractType<T>
     }
 
     @Override
+    protected int valueLengthIfFixed()
+    {
+        return baseType.valueLengthIfFixed();
+    }
+
+    @Override
     public String toString()
     {
         return getClass().getName() + "(" + baseType + ")";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/SetType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java
index 78aac25..126c6aa 100644
--- a/src/java/org/apache/cassandra/db/marshal/SetType.java
+++ b/src/java/org/apache/cassandra/db/marshal/SetType.java
@@ -23,12 +23,11 @@ import java.util.*;
 import org.apache.cassandra.cql3.Json;
 import org.apache.cassandra.cql3.Sets;
 import org.apache.cassandra.cql3.Term;
-import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.serializers.SetSerializer;
-import org.apache.cassandra.transport.Server;
 
 public class SetType<T> extends CollectionType<Set<T>>
 {
@@ -144,11 +143,11 @@ public class SetType<T> extends CollectionType<Set<T>>
         return sb.toString();
     }
 
-    public List<ByteBuffer> serializedValues(List<Cell> cells)
+    public List<ByteBuffer> serializedValues(Iterator<Cell> cells)
     {
-        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size());
-        for (Cell c : cells)
-            bbs.add(c.name().collectionElement());
+        List<ByteBuffer> bbs = new ArrayList<ByteBuffer>();
+        while (cells.hasNext())
+            bbs.add(cells.next().path().get(0));
         return bbs;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java b/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
index a1d8d82..64fa750 100644
--- a/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java
@@ -127,4 +127,10 @@ public class TimeUUIDType extends AbstractType<UUID>
     {
         return TimeUUIDSerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 16;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/TimestampType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TimestampType.java b/src/java/org/apache/cassandra/db/marshal/TimestampType.java
index b01651d..288f8fd 100644
--- a/src/java/org/apache/cassandra/db/marshal/TimestampType.java
+++ b/src/java/org/apache/cassandra/db/marshal/TimestampType.java
@@ -125,4 +125,10 @@ public class TimestampType extends AbstractType<Date>
     {
         return TimestampSerializer.instance;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 8;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/UUIDType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/UUIDType.java b/src/java/org/apache/cassandra/db/marshal/UUIDType.java
index 0250eb20..14c9f48 100644
--- a/src/java/org/apache/cassandra/db/marshal/UUIDType.java
+++ b/src/java/org/apache/cassandra/db/marshal/UUIDType.java
@@ -168,4 +168,10 @@ public class UUIDType extends AbstractType<UUID>
     {
         return (uuid.get(6) & 0xf0) >> 4;
     }
+
+    @Override
+    protected int valueLengthIfFixed()
+    {
+        return 16;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
new file mode 100644
index 0000000..2fcd7b3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java
@@ -0,0 +1,831 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.UnmodifiableIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.SearchIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract common class for all non-thread-safe Partition implementations.
+ */
+public abstract class AbstractPartitionData implements Partition, Iterable<Row>
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionData.class);
+
+    protected final CFMetaData metadata;
+    protected final DecoratedKey key;
+
+    protected final DeletionInfo deletionInfo;
+    protected final PartitionColumns columns;
+
+    protected Row staticRow;
+
+    protected int rows;
+
+    // The values for the clustering columns of the rows contained in this partition object. If
+    // clusteringSize is the size of the clustering comparator for this table, clusterings has size
+    // clusteringSize * rows where rows is the number of rows stored, and row i has its clustering
+    // column values at indexes [clusteringSize * i, clusteringSize * (i + 1)).
+    protected ByteBuffer[] clusterings;
+
+    // The partition key column liveness infos for the rows of this partition (row i has its liveness info at index i).
+    protected final LivenessInfoArray livenessInfos;
+    // The row deletion for the rows of this partition (row i has its row deletion at index i).
+    protected final DeletionTimeArray deletions;
+
+    // The row data (cells data + complex deletions for complex columns) for the rows contained in this partition.
+    protected final RowDataBlock data;
+
+    // Stats over the rows stored in this partition.
+    private final RowStats.Collector statsCollector = new RowStats.Collector();
+
+    // The maximum timestamp for any data contained in this partition.
+    protected long maxTimestamp = Long.MIN_VALUE;
+
+    private AbstractPartitionData(CFMetaData metadata,
+                                    DecoratedKey key,
+                                    DeletionInfo deletionInfo,
+                                    ByteBuffer[] clusterings,
+                                    LivenessInfoArray livenessInfos,
+                                    DeletionTimeArray deletions,
+                                    PartitionColumns columns,
+                                    RowDataBlock data)
+    {
+        this.metadata = metadata;
+        this.key = key;
+        this.deletionInfo = deletionInfo;
+        this.clusterings = clusterings;
+        this.livenessInfos = livenessInfos;
+        this.deletions = deletions;
+        this.columns = columns;
+        this.data = data;
+
+        collectStats(deletionInfo.getPartitionDeletion());
+        Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false);
+        while (iter.hasNext())
+            collectStats(iter.next().deletionTime());
+    }
+
+    protected AbstractPartitionData(CFMetaData metadata,
+                                    DecoratedKey key,
+                                    DeletionInfo deletionInfo,
+                                    PartitionColumns columns,
+                                    RowDataBlock data,
+                                    int initialRowCapacity)
+    {
+        this(metadata,
+             key,
+             deletionInfo,
+             new ByteBuffer[initialRowCapacity * metadata.clusteringColumns().size()],
+             new LivenessInfoArray(initialRowCapacity),
+             new DeletionTimeArray(initialRowCapacity),
+             columns,
+             data);
+    }
+
+    protected AbstractPartitionData(CFMetaData metadata,
+                                    DecoratedKey key,
+                                    DeletionTime partitionDeletion,
+                                    PartitionColumns columns,
+                                    int initialRowCapacity,
+                                    boolean sortable)
+    {
+        this(metadata,
+             key,
+             new DeletionInfo(partitionDeletion.takeAlias()),
+             columns,
+             new RowDataBlock(columns.regulars, initialRowCapacity, sortable, metadata.isCounter()),
+             initialRowCapacity);
+    }
+
+    private void collectStats(DeletionTime dt)
+    {
+        statsCollector.updateDeletionTime(dt);
+        maxTimestamp = Math.max(maxTimestamp, dt.markedForDeleteAt());
+    }
+
+    private void collectStats(LivenessInfo info)
+    {
+        statsCollector.updateTimestamp(info.timestamp());
+        statsCollector.updateTTL(info.ttl());
+        statsCollector.updateLocalDeletionTime(info.localDeletionTime());
+        maxTimestamp = Math.max(maxTimestamp, info.timestamp());
+    }
+
+    public CFMetaData metadata()
+    {
+        return metadata;
+    }
+
+    public DecoratedKey partitionKey()
+    {
+        return key;
+    }
+
+    public DeletionTime partitionLevelDeletion()
+    {
+        return deletionInfo.getPartitionDeletion();
+    }
+
+    public PartitionColumns columns()
+    {
+        return columns;
+    }
+
+    public Row staticRow()
+    {
+        return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow;
+    }
+
+    public RowStats stats()
+    {
+        return statsCollector.get();
+    }
+
+    /**
+     * The deletion info for the partition update.
+     *
+     * <b>warning:</b> the returned object should be used in a read-only fashion. In particular,
+     * it should not be used to add new range tombstones to this deletion. For that,
+     * {@link #addRangeTombstone} should be used instead. The reason being that adding directly to
+     * the returned object would bypass some stats collection that {@code addRangeTombstone} does.
+     *
+     * @return the deletion info for the partition update for use as read-only.
+     */
+    public DeletionInfo deletionInfo()
+    {
+        // TODO: it is a tad fragile that deletionInfo can be but shouldn't be modified. We
+        // could add the option of providing a read-only view of a DeletionInfo instead.
+        return deletionInfo;
+    }
+
+    public void addPartitionDeletion(DeletionTime deletionTime)
+    {
+        collectStats(deletionTime);
+        deletionInfo.add(deletionTime);
+    }
+
+    public void addRangeTombstone(Slice deletedSlice, DeletionTime deletion)
+    {
+        addRangeTombstone(new RangeTombstone(deletedSlice, deletion.takeAlias()));
+    }
+
+    public void addRangeTombstone(RangeTombstone range)
+    {
+        collectStats(range.deletionTime());
+        deletionInfo.add(range, metadata.comparator);
+    }
+
+    /**
+     * Swap row i and j.
+     *
+     * This is only used when we need to reorder rows because those were not added in clustering order,
+     * which happens in {@link PartitionUpdate#sort} and {@link ArrayBackedPartition#create}. This method
+     * is public only because {@code PartitionUpdate} needs to implement {@link Sorting.Sortable}, but
+     * it should really only be used by subclasses (and with care) in practice.
+     */
+    public void swap(int i, int j)
+    {
+        int cs = metadata.clusteringColumns().size();
+        for (int k = 0; k < cs; k++)
+        {
+            ByteBuffer tmp = clusterings[j * cs + k];
+            clusterings[j * cs + k] = clusterings[i * cs + k];
+            clusterings[i * cs + k] = tmp;
+        }
+
+        livenessInfos.swap(i, j);
+        deletions.swap(i, j);
+        data.swap(i, j);
+    }
+
+    public int rowCount()
+    {
+        return rows;
+    }
+
+    public boolean isEmpty()
+    {
+        return deletionInfo.isLive() && rows == 0 && staticRow().isEmpty();
+    }
+
+    protected void clear()
+    {
+        rows = 0;
+        Arrays.fill(clusterings, null);
+        livenessInfos.clear();
+        deletions.clear();
+        data.clear();
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        CFMetaData metadata = metadata();
+        sb.append(String.format("Partition[%s.%s] key=%s columns=%s deletion=%s",
+                    metadata.ksName,
+                    metadata.cfName,
+                    metadata.getKeyValidator().getString(partitionKey().getKey()),
+                    columns(),
+                    deletionInfo));
+
+        if (staticRow() != Rows.EMPTY_STATIC_ROW)
+            sb.append("\n    ").append(staticRow().toString(metadata, true));
+
+        // We use createRowIterator() directly instead of iterator() because that avoids
+        // sorting for PartitionUpdate (which inherit this method) and that is useful because
+        //  1) it can help with debugging and 2) we can't write after sorting but we want to
+        // be able to print an update while we build it (again for debugging)
+        Iterator<Row> iterator = createRowIterator(null, false);
+        while (iterator.hasNext())
+            sb.append("\n    ").append(iterator.next().toString(metadata, true));
+
+        return sb.toString();
+    }
+
+    protected void reverse()
+    {
+        for (int i = 0; i < rows / 2; i++)
+            swap(i, rows - 1 - i);
+    }
+
+    public Row getRow(Clustering clustering)
+    {
+        Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
+        // Note that for statics, this will never return null, this will return an empty row. However,
+        // it's more consistent for this method to return null if we don't really have a static row.
+        return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
+    }
+
+    /**
+     * Returns an iterator that iterates over the rows of this update in clustering order.
+     *
+     * @return an iterator over the rows of this update.
+     */
+    public Iterator<Row> iterator()
+    {
+        return createRowIterator(null, false);
+    }
+
+    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed)
+    {
+        final RowIterator iter = createRowIterator(columns, reversed);
+        return new SearchIterator<Clustering, Row>()
+        {
+            public boolean hasNext()
+            {
+                return iter.hasNext();
+            }
+
+            public Row next(Clustering key)
+            {
+                if (key == Clustering.STATIC_CLUSTERING)
+                {
+                    if (columns.fetchedColumns().statics.isEmpty() || staticRow().isEmpty())
+                        return Rows.EMPTY_STATIC_ROW;
+
+                    return FilteringRow.columnsFilteringRow(columns).setTo(staticRow());
+                }
+
+                return iter.seekTo(key) ? iter.next() : null;
+            }
+        };
+    }
+
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false);
+    }
+
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed)
+    {
+        return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed));
+    }
+
+    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
+    {
+        return sliceableUnfilteredIterator(ColumnFilter.selection(columns()), false);
+    }
+
+    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(final ColumnFilter selection, final boolean reversed)
+    {
+        return new AbstractSliceableIterator(this, selection.fetchedColumns(), reversed)
+        {
+            private final RowIterator rowIterator = createRowIterator(selection, reversed);
+            private RowAndTombstoneMergeIterator mergeIterator = new RowAndTombstoneMergeIterator(metadata.comparator, reversed);
+
+            protected Unfiltered computeNext()
+            {
+                if (!mergeIterator.isSet())
+                    mergeIterator.setTo(rowIterator, deletionInfo.rangeIterator(reversed));
+
+                return mergeIterator.hasNext() ? mergeIterator.next() : endOfData();
+            }
+
+            public Iterator<Unfiltered> slice(Slice slice)
+            {
+                return mergeIterator.setTo(rowIterator.slice(slice), deletionInfo.rangeIterator(slice, reversed));
+            }
+        };
+    }
+
+    private RowIterator createRowIterator(ColumnFilter columns, boolean reversed)
+    {
+        return reversed ? new ReverseRowIterator(columns) : new ForwardRowIterator(columns);
+    }
+
+    /**
+     * An iterator over the rows of this partition that reuse the same row object.
+     */
+    private abstract class RowIterator extends UnmodifiableIterator<Row>
+    {
+        protected final InternalReusableClustering clustering = new InternalReusableClustering();
+        protected final InternalReusableRow reusableRow;
+        protected final FilteringRow filter;
+
+        protected int next;
+
+        protected RowIterator(final ColumnFilter columns)
+        {
+            this.reusableRow = new InternalReusableRow(clustering);
+            this.filter = columns == null ? null : FilteringRow.columnsFilteringRow(columns);
+        }
+
+        /**
+         * Move the iterator so that row {@code name} is returned next by {@code next} if that
+         * row exists. Otherwise the first row sorting after {@code name} will be returned.
+         * Returns whether {@code name} was found or not.
+         */
+        public abstract boolean seekTo(Clustering name);
+
+        public abstract Iterator<Row> slice(Slice slice);
+
+        protected Row setRowTo(int row)
+        {
+            reusableRow.setTo(row);
+            return filter == null ? reusableRow : filter.setTo(reusableRow);
+        }
+
+        /**
+         * Binary search for {@code name} among the rows in [fromIndex, toIndex): returns the index of the match, or {@code -(insertionPoint) - 1} if there is none.
+         */
+        protected int binarySearch(ClusteringPrefix name, int fromIndex, int toIndex)
+        {
+            int low = fromIndex;
+            int mid = toIndex;
+            int high = mid - 1;
+            int result = -1;
+            while (low <= high)
+            {
+                mid = (low + high) >>> 1; // unsigned shift: the midpoint stays correct even if low + high overflows int
+                if ((result = metadata.comparator.compare(name, clustering.setTo(mid))) > 0)
+                    low = mid + 1;
+                else if (result == 0)
+                    return mid;
+                else
+                    high = mid - 1;
+            }
+            return -mid - (result < 0 ? 1 : 2);
+        }
+    }
+
+    private class ForwardRowIterator extends RowIterator
+    {
+        private ForwardRowIterator(ColumnFilter columns)
+        {
+            super(columns);
+            this.next = 0;
+        }
+
+        public boolean hasNext()
+        {
+            return next < rows;
+        }
+
+        public Row next()
+        {
+            return setRowTo(next++);
+        }
+
+        public boolean seekTo(Clustering name)
+        {
+            if (next >= rows)
+                return false;
+
+            int idx = binarySearch(name, next, rows);
+            next = idx >= 0 ? idx : -idx - 1;
+            return idx >= 0;
+        }
+
+        public Iterator<Row> slice(Slice slice)
+        {
+            int sidx = binarySearch(slice.start(), next, rows);
+            final int start = sidx >= 0 ? sidx : -sidx - 1;
+            if (start >= rows)
+                return Collections.emptyIterator();
+
+            int eidx = binarySearch(slice.end(), start, rows);
+            // The insertion point is the first element greater than slice.end(), so we want the previous index
+            final int end = eidx >= 0 ? eidx : -eidx - 2;
+            // Remember where to resume to speed up potential further slice search. end is start - 1 when no row falls in
+            // the slice, so clamp to start: otherwise next could move backward (down to -1) and a later search would probe row -1.
+            next = Math.max(start, end);
+
+            if (start > end)
+                return Collections.emptyIterator();
+
+            return new AbstractIterator<Row>()
+            {
+                private int i = start;
+
+                protected Row computeNext()
+                {
+                    if (i >= rows || i > end)
+                        return endOfData();
+
+                    return setRowTo(i++);
+                }
+            };
+        }
+    }
+
+    private class ReverseRowIterator extends RowIterator
+    {
+        private ReverseRowIterator(ColumnFilter columns)
+        {
+            super(columns);
+            this.next = rows - 1;
+        }
+
+        public boolean hasNext()
+        {
+            return next >= 0;
+        }
+
+        public Row next()
+        {
+            return setRowTo(next--);
+        }
+
+        public boolean seekTo(Clustering name)
+        {
+            // We only use that method with forward iterators.
+            throw new UnsupportedOperationException();
+        }
+
+        public Iterator<Row> slice(Slice slice)
+        {
+            int sidx = binarySearch(slice.end(), 0, next + 1);
+            // The insertion point is the first element greater than slice.end(), so we want the previous index
+            final int start = sidx >= 0 ? sidx : -sidx - 2;
+            if (start < 0)
+                return Collections.emptyIterator();
+
+            int eidx = binarySearch(slice.start(), 0, start + 1);
+            final int end = eidx >= 0 ? eidx : -eidx - 1;
+            // Remember where to resume to speed up potential further slice search. end is start + 1 when no row falls in
+            // the slice, so clamp to start: otherwise next could move forward (up to rows) and a later search would probe past the last row.
+            next = Math.min(start, end);
+
+            if (start < end)
+                return Collections.emptyIterator();
+
+            return new AbstractIterator<Row>()
+            {
+                private int i = start;
+
+                protected Row computeNext()
+                {
+                    if (i < 0 || i < end)
+                        return endOfData();
+
+                    return setRowTo(i--);
+                }
+            };
+        }
+    }
+
+    /**
+     * A reusable view over the clustering of this partition.
+     */
+    protected class InternalReusableClustering extends Clustering
+    {
+        final int size = metadata.clusteringColumns().size();
+        private int base;
+
+        public int size()
+        {
+            return size;
+        }
+
+        public Clustering setTo(int row)
+        {
+            base = row * size;
+            return this;
+        }
+
+        public ByteBuffer get(int i)
+        {
+            return clusterings[base + i];
+        }
+
+        public ByteBuffer[] getRawValues()
+        {
+            ByteBuffer[] values = new ByteBuffer[size];
+            for (int i = 0; i < size; i++)
+                values[i] = get(i);
+            return values;
+        }
+    };
+
+    /**
+     * A reusable view over the rows of this partition.
+     */
+    protected class InternalReusableRow extends AbstractReusableRow
+    {
+        private final LivenessInfoArray.Cursor liveness = new LivenessInfoArray.Cursor();
+        private final DeletionTimeArray.Cursor deletion = new DeletionTimeArray.Cursor();
+        private final InternalReusableClustering clustering;
+
+        private int row;
+
+        public InternalReusableRow()
+        {
+            this(new InternalReusableClustering());
+        }
+
+        public InternalReusableRow(InternalReusableClustering clustering)
+        {
+            this.clustering = clustering;
+        }
+
+        protected RowDataBlock data()
+        {
+            return data;
+        }
+
+        public Row setTo(int row)
+        {
+            this.clustering.setTo(row);
+            this.liveness.setTo(livenessInfos, row);
+            this.deletion.setTo(deletions, row);
+            this.row = row;
+            return this;
+        }
+
+        protected int row()
+        {
+            return row;
+        }
+
+        public Clustering clustering()
+        {
+            return clustering;
+        }
+
+        public LivenessInfo primaryKeyLivenessInfo()
+        {
+            return liveness;
+        }
+
+        public DeletionTime deletion()
+        {
+            return deletion;
+        }
+    };
+
+    private static abstract class AbstractSliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator
+    {
+        private AbstractSliceableIterator(AbstractPartitionData data, PartitionColumns columns, boolean isReverseOrder)
+        {
+            super(data.metadata, data.key, data.partitionLevelDeletion(), columns, data.staticRow(), isReverseOrder, data.stats());
+        }
+    }
+
+    /**
+     * A row writer to add rows to this partition.
+     */
+    protected class Writer extends RowDataBlock.Writer
+    {
+        private int clusteringBase;
+
+        private int simpleColumnsSetInRow;
+        private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>();
+
+        public Writer(boolean inOrderCells)
+        {
+            super(data, inOrderCells);
+        }
+
+        public void writeClusteringValue(ByteBuffer value)
+        {
+            ensureCapacity(row);
+            clusterings[clusteringBase++] = value;
+        }
+
+        public void writePartitionKeyLivenessInfo(LivenessInfo info)
+        {
+            ensureCapacity(row);
+            livenessInfos.set(row, info);
+            collectStats(info);
+        }
+
+        public void writeRowDeletion(DeletionTime deletion)
+        {
+            ensureCapacity(row);
+            if (!deletion.isLive())
+                deletions.set(row, deletion);
+
+            collectStats(deletion);
+        }
+
+        @Override
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            ensureCapacity(row);
+            collectStats(info);
+
+            if (column.isComplex())
+                complexColumnsSetInRow.add(column);
+            else
+                ++simpleColumnsSetInRow;
+
+            super.writeCell(column, isCounter, value, info, path);
+        }
+
+        @Override
+        public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion)
+        {
+            ensureCapacity(row);
+            collectStats(complexDeletion);
+
+            super.writeComplexDeletion(c, complexDeletion);
+        }
+
+        @Override
+        public void endOfRow()
+        {
+            super.endOfRow();
+            ++rows;
+
+            statsCollector.updateColumnSetPerRow(simpleColumnsSetInRow + complexColumnsSetInRow.size());
+
+            simpleColumnsSetInRow = 0;
+            complexColumnsSetInRow.clear();
+        }
+
+        public int currentRow()
+        {
+            return row;
+        }
+
+        private void ensureCapacity(int rowToSet)
+        {
+            int originalCapacity = livenessInfos.size();
+            if (rowToSet < originalCapacity)
+                return;
+
+            int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet);
+
+            int clusteringSize = metadata.clusteringColumns().size();
+
+            clusterings = Arrays.copyOf(clusterings, newCapacity * clusteringSize);
+
+            livenessInfos.resize(newCapacity);
+            deletions.resize(newCapacity);
+        }
+
+        @Override
+        public Writer reset()
+        {
+            super.reset();
+            clusteringBase = 0;
+            simpleColumnsSetInRow = 0;
+            complexColumnsSetInRow.clear();
+            return this;
+        }
+    }
+
+    /**
+     * A range tombstone marker writer to add range tombstone markers to this partition.
+     */
+    protected class RangeTombstoneCollector implements RangeTombstoneMarker.Writer
+    {
+        private final boolean reversed;
+
+        private final ByteBuffer[] nextValues = new ByteBuffer[metadata().comparator.size()];
+        private int size;
+        private RangeTombstone.Bound.Kind nextKind;
+
+        private Slice.Bound openBound;
+        private DeletionTime openDeletion;
+
+        public RangeTombstoneCollector(boolean reversed)
+        {
+            this.reversed = reversed;
+        }
+
+        public void writeClusteringValue(ByteBuffer value)
+        {
+            nextValues[size++] = value;
+        }
+
+        public void writeBoundKind(RangeTombstone.Bound.Kind kind)
+        {
+            nextKind = kind;
+        }
+
+        private ByteBuffer[] getValues()
+        {
+            return Arrays.copyOfRange(nextValues, 0, size);
+        }
+
+        private void open(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
+        {
+            openBound = Slice.Bound.create(kind, getValues());
+            openDeletion = deletion.takeAlias();
+        }
+
+        private void close(RangeTombstone.Bound.Kind kind, DeletionTime deletion)
+        {
+            assert deletion.equals(openDeletion) : "Expected " + openDeletion + " but was "  + deletion;
+            Slice.Bound closeBound = Slice.Bound.create(kind, getValues());
+            Slice slice = reversed
+                        ? Slice.make(closeBound, openBound)
+                        : Slice.make(openBound, closeBound);
+            addRangeTombstone(slice, openDeletion);
+        }
+
+        public void writeBoundDeletion(DeletionTime deletion)
+        {
+            assert !nextKind.isBoundary();
+            if (nextKind.isOpen(reversed))
+                open(nextKind, deletion);
+            else
+                close(nextKind, deletion);
+        }
+
+        public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion)
+        {
+            assert nextKind.isBoundary();
+            DeletionTime closeTime = reversed ? startDeletion : endDeletion;
+            DeletionTime openTime = reversed ? endDeletion : startDeletion;
+
+            close(nextKind.closeBoundOfBoundary(reversed), closeTime);
+            open(nextKind.openBoundOfBoundary(reversed), openTime);
+        }
+
+        public void endOfMarker()
+        {
+            clear();
+        }
+
+        private void addRangeTombstone(Slice deletionSlice, DeletionTime dt)
+        {
+            AbstractPartitionData.this.addRangeTombstone(deletionSlice, dt);
+        }
+
+        private void clear()
+        {
+            size = 0;
+            Arrays.fill(nextValues, null);
+            nextKind = null;
+        }
+
+        public void reset()
+        {
+            openBound = null;
+            openDeletion = null;
+            clear();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java
new file mode 100644
index 0000000..d615ea9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+public abstract class AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator
+{
+    public void remove()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public void close()
+    {
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
new file mode 100644
index 0000000..68b3970
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+public class ArrayBackedCachedPartition extends ArrayBackedPartition implements CachedPartition
+{
+    private final int createdAtInSec; // creation time, in seconds
+
+    // Note that these fields are effectively immutable, but their values are collected by the Writer
+    // below as rows are added, so we can't easily pass them to the ctor and they are not final.
+    private int cachedLiveRows;
+    private int rowsWithNonExpiringCells;
+
+    private int nonTombstoneCellCount;
+    private int nonExpiringLiveCells;
+
+    private ArrayBackedCachedPartition(CFMetaData metadata,
+                                       DecoratedKey partitionKey,
+                                       DeletionTime deletionTime,
+                                       PartitionColumns columns,
+                                       int initialRowCapacity,
+                                       boolean sortable,
+                                       int createdAtInSec)
+    {
+        super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable);
+        this.createdAtInSec = createdAtInSec;
+    }
+
+    /**
+     * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies.
+     * @return the created partition.
+     */
+    public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int nowInSec)
+    {
+        return create(iterator, 4, nowInSec);
+    }
+
+    /**
+     * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
+     * correspond to or be a good estimation of the number of rows in {@code iterator}.
+     * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies.
+     * @return the created partition.
+     */
+    public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec)
+    {
+        ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(iterator.metadata(),
+                                                                              iterator.partitionKey(),
+                                                                              iterator.partitionLevelDeletion(),
+                                                                              iterator.columns(),
+                                                                              initialRowCapacity,
+                                                                              iterator.isReverseOrder(),
+                                                                              nowInSec);
+
+        partition.staticRow = iterator.staticRow().takeAlias();
+
+        Writer writer = partition.new Writer(nowInSec);
+        RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder());
+
+        copyAll(iterator, writer, markerCollector, partition);
+
+        return partition;
+    }
+
+    public Row lastRow()
+    {
+        if (rows == 0)
+            return null; // no rows, hence no last row
+
+        return new InternalReusableRow().setTo(rows - 1);
+    }
+
+    /**
+     * The number of rows that were live at the time the partition was cached.
+     *
+     * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this.
+     *
+     * @return the number of rows in this partition that were live at the time the
+     * partition was cached (this can be different from the number of live rows now
+     * due to expiring cells).
+     */
+    public int cachedLiveRows()
+    {
+        return cachedLiveRows;
+    }
+
+    /**
+     * The number of rows in this cached partition that have at least one non-expiring
+     * non-deleted cell.
+     *
+     * Note that this is generally not a very meaningful number, but this is used by
+     * {@link DataLimits#hasEnoughLiveData} as an optimization.
+     *
+     * @return the number of rows that have at least one non-expiring non-deleted cell.
+     */
+    public int rowsWithNonExpiringCells()
+    {
+        return rowsWithNonExpiringCells;
+    }
+
+    public int nonTombstoneCellCount()
+    {
+        return nonTombstoneCellCount;
+    }
+
+    public int nonExpiringLiveCells()
+    {
+        return nonExpiringLiveCells;
+    }
+
+    // Writer that collects the values for 'cachedLiveRows', 'rowsWithNonExpiringCells', 'nonTombstoneCellCount'
+    // and 'nonExpiringLiveCells' as rows and cells are written to the partition.
+    protected class Writer extends AbstractPartitionData.Writer
+    {
+        private final int nowInSec;
+
+        private boolean hasLiveData;
+        private boolean hasNonExpiringCell;
+
+        protected Writer(int nowInSec)
+        {
+            super(true);
+            this.nowInSec = nowInSec;
+        }
+
+        @Override
+        public void writePartitionKeyLivenessInfo(LivenessInfo info)
+        {
+            super.writePartitionKeyLivenessInfo(info);
+            if (info.isLive(nowInSec))
+                hasLiveData = true;
+        }
+
+        @Override
+        public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
+        {
+            super.writeCell(column, isCounter, value, info, path);
+
+            if (info.isLive(nowInSec))
+            {
+                hasLiveData = true;
+                if (!info.hasTTL())
+                {
+                    hasNonExpiringCell = true;
+                    ++ArrayBackedCachedPartition.this.nonExpiringLiveCells;
+                }
+            }
+
+            if (!info.hasLocalDeletionTime() || info.hasTTL())
+                ++ArrayBackedCachedPartition.this.nonTombstoneCellCount;
+        }
+
+        @Override
+        public void endOfRow()
+        {
+            super.endOfRow();
+            if (hasLiveData)
+                ++ArrayBackedCachedPartition.this.cachedLiveRows;
+            if (hasNonExpiringCell)
+                ++ArrayBackedCachedPartition.this.rowsWithNonExpiringCells;
+
+            hasLiveData = false; // reset the per-row flags before the next row is written
+            hasNonExpiringCell = false;
+        }
+    }
+
+    static class Serializer implements ISerializer<CachedPartition>
+    {
+        public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException
+        {
+            assert partition instanceof ArrayBackedCachedPartition;
+            ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
+
+            out.writeInt(p.createdAtInSec);
+            try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
+            {
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rows);
+            }
+        }
+
+        public CachedPartition deserialize(DataInput in) throws IOException
+        {
+            // Note that it would be slightly simpler to just do
+            //   ArrayBackedCachedPartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...));
+            // However deserializing the header separately is not a lot harder and allows us to:
+            //   1) get the capacity of the partition so we can size it properly directly
+            //   2) save the creation of a temporary iterator: rows are directly written to the partition, which
+            //      is slightly faster.
+
+            int createdAtInSec = in.readInt();
+
+            UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL);
+            assert !h.isReversed && h.rowEstimate >= 0;
+
+            ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.rowEstimate, false, createdAtInSec);
+            partition.staticRow = h.staticRow;
+
+            Writer writer = partition.new Writer(createdAtInSec);
+            RangeTombstoneMarker.Writer markerWriter = partition.new RangeTombstoneCollector(false);
+
+            UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(MessagingService.current_version, SerializationHelper.Flag.LOCAL), h.sHeader, writer, markerWriter);
+            return partition;
+        }
+
+        public long serializedSize(CachedPartition partition, TypeSizes sizes)
+        {
+            assert partition instanceof ArrayBackedCachedPartition;
+            ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition;
+
+            try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator())
+            {
+                return sizes.sizeof(p.createdAtInSec)
+                     + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rows, sizes);
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
new file mode 100644
index 0000000..d7f3a88
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+
+public class ArrayBackedPartition extends AbstractPartitionData
+{
+    protected ArrayBackedPartition(CFMetaData metadata,
+                                   DecoratedKey partitionKey,
+                                   DeletionTime deletionTime,
+                                   PartitionColumns columns,
+                                   int initialRowCapacity,
+                                   boolean sortable)
+    {
+        super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable);
+    }
+
+    /**
+     * Creates an {@code ArrayBackedPartition} holding all the data of the provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @return the created partition.
+     */
+    public static ArrayBackedPartition create(UnfilteredRowIterator iterator)
+    {
+        return create(iterator, 4);
+    }
+
+    /**
+     * Creates an {@code ArrayBackedPartition} holding all the data of the provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally
+     * correspond to or be a good estimation of the number of rows in {@code iterator}.
+     * @return the created partition.
+     */
+    public static ArrayBackedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity)
+    {
+        ArrayBackedPartition partition = new ArrayBackedPartition(iterator.metadata(),
+                                                                  iterator.partitionKey(),
+                                                                  iterator.partitionLevelDeletion(),
+                                                                  iterator.columns(),
+                                                                  initialRowCapacity,
+                                                                  iterator.isReverseOrder());
+
+        partition.staticRow = iterator.staticRow().takeAlias();
+
+        Writer writer = partition.new Writer(true);
+        RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder());
+
+        copyAll(iterator, writer, markerCollector, partition);
+
+        return partition;
+    }
+
+    protected static void copyAll(UnfilteredRowIterator iterator, Writer writer, RangeTombstoneCollector markerCollector, ArrayBackedPartition partition)
+    {
+        while (iterator.hasNext())
+        {
+            Unfiltered unfiltered = iterator.next();
+            if (unfiltered.kind() == Unfiltered.Kind.ROW) // rows go to the writer, everything else to the marker collector
+                ((Row) unfiltered).copyTo(writer);
+            else
+                ((RangeTombstoneMarker) unfiltered).copyTo(markerCollector);
+        }
+
+        // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering
+        // order. So if we've just added them in reverse clustering order, reverse them.
+        if (iterator.isReverseOrder())
+            partition.reverse();
+    }
+}