You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2016/01/27 11:50:07 UTC

[2/2] cassandra git commit: 9472: Reintroduce Off-Heap Memtables

9472: Reintroduce Off-Heap Memtables

patch by benedict and stefania


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f412431
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f412431
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f412431

Branch: refs/heads/trunk
Commit: 2f41243191c381193a3bf6ec3730ff6555325d06
Parents: a0901b8
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Nov 4 17:03:38 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 27 10:49:26 2016 +0000

----------------------------------------------------------------------
 conf/cassandra.yaml                             |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   3 +-
 .../db/AbstractBufferClusteringPrefix.java      |  72 ++++++
 .../cassandra/db/AbstractClusteringPrefix.java  |  44 ----
 .../apache/cassandra/db/BufferClustering.java   |  44 ++++
 src/java/org/apache/cassandra/db/CBuilder.java  |   6 +-
 .../org/apache/cassandra/db/Clustering.java     | 150 ++++++------
 .../cassandra/db/ClusteringComparator.java      |   3 -
 .../apache/cassandra/db/ClusteringPrefix.java   |   4 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |   8 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   5 +-
 .../org/apache/cassandra/db/MultiCBuilder.java  |   2 +-
 .../apache/cassandra/db/NativeClustering.java   | 125 ++++++++++
 .../apache/cassandra/db/NativeDecoratedKey.java |   6 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |   2 -
 .../org/apache/cassandra/db/Serializers.java    |   6 +-
 .../db/SinglePartitionReadCommand.java          |  18 +-
 src/java/org/apache/cassandra/db/Slice.java     |   2 +-
 .../apache/cassandra/db/filter/RowFilter.java   |   4 +-
 .../db/partitions/AbstractBTreePartition.java   |  25 +-
 .../db/partitions/AtomicBTreePartition.java     |  66 +++++-
 .../db/partitions/FilteredPartition.java        |   2 +-
 .../apache/cassandra/db/rows/AbstractCell.java  |  79 +++++++
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  33 ++-
 .../apache/cassandra/db/rows/BufferCell.java    | 232 -------------------
 src/java/org/apache/cassandra/db/rows/Cell.java | 161 ++++++++++++-
 .../apache/cassandra/db/rows/NativeCell.java    | 151 ++++++++++++
 .../apache/cassandra/db/rows/RowIterator.java   |   5 -
 .../db/rows/UnfilteredRowIterators.java         |  27 ---
 .../apache/cassandra/db/transform/BaseRows.java |   4 +
 .../cassandra/db/transform/Transformation.java  |   6 +
 .../apache/cassandra/db/view/TemporalRow.java   |  11 +-
 .../cassandra/index/internal/IndexEntry.java    |   1 +
 .../index/internal/keys/KeysSearcher.java       |   4 +-
 .../io/sstable/metadata/MetadataCollector.java  |   4 -
 .../cassandra/io/util/MemoryInputStream.java    |   2 +-
 .../apache/cassandra/service/CacheService.java  |   2 -
 .../cassandra/thrift/CassandraServer.java       |  16 +-
 .../cassandra/thrift/ThriftResultsMerger.java   |   2 +-
 .../utils/memory/AbstractAllocator.java         |   1 -
 .../cassandra/utils/memory/EnsureOnHeap.java    | 150 ++++++++++++
 .../cassandra/utils/memory/HeapAllocator.java   |   5 +
 .../apache/cassandra/utils/memory/HeapPool.java |   5 -
 .../cassandra/utils/memory/MemoryUtil.java      |  19 +-
 .../utils/memory/MemtableAllocator.java         |   5 +-
 .../cassandra/utils/memory/MemtablePool.java    |   1 -
 .../cassandra/utils/memory/NativeAllocator.java |  46 +++-
 .../cassandra/utils/memory/NativePool.java      |   6 -
 .../cassandra/utils/memory/SlabAllocator.java   |  15 +-
 .../apache/cassandra/utils/memory/SlabPool.java |   5 -
 test/conf/cassandra.yaml                        |   3 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   |   4 +-
 .../org/apache/cassandra/db/NativeCellTest.java | 171 ++++++++++++++
 .../cassandra/db/RangeTombstoneListTest.java    |   2 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  20 +-
 .../apache/cassandra/db/ReadMessageTest.java    |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |   2 +-
 .../rows/DigestBackwardCompatibilityTest.java   |   2 -
 .../rows/UnfilteredRowIteratorsMergeTest.java   |   2 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |   4 -
 .../cassandra/io/sstable/SSTableLoaderTest.java |   2 +-
 .../service/pager/PagingStateTest.java          |   2 +-
 62 files changed, 1265 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e29a6d3..a9749f2 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -398,6 +398,7 @@ concurrent_materialized_view_writes: 32
 # Options are:
 #   heap_buffers:    on heap nio buffers
 #   offheap_buffers: off heap (direct) nio buffers
+#   offheap_objects: off heap objects
 memtable_allocation_type: heap_buffers
 
 # Total space to use for commit logs on disk.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2a2719a..b09605f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1847,8 +1847,7 @@ public class DatabaseDescriptor
                 }
                 return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
             case offheap_objects:
-                throw new ConfigurationException("offheap_objects are not available in 3.0. They should be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");
-                // return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+                return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
             default:
                 throw new AssertionError();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
new file mode 100644
index 0000000..95bc777
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+public abstract class AbstractBufferClusteringPrefix extends AbstractClusteringPrefix
+{
+    public static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0];
+    private static final long EMPTY_SIZE = ObjectSizes.measure(Clustering.make(EMPTY_VALUES_ARRAY));
+
+    protected final Kind kind;
+    protected final ByteBuffer[] values;
+
+    protected AbstractBufferClusteringPrefix(Kind kind, ByteBuffer[] values)
+    {
+        this.kind = kind;
+        this.values = values;
+    }
+
+    public Kind kind()
+    {
+        return kind;
+    }
+
+    public ClusteringPrefix clustering()
+    {
+        return this;
+    }
+
+    public int size()
+    {
+        return values.length;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        return values[i];
+    }
+
+    public ByteBuffer[] getRawValues()
+    {
+        return values;
+    }
+
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
index 2631b46..0b1daf7 100644
--- a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
@@ -22,48 +22,14 @@ import java.security.MessageDigest;
 import java.util.Objects;
 
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
 
 public abstract class AbstractClusteringPrefix implements ClusteringPrefix
 {
-    protected static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0];
-
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new Clustering(EMPTY_VALUES_ARRAY));
-
-    protected final Kind kind;
-    protected final ByteBuffer[] values;
-
-    protected AbstractClusteringPrefix(Kind kind, ByteBuffer[] values)
-    {
-        this.kind = kind;
-        this.values = values;
-    }
-
-    public Kind kind()
-    {
-        return kind;
-    }
-
     public ClusteringPrefix clustering()
     {
         return this;
     }
 
-    public int size()
-    {
-        return values.length;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        return values[i];
-    }
-
-    public ByteBuffer[] getRawValues()
-    {
-        return values;
-    }
-
     public int dataSize()
     {
         int size = 0;
@@ -86,16 +52,6 @@ public abstract class AbstractClusteringPrefix implements ClusteringPrefix
         FBUtilities.updateWithByte(digest, kind().ordinal());
     }
 
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
-    }
-
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/BufferClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferClustering.java b/src/java/org/apache/cassandra/db/BufferClustering.java
new file mode 100644
index 0000000..7c6bb20
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferClustering.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * The clustering column values for a row.
+ * <p>
+ * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have
+ * as many values as there are clustering columns in the table it is part of. It is the clustering
+ * prefix used by rows.
+ * <p>
+ * Note however that while its size must be equal to the table clustering size, a clustering can have
+ * {@code null} values, and this mostly for thrift backward compatibility (in practice, if a value is null,
+ * all of the following ones will be too because that's what thrift allows, but it's never assumed by the
+ * code so we could start generally allowing nulls for clustering columns if we wanted to).
+ */
+public class BufferClustering extends AbstractBufferClusteringPrefix implements Clustering
+{
+    BufferClustering(ByteBuffer... values)
+    {
+        super(Kind.CLUSTERING, values);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/CBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CBuilder.java b/src/java/org/apache/cassandra/db/CBuilder.java
index 94feb93..73b575f 100644
--- a/src/java/org/apache/cassandra/db/CBuilder.java
+++ b/src/java/org/apache/cassandra/db/CBuilder.java
@@ -162,7 +162,7 @@ public abstract class CBuilder
             built = true;
 
             // Currently, only dense table can leave some clustering column out (see #7990)
-            return size == 0 ? Clustering.EMPTY : new Clustering(values);
+            return size == 0 ? Clustering.EMPTY : Clustering.make(values);
         }
 
         public Slice.Bound buildBound(boolean isStart, boolean isInclusive)
@@ -196,7 +196,7 @@ public abstract class CBuilder
 
             ByteBuffer[] newValues = Arrays.copyOf(values, type.size());
             newValues[size] = value;
-            return new Clustering(newValues);
+            return Clustering.make(newValues);
         }
 
         public Clustering buildWith(List<ByteBuffer> newValues)
@@ -207,7 +207,7 @@ public abstract class CBuilder
             for (ByteBuffer value : newValues)
                 buffers[newSize++] = value;
 
-            return new Clustering(buffers);
+            return Clustering.make(buffers);
         }
 
         public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, boolean isInclusive)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index a40cc1f..f5ffae4 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -1,53 +1,91 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
 package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
-/**
- * The clustering column values for a row.
- * <p>
- * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have
- * as many values as there is clustering columns in the table it is part of. It is the clustering
- * prefix used by rows.
- * <p>
- * Note however that while it's size must be equal to the table clustering size, a clustering can have
- * {@code null} values, and this mostly for thrift backward compatibility (in practice, if a value is null,
- * all of the following ones will be too because that's what thrift allows, but it's never assumed by the
- * code so we could start generally allowing nulls for clustering columns if we wanted to).
- */
-public class Clustering extends AbstractClusteringPrefix
+import static org.apache.cassandra.db.AbstractBufferClusteringPrefix.EMPTY_VALUES_ARRAY;
+
+public interface Clustering extends ClusteringPrefix
 {
     public static final Serializer serializer = new Serializer();
 
+    public long unsharedHeapSizeExcludingData();
+
+    public default Clustering copy(AbstractAllocator allocator)
+    {
+        // Important for STATIC_CLUSTERING (but must copy empty native clustering types).
+        if (size() == 0)
+            return kind() == Kind.STATIC_CLUSTERING ? this : new BufferClustering(EMPTY_VALUES_ARRAY);
+
+        ByteBuffer[] newValues = new ByteBuffer[size()];
+        for (int i = 0; i < size(); i++)
+        {
+            ByteBuffer val = get(i);
+            newValues[i] = val == null ? null : allocator.clone(val);
+        }
+        return new BufferClustering(newValues);
+    }
+
+    public default String toString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size(); i++)
+        {
+            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i)));
+        }
+        return sb.toString();
+    }
+
+    public default String toCQLString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size(); i++)
+        {
+            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
+        }
+        return sb.toString();
+    }
+
+    public static Clustering make(ByteBuffer... values)
+    {
+        return new BufferClustering(values);
+    }
+
     /**
      * The special cased clustering used by all static rows. It is a special case in the
      * sense that it's always empty, no matter how many clustering columns the table has.
      */
-    public static final Clustering STATIC_CLUSTERING = new Clustering(EMPTY_VALUES_ARRAY)
+    public static final Clustering STATIC_CLUSTERING = new BufferClustering(EMPTY_VALUES_ARRAY)
     {
         @Override
         public Kind kind()
@@ -69,7 +107,7 @@ public class Clustering extends AbstractClusteringPrefix
     };
 
     /** Empty clustering for tables having no clustering columns. */
-    public static final Clustering EMPTY = new Clustering(EMPTY_VALUES_ARRAY)
+    public static final Clustering EMPTY = new BufferClustering(EMPTY_VALUES_ARRAY)
     {
         @Override
         public String toString(CFMetaData metadata)
@@ -78,50 +116,6 @@ public class Clustering extends AbstractClusteringPrefix
         }
     };
 
-    public Clustering(ByteBuffer... values)
-    {
-        super(Kind.CLUSTERING, values);
-    }
-
-    public Kind kind()
-    {
-        return Kind.CLUSTERING;
-    }
-
-    public Clustering copy(AbstractAllocator allocator)
-    {
-        // Important for STATIC_CLUSTERING (but no point in being wasteful in general).
-        if (size() == 0)
-            return this;
-
-        ByteBuffer[] newValues = new ByteBuffer[size()];
-        for (int i = 0; i < size(); i++)
-            newValues[i] = values[i] == null ? null : allocator.clone(values[i]);
-        return new Clustering(newValues);
-    }
-
-    public String toString(CFMetaData metadata)
-    {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < size(); i++)
-        {
-            ColumnDefinition c = metadata.clusteringColumns().get(i);
-            sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i)));
-        }
-        return sb.toString();
-    }
-
-    public String toCQLString(CFMetaData metadata)
-    {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < size(); i++)
-        {
-            ColumnDefinition c = metadata.clusteringColumns().get(i);
-            sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
-        }
-        return sb.toString();
-    }
-
     /**
      * Serializer for Clustering object.
      * <p>
@@ -161,7 +155,7 @@ public class Clustering extends AbstractClusteringPrefix
                 return EMPTY;
 
             ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types);
-            return new Clustering(values);
+            return new BufferClustering(values);
         }
 
         public Clustering deserialize(ByteBuffer in, int version, List<AbstractType<?>> types)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index f3411cf..f5f6ae8 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -29,8 +28,6 @@ import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FastByteOperations;
 
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 9477651..cacaeb5 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -74,7 +74,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
          */
         public final int comparedToClustering;
 
-        private Kind(int comparison, int comparedToClustering)
+        Kind(int comparison, int comparedToClustering)
         {
             this.comparison = comparison;
             this.comparedToClustering = comparedToClustering;
@@ -500,7 +500,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
         {
             assert nextIsRow;
             deserializeAll();
-            Clustering clustering = new Clustering(nextValues);
+            Clustering clustering = Clustering.make(nextValues);
             nextValues = null;
             return clustering;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 07778b3..4e1eab5 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -103,7 +103,7 @@ public abstract class LegacyLayout
         if (metadata.isSuper())
         {
             assert superColumnName != null;
-            return decodeForSuperColumn(metadata, new Clustering(superColumnName), cellname);
+            return decodeForSuperColumn(metadata, Clustering.make(superColumnName), cellname);
         }
 
         assert superColumnName == null;
@@ -161,7 +161,7 @@ public abstract class LegacyLayout
         {
             // If it's a compact table, it means the column is in fact a "dynamic" one
             if (metadata.isCompactTable())
-                return new LegacyCellName(new Clustering(column), metadata.compactValueColumn(), null);
+                return new LegacyCellName(Clustering.make(column), metadata.compactValueColumn(), null);
 
             if (def == null)
                 throw new UnknownColumnException(metadata, column);
@@ -298,7 +298,7 @@ public abstract class LegacyLayout
                                     ? CompositeType.splitName(value)
                                     : Collections.singletonList(value);
 
-        return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
+        return Clustering.make(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
     }
 
     public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering)
@@ -1308,7 +1308,7 @@ public abstract class LegacyLayout
             ByteBuffer[] values = new ByteBuffer[bound.size()];
             for (int i = 0; i < bound.size(); i++)
                 values[i] = bound.get(i);
-            return new Clustering(values);
+            return Clustering.make(values);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 952c045..244c7b6 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
@@ -53,6 +54,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.MemtablePool;
 
@@ -60,7 +62,7 @@ public class Memtable implements Comparable<Memtable>
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
-    static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
+    public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
     private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
 
     private final MemtableAllocator allocator;
@@ -526,6 +528,7 @@ public class Memtable implements Comparable<Memtable>
             assert entry.getKey() instanceof DecoratedKey;
             DecoratedKey key = (DecoratedKey)entry.getKey();
             ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
+
             return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/MultiCBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java
index 8353703..7a4eef0 100644
--- a/src/java/org/apache/cassandra/db/MultiCBuilder.java
+++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java
@@ -239,7 +239,7 @@ public abstract class MultiCBuilder
             if (hasMissingElements)
                 return BTreeSet.empty(comparator);
 
-            return BTreeSet.of(comparator, size == 0 ? Clustering.EMPTY : new Clustering(elements));
+            return BTreeSet.of(comparator, size == 0 ? Clustering.EMPTY : Clustering.make(elements));
         }
 
         public NavigableSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java b/src/java/org/apache/cassandra/db/NativeClustering.java
new file mode 100644
index 0000000..1943b71
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeClustering.java
@@ -0,0 +1,125 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeClustering extends AbstractClusteringPrefix implements Clustering
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeClustering());
+    // Address returned by allocator.allocate(); block layout: [count:2][start offset:2 x count][dataSize:2][null bitmap][value bytes]
+    private final long peer;
+    // Used only for the EMPTY_SIZE measurement above: nothing is allocated and peer stays 0.
+    private NativeClustering() { peer = 0; }
+    // Copies the values of the given clustering into a single block obtained from allocator.allocate(size, writeOp).
+    public NativeClustering(NativeAllocator allocator, OpOrder.Group writeOp, Clustering clustering)
+    {
+        int count = clustering.size();
+        int metadataSize = (count * 2) + 4; // 2 bytes for count, 2 per start offset, 2 for the trailing dataSize
+        int dataSize = clustering.dataSize();
+        int bitmapSize = ((count + 7) >>> 3); // one bit per value, rounded up to whole bytes
+        // count and dataSize are each stored in 2 bytes, hence the 64K limits (only checked when assertions are enabled)
+        assert count < 64 << 10;
+        assert dataSize < 64 << 10;
+
+        peer = allocator.allocate(metadataSize + dataSize + bitmapSize, writeOp);
+        long bitmapStart = peer + metadataSize;
+        MemoryUtil.setShort(peer, (short) count);
+        MemoryUtil.setShort(peer + (metadataSize - 2), (short) dataSize); // goes at the end of the other offsets
+        // presumably zero-fills bitmapSize bytes from bitmapStart (confirm MemoryUtil.setByte(long, int, byte)); bits are set below for null values
+        MemoryUtil.setByte(bitmapStart, bitmapSize, (byte) 0);
+        long dataStart = peer + metadataSize + bitmapSize;
+        int dataOffset = 0;
+        for (int i = 0 ; i < count ; i++)
+        {
+            MemoryUtil.setShort(peer + 2 + i * 2, (short) dataOffset); // start of value i, relative to dataStart
+
+            ByteBuffer value = clustering.get(i);
+            if (value == null)
+            {
+                long boffset = bitmapStart + (i >>> 3);
+                int b = MemoryUtil.getByte(boffset);
+                b |= 1 << (i & 7);
+                MemoryUtil.setByte(boffset, (byte) b);
+                continue; // a null value writes no bytes, so dataOffset does not advance
+            }
+
+            assert value.order() == ByteOrder.BIG_ENDIAN;
+
+            int size = value.remaining();
+            MemoryUtil.setBytes(dataStart + dataOffset, value);
+            dataOffset += size;
+        }
+    }
+
+    public Kind kind()
+    {
+        return Kind.CLUSTERING;
+    }
+    // Number of values, read back from the first two bytes of the block.
+    public int size()
+    {
+        return MemoryUtil.getShort(peer); // NOTE(review): assumes getShort yields an unsigned value, so counts >= 32K stay positive - confirm
+    }
+    // Returns value i, or null when its bit is set in the null bitmap; throws IndexOutOfBoundsException unless 0 <= i < size().
+    public ByteBuffer get(int i)
+    {
+        // reject out-of-range indexes (negative ones included) before any address is derived from i
+        int size = size();
+        if (i < 0 || i >= size)
+            throw new IndexOutOfBoundsException();
+
+        int metadataSize = (size * 2) + 4;
+        int bitmapSize = ((size + 7) >>> 3);
+        long bitmapStart = peer + metadataSize;
+        int b = MemoryUtil.getByte(bitmapStart + (i >>> 3));
+        if ((b & (1 << (i & 7))) != 0)
+            return null;
+        // value i spans [start(i), start(i + 1)); for the last value the "next start" slot is the trailing dataSize
+        int startOffset = MemoryUtil.getShort(peer + 2 + i * 2);
+        int endOffset = MemoryUtil.getShort(peer + 4 + i * 2);
+        return MemoryUtil.getByteBuffer(bitmapStart + bitmapSize + startOffset,
+                                        endOffset - startOffset,
+                                        ByteOrder.BIG_ENDIAN);
+    }
+    // Builds a fresh array holding get(i) for every i; entries for null values are null.
+    public ByteBuffer[] getRawValues()
+    {
+        ByteBuffer[] values = new ByteBuffer[size()];
+        for (int i = 0 ; i < values.length ; i++)
+            values[i] = get(i);
+        return values;
+    }
+    // Always the measured size of one instance (EMPTY_SIZE); it does not vary with the stored values.
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE;
+    }
+    // Same constant as unsharedHeapSize().
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
index ca874c3..019209e 100644
--- a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
@@ -18,9 +18,11 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import org.apache.cassandra.utils.memory.NativeAllocator;
 
@@ -32,6 +34,8 @@ public class NativeDecoratedKey extends DecoratedKey
     {
         super(token);
         assert key != null;
+        assert key.order() == ByteOrder.BIG_ENDIAN;
+
         int size = key.remaining();
         this.peer = allocator.allocate(4 + size, writeOp);
         MemoryUtil.setInt(peer, size);
@@ -40,6 +44,6 @@ public class NativeDecoratedKey extends DecoratedKey
 
     public ByteBuffer getKey()
     {
-        return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer));
+        return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer), ByteOrder.BIG_ENDIAN);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 8ace988..0ceec90 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,7 +26,6 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.partitions.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 9b29d89..17f1de0 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -29,8 +29,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-
 /**
  * Holds references on serializers that depend on the table definition.
  */
@@ -70,7 +68,7 @@ public class Serializers
                         return Clustering.EMPTY;
 
                     if (!metadata.isCompound())
-                        return new Clustering(bb);
+                        return Clustering.make(bb);
 
                     List<ByteBuffer> components = CompositeType.splitName(bb);
                     byte eoc = CompositeType.lastEOC(bb);
@@ -81,7 +79,7 @@ public class Serializers
                         if (components.size() > clusteringSize)
                             components = components.subList(0, clusteringSize);
 
-                        return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
+                        return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
                     }
                     else
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 9ad9ba3..680b4b5 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -460,8 +460,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     {
         Tracing.trace("Executing single-partition query on {}", cfs.name);
 
-        boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
-        return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+        return queryMemtableAndDiskInternal(cfs);
     }
 
     @Override
@@ -470,7 +469,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         return oldestUnrepairedTombstone;
     }
 
-    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
     {
         /*
          * We have 2 main strategies:
@@ -484,7 +483,7 @@ public class SinglePartitionReadCommand extends ReadCommand
          *      of shards so have the same problem).
          */
         if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
-            return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+            return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
 
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
@@ -502,10 +501,8 @@ public class SinglePartitionReadCommand extends ReadCommand
 
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                 UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
-                @SuppressWarnings("resource") // same as above
-                UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
-                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
             }
             /*
              * We can't eliminate full sstables based on the timestamp of what we've already read like
@@ -649,7 +646,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      * no collection or counters are included).
      * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
      */
-    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter)
     {
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
@@ -668,10 +665,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                 if (iter.isEmpty())
                     continue;
 
-                UnfilteredRowIterator clonedFilter = copyOnHeap
-                                                   ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
-                                                   : iter;
-                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, false);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 7fde45e..8611470 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -343,7 +343,7 @@ public class Slice
      * <p>
      * This can be either a start or an end bound, and this can be either inclusive or exclusive.
      */
-    public static class Bound extends AbstractClusteringPrefix
+    public static class Bound extends AbstractBufferClusteringPrefix
     {
         public static final Serializer serializer = new Serializer();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index c234fc9..79bdbd7 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -194,11 +194,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         if (metadata.isCompound())
         {
             List<ByteBuffer> values = CompositeType.splitName(name);
-            return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
+            return Clustering.make(values.toArray(new ByteBuffer[metadata.comparator.size()]));
         }
         else
         {
-            return new Clustering(name);
+            return Clustering.make(name);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index e44124f..c276c57 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -183,7 +183,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         if (slices.size() == 0)
         {
             DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
-            return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed);
+            return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey(), staticRow, partitionDeletion, reversed);
         }
 
         return slices.size() == 1
@@ -202,9 +202,9 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
     }
 
     private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter,
-                                                     ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
+                                              ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
     {
-        return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(),
+        return new RowAndDeletionMergeIterator(metadata, partitionKey(), current.deletionInfo.getPartitionDeletion(),
                                                selection, staticRow, reversed, current.stats,
                                                rowIter, deleteIter,
                                                canHaveShadowedData());
@@ -215,22 +215,10 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         final Holder current;
         final ColumnFilter selection;
 
-        private AbstractIterator(ColumnFilter selection, boolean isReversed)
-        {
-            this(AbstractBTreePartition.this.holder(), selection, isReversed);
-        }
-
-        private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed)
-        {
-            this(current,
-                 AbstractBTreePartition.this.staticRow(current, selection, false),
-                 selection, isReversed);
-        }
-
         private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed)
         {
             super(AbstractBTreePartition.this.metadata,
-                  AbstractBTreePartition.this.partitionKey,
+                  AbstractBTreePartition.this.partitionKey(),
                   current.deletionInfo.getPartitionDeletion(),
                   selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
                                               // it would also be more precise to return the intersection of the selection and current.columns,
@@ -318,10 +306,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
         builder.auto(false);
         while (rows.hasNext())
-        {
-            Row row = rows.next();
-            builder.add(row);
-        }
+            builder.add(rows.next());
 
         if (reversed)
             builder.reverse();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 2be882e..c7113d4 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -26,12 +27,12 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 import org.apache.cassandra.utils.concurrent.Locks;
@@ -181,7 +182,66 @@ public class AtomicBTreePartition extends AbstractBTreePartition
             if (monitorOwned)
                 Locks.monitorExitUnsafe(this);
         }
+    }
+
+    @Override
+    public DeletionInfo deletionInfo()
+    {
+        return allocator.ensureOnHeap().applyToDeletionInfo(super.deletionInfo()); // this and every override below: superclass result passed through allocator.ensureOnHeap() (presumably moves off-heap data on heap when required - confirm)
+    }
 
+    @Override
+    public Row staticRow()
+    {
+        return allocator.ensureOnHeap().applyToStatic(super.staticRow());
+    }
+
+    @Override
+    public DecoratedKey partitionKey()
+    {
+        return allocator.ensureOnHeap().applyToPartitionKey(super.partitionKey());
+    }
+
+    @Override
+    public Row getRow(Clustering clustering)
+    {
+        return allocator.ensureOnHeap().applyToRow(super.getRow(clustering));
+    }
+
+    @Override
+    public Row lastRow()
+    {
+        return allocator.ensureOnHeap().applyToRow(super.lastRow());
+    }
+
+    @Override
+    public SearchIterator<Clustering, Row> searchIterator(ColumnFilter columns, boolean reversed)
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.searchIterator(columns, reversed)); // applyToPartition is called with several argument types in this class (SearchIterator, UnfilteredRowIterator, Iterator<Row>)
+    }
+
+    @Override
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed));
+    }
+
+    @Override
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator());
+    }
+
+    @Override
+    public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed)
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(current, selection, slices, reversed));
+    }
+
+    @Override
+    public Iterator<Row> iterator()
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.iterator());
     }
 
     public boolean usePessimisticLocking()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 26a947b..70a4678 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -65,7 +65,7 @@ public class FilteredPartition extends ImmutableBTreePartition
 
             public DecoratedKey partitionKey()
             {
-                return partitionKey;
+                return FilteredPartition.this.partitionKey();
             }
 
             public Row staticRow()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 882c0e0..1ea9713 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -17,15 +17,19 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
 
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * Base abstract class for {@code Cell} implementations.
@@ -40,6 +44,81 @@ public abstract class AbstractCell extends Cell
         super(column);
     }
 
+    public boolean isCounterCell()
+    {
+        return !isTombstone() && column.cellValueType().isCounter(); // a tombstone is never reported as a counter cell, whatever the column's value type
+    }
+
+    public boolean isLive(int nowInSec)
+    {
+        return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && nowInSec < localDeletionTime()); // live = no deletion time, or has a TTL and its deletion time is still after nowInSec
+    }
+
+    public boolean isTombstone()
+    {
+        return localDeletionTime() != NO_DELETION_TIME && ttl() == NO_TTL; // tombstone = has a deletion time but no TTL
+    }
+
+    public boolean isExpiring()
+    {
+        return ttl() != NO_TTL; // true for any cell with a TTL; nowInSec is not consulted, so this says nothing about whether it has already expired
+    }
+
+    public Cell markCounterLocalToBeCleared()
+    {
+        if (!isCounterCell())
+            return this;
+        // value() is read once so the identity check below compares against the very buffer that was passed in
+        ByteBuffer value = value();
+        ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value);
+        return marked == value ? this : new BufferCell(column, timestamp(), ttl(), localDeletionTime(), marked, path()); // same buffer instance back means nothing changed, so no new cell is built
+    }
+
+    public Cell purge(DeletionPurger purger, int nowInSec) // returns null if purged, a tombstone for an expired-but-unpurgeable cell, otherwise this
+    {
+        if (!isLive(nowInSec))
+        {
+            if (purger.shouldPurge(timestamp(), localDeletionTime()))
+                return null;
+
+            // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
+            // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
+            // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
+            // so hopefully not too surprising and 2) we want to do this and purging at the same places, so it's simpler/more efficient
+            // to do both here.
+            if (isExpiring())
+            {
+                // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+                // we'll fulfil our responsibility to repair. See discussion at
+                // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+                return BufferCell.tombstone(column, timestamp(), localDeletionTime() - ttl(), path()); // pass path() so the tombstone keeps the expired cell's CellPath (null when the cell has none)
+            }
+        }
+        return this;
+    }
+
+    public Cell copy(AbstractAllocator allocator)
+    {
+        CellPath path = path(); // may be null, in which case the copy has a null path too
+        return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), allocator.clone(value()), path == null ? null : path.copy(allocator)); // always a BufferCell; value and path are copied through the given allocator
+    }
+
+    // note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied)
+    public Cell updateAllTimestamp(long newTimestamp)
+    {
+        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl(), localDeletionTime(), value(), path()); // tombstones get newTimestamp - 1, every other cell newTimestamp; ttl, deletion time, value and path are carried over as-is
+    }
+
+    public int dataSize() // sum of the sizes of timestamp, ttl and localDeletionTime, the value's remaining bytes, and the path's dataSize
+    {
+        CellPath path = path(); // may be null; it then contributes 0 below
+        return TypeSizes.sizeof(timestamp())
+               + TypeSizes.sizeof(ttl())
+               + TypeSizes.sizeof(localDeletionTime())
+               + value().remaining()
+               + (path == null ? 0 : path.dataSize());
+    }
+
     public void digest(MessageDigest digest)
     {
         digest.update(value().duplicate());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index a0912ae..47cfd58 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -62,7 +62,11 @@ public class BTreeRow extends AbstractRow
     // no expiring cells, this will be Integer.MAX_VALUE;
     private final int minLocalDeletionTime;
 
-    private BTreeRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree, int minLocalDeletionTime)
+    private BTreeRow(Clustering clustering,
+                     LivenessInfo primaryKeyLivenessInfo,
+                     Deletion deletion,
+                     Object[] btree,
+                     int minLocalDeletionTime)
     {
         assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
         this.clustering = clustering;
@@ -78,7 +82,10 @@ public class BTreeRow extends AbstractRow
     }
 
     // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
-    public static BTreeRow create(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree)
+    public static BTreeRow create(Clustering clustering,
+                                  LivenessInfo primaryKeyLivenessInfo,
+                                  Deletion deletion,
+                                  Object[] btree)
     {
         int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time()));
         if (minDeletionTime != Integer.MIN_VALUE)
@@ -87,6 +94,15 @@ public class BTreeRow extends AbstractRow
                 minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
         }
 
+        return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+    }
+
+    public static BTreeRow create(Clustering clustering,
+                                  LivenessInfo primaryKeyLivenessInfo,
+                                  Deletion deletion,
+                                  Object[] btree,
+                                  int minDeletionTime) // taken as given: passed straight to the constructor, nothing is recomputed from btree here
+    {
         return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
     }
 
@@ -113,7 +129,11 @@ public class BTreeRow extends AbstractRow
     public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
     {
         assert !primaryKeyLivenessInfo.isEmpty();
-        return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+        return new BTreeRow(clustering,
+                            primaryKeyLivenessInfo,
+                            Deletion.LIVE,
+                            BTree.empty(),
+                            minDeletionTime(primaryKeyLivenessInfo));
     }
 
     private static int minDeletionTime(Cell cell)
@@ -368,7 +388,7 @@ public class BTreeRow extends AbstractRow
             return null;
 
         int minDeletionTime = minDeletionTime(transformed, info, deletion.time());
-        return new BTreeRow(clustering, info, deletion, transformed, minDeletionTime);
+        return BTreeRow.create(clustering, info, deletion, transformed, minDeletionTime);
     }
 
     public int dataSize()
@@ -594,7 +614,7 @@ public class BTreeRow extends AbstractRow
                 return new ComplexColumnData(column, btree, deletion);
             }
 
-        };
+        }
         protected Clustering clustering;
         protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
         protected Deletion deletion = Deletion.LIVE;
@@ -680,10 +700,9 @@ public class BTreeRow extends AbstractRow
                 deletion = Deletion.LIVE;
 
             int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion.time());
-            Row row = new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+            Row row = BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
             reset();
             return row;
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 4176ba6..cac63ac 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -17,16 +17,11 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.FBUtilities;
@@ -88,26 +83,6 @@ public class BufferCell extends AbstractCell
         return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
     }
 
-    public boolean isCounterCell()
-    {
-        return !isTombstone() && column.cellValueType().isCounter();
-    }
-
-    public boolean isLive(int nowInSec)
-    {
-        return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && nowInSec < localDeletionTime);
-    }
-
-    public boolean isTombstone()
-    {
-        return localDeletionTime != NO_DELETION_TIME && ttl == NO_TTL;
-    }
-
-    public boolean isExpiring()
-    {
-        return ttl != NO_TTL;
-    }
-
     public long timestamp()
     {
         return timestamp;
@@ -146,216 +121,9 @@ public class BufferCell extends AbstractCell
         return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
     }
 
-    public Cell markCounterLocalToBeCleared()
-    {
-        if (!isCounterCell())
-            return this;
-
-        ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value());
-        return marked == value() ? this : new BufferCell(column, timestamp, ttl, localDeletionTime, marked, path);
-    }
-
-    public Cell purge(DeletionPurger purger, int nowInSec)
-    {
-        if (!isLive(nowInSec))
-        {
-            if (purger.shouldPurge(timestamp, localDeletionTime))
-                return null;
-
-            // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
-            // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
-            // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
-            // so hopefully not too surprising and 2) we want to this and purging at the same places, so it's simpler/more efficient
-            // to do both here.
-            if (isExpiring())
-            {
-                // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
-                // we'll fulfil our responsibility to repair. See discussion at
-                // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
-                return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl);
-            }
-        }
-        return this;
-    }
-
-    public Cell updateAllTimestamp(long newTimestamp)
-    {
-        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl, localDeletionTime, value, path);
-    }
-
-    public int dataSize()
-    {
-        return TypeSizes.sizeof(timestamp)
-             + TypeSizes.sizeof(ttl)
-             + TypeSizes.sizeof(localDeletionTime)
-             + value.remaining()
-             + (path == null ? 0 : path.dataSize());
-    }
-
     public long unsharedHeapSizeExcludingData()
     {
         return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
     }
 
-    /**
-     * The serialization format for cell is:
-     *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ path ][ value size ][ value ]
-     *     [   1b  ][ 8b (vint) ][   4b (vint)   ][ 4b (vint) ][ 4b (vint) ][  arb ][  4b (vint) ][  arb  ]
-     *
-     * where not all field are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following
-     * meaning:
-     *   - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
-     *   - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_TIMESTAMP_MASK.
-     *   - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
-     *       or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
-     *   - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the
-     *       USE_ROW_TTL_MASK.
-     *   - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
-     *       for columns of this type have a fixed length.
-     *   - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
-     *   - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
-     *   - [ path ]: the cell path if the column this is a cell of is complex.
-     */
-    static class Serializer implements Cell.Serializer
-    {
-        private final static int IS_DELETED_MASK             = 0x01; // Whether the cell is a tombstone or not.
-        private final static int IS_EXPIRING_MASK            = 0x02; // Whether the cell is expiring.
-        private final static int HAS_EMPTY_VALUE_MASK        = 0x04; // Wether the cell has an empty value. This will be the case for tombstone in particular.
-        private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Wether the cell has the same timestamp than the row this is a cell of.
-        private final static int USE_ROW_TTL_MASK            = 0x10; // Wether the cell has the same ttl than the row this is a cell of.
-
-        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
-        {
-            assert cell != null;
-            boolean hasValue = cell.value().hasRemaining();
-            boolean isDeleted = cell.isTombstone();
-            boolean isExpiring = cell.isExpiring();
-            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
-            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
-            int flags = 0;
-            if (!hasValue)
-                flags |= HAS_EMPTY_VALUE_MASK;
-
-            if (isDeleted)
-                flags |= IS_DELETED_MASK;
-            else if (isExpiring)
-                flags |= IS_EXPIRING_MASK;
-
-            if (useRowTimestamp)
-                flags |= USE_ROW_TIMESTAMP_MASK;
-            if (useRowTTL)
-                flags |= USE_ROW_TTL_MASK;
-
-            out.writeByte((byte)flags);
-
-            if (!useRowTimestamp)
-                header.writeTimestamp(cell.timestamp(), out);
-
-            if ((isDeleted || isExpiring) && !useRowTTL)
-                header.writeLocalDeletionTime(cell.localDeletionTime(), out);
-            if (isExpiring && !useRowTTL)
-                header.writeTTL(cell.ttl(), out);
-
-            if (cell.column().isComplex())
-                cell.column().cellPathSerializer().serialize(cell.path(), out);
-
-            if (hasValue)
-                header.getType(cell.column()).writeValue(cell.value(), out);
-        }
-
-        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
-        {
-            int flags = in.readUnsignedByte();
-            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
-            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
-            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
-            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
-            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
-
-            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
-
-            int localDeletionTime = useRowTTL
-                                  ? rowLiveness.localExpirationTime()
-                                  : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
-
-            int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
-
-            CellPath path = column.isComplex()
-                          ? column.cellPathSerializer().deserialize(in)
-                          : null;
-
-            boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
-
-            ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-            if (hasValue)
-            {
-                if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
-                {
-                    header.getType(column).skipValue(in);
-                }
-                else
-                {
-                    value = header.getType(column).readValue(in);
-                    if (isCounter)
-                        value = helper.maybeClearCounterValue(value);
-                }
-            }
-
-            return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
-        }
-
-        public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
-        {
-            long size = 1; // flags
-            boolean hasValue = cell.value().hasRemaining();
-            boolean isDeleted = cell.isTombstone();
-            boolean isExpiring = cell.isExpiring();
-            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
-            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
-
-            if (!useRowTimestamp)
-                size += header.timestampSerializedSize(cell.timestamp());
-
-            if ((isDeleted || isExpiring) && !useRowTTL)
-                size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
-            if (isExpiring && !useRowTTL)
-                size += header.ttlSerializedSize(cell.ttl());
-
-            if (cell.column().isComplex())
-                size += cell.column().cellPathSerializer().serializedSize(cell.path());
-
-            if (hasValue)
-                size += header.getType(cell.column()).writtenLength(cell.value());
-
-            return size;
-        }
-
-        // Returns if the skipped cell was an actual cell (i.e. it had its presence flag).
-        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
-        {
-            int flags = in.readUnsignedByte();
-            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
-            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
-            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
-            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
-            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
-
-            if (!useRowTimestamp)
-                header.skipTimestamp(in);
-
-            if (!useRowTTL && (isDeleted || isExpiring))
-                header.skipLocalDeletionTime(in);
-
-            if (!useRowTTL && isExpiring)
-                header.skipTTL(in);
-
-            if (column.isComplex())
-                column.cellPathSerializer().skip(in);
-
-            if (hasValue)
-                header.getType(column).skipValue(in);
-
-            return true;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 73d9e44..ad1c39a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -141,15 +142,165 @@ public abstract class Cell extends ColumnData
     // Overrides super type to provide a more precise return type.
     public abstract Cell purge(DeletionPurger purger, int nowInSec);
 
-    public interface Serializer
+    /**
+     * The serialization format for cell is:
+     *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ path ][ value size ][ value ]
+     *     [   1b  ][ 8b (vint) ][   4b (vint)   ][ 4b (vint) ][ 4b (vint) ][  arb ][  4b (vint) ][  arb  ]
+     *
+     * where not all fields are always present (in fact, only [ flags ] is guaranteed to be present). The fields have the following
+     * meaning:
+     *   - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
+     *   - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_ROW_TIMESTAMP_MASK.
+     *   - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
+     *       or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
+     *   - [ ttl ]: the ttl for the cell. Present if the cell is expiring (IS_EXPIRING_MASK) but doesn't have the
+     *       USE_ROW_TTL_MASK.
+     *   - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
+     *       for columns of this type have a fixed length.
+     *   - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
+     *   - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
+     *   - [ path ]: the cell path if the column this is a cell of is complex.
+     */
+    static class Serializer
     {
-        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException;
+        private final static int IS_DELETED_MASK             = 0x01; // Whether the cell is a tombstone or not.
+        private final static int IS_EXPIRING_MASK            = 0x02; // Whether the cell is expiring.
+        private final static int HAS_EMPTY_VALUE_MASK        = 0x04; // Whether the cell has an empty value. This will be the case for tombstones in particular.
+        private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Whether the cell has the same timestamp as the row it belongs to.
+        private final static int USE_ROW_TTL_MASK            = 0x10; // Whether the cell has the same ttl as the row it belongs to.
+
+        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
+        {
+            assert cell != null;
+            boolean hasValue = cell.value().hasRemaining();
+            boolean isDeleted = cell.isTombstone();
+            boolean isExpiring = cell.isExpiring();
+            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+            int flags = 0;
+            if (!hasValue)
+                flags |= HAS_EMPTY_VALUE_MASK;
+
+            if (isDeleted)
+                flags |= IS_DELETED_MASK;
+            else if (isExpiring)
+                flags |= IS_EXPIRING_MASK;
+
+            if (useRowTimestamp)
+                flags |= USE_ROW_TIMESTAMP_MASK;
+            if (useRowTTL)
+                flags |= USE_ROW_TTL_MASK;
+
+            out.writeByte((byte)flags);
+
+            if (!useRowTimestamp)
+                header.writeTimestamp(cell.timestamp(), out);
+
+            if ((isDeleted || isExpiring) && !useRowTTL)
+                header.writeLocalDeletionTime(cell.localDeletionTime(), out);
+            if (isExpiring && !useRowTTL)
+                header.writeTTL(cell.ttl(), out);
+
+            if (cell.column().isComplex())
+                cell.column().cellPathSerializer().serialize(cell.path(), out);
+
+            if (hasValue)
+                header.getType(cell.column()).writeValue(cell.value(), out);
+        }
+
+        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
+        {
+            int flags = in.readUnsignedByte();
+            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
+
+            int localDeletionTime = useRowTTL
+                                    ? rowLiveness.localExpirationTime()
+                                    : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
+
+            int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
 
-        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException;
+            CellPath path = column.isComplex()
+                            ? column.cellPathSerializer().deserialize(in)
+                            : null;
 
-        public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header);
+            boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
+
+            ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            if (hasValue)
+            {
+                if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
+                {
+                    header.getType(column).skipValue(in);
+                }
+                else
+                {
+                    value = header.getType(column).readValue(in);
+                    if (isCounter)
+                        value = helper.maybeClearCounterValue(value);
+                }
+            }
+
+            return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
+        }
+
+        public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
+        {
+            long size = 1; // flags
+            boolean hasValue = cell.value().hasRemaining();
+            boolean isDeleted = cell.isTombstone();
+            boolean isExpiring = cell.isExpiring();
+            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+
+            if (!useRowTimestamp)
+                size += header.timestampSerializedSize(cell.timestamp());
+
+            if ((isDeleted || isExpiring) && !useRowTTL)
+                size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
+            if (isExpiring && !useRowTTL)
+                size += header.ttlSerializedSize(cell.ttl());
+
+            if (cell.column().isComplex())
+                size += cell.column().cellPathSerializer().serializedSize(cell.path());
+
+            if (hasValue)
+                size += header.getType(cell.column()).writtenLength(cell.value());
+
+            return size;
+        }
 
         // Returns if the skipped cell was an actual cell (i.e. it had its presence flag).
-        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException;
+        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
+        {
+            int flags = in.readUnsignedByte();
+            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+            if (!useRowTimestamp)
+                header.skipTimestamp(in);
+
+            if (!useRowTTL && (isDeleted || isExpiring))
+                header.skipLocalDeletionTime(in);
+
+            if (!useRowTTL && isExpiring)
+                header.skipTTL(in);
+
+            if (column.isComplex())
+                column.cellPathSerializer().skip(in);
+
+            if (hasValue)
+                header.getType(column).skipValue(in);
+
+            return true;
+        }
     }
 }