You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2016/01/27 11:50:06 UTC

[1/2] cassandra git commit: 9472: Reintroduce Off-Heap Memtables

Repository: cassandra
Updated Branches:
  refs/heads/trunk a0901b8a5 -> 2f4124319


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/NativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/NativeCell.java b/src/java/org/apache/cassandra/db/rows/NativeCell.java
new file mode 100644
index 0000000..9d816f3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeCell extends AbstractCell
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeCell());
+
+    private static final long HAS_CELLPATH = 0;
+    private static final long TIMESTAMP = 1;
+    private static final long TTL = 9;
+    private static final long DELETION = 13;
+    private static final long LENGTH = 17;
+    private static final long VALUE = 21;
+
+    private final long peer;
+
+    private NativeCell()
+    {
+        super(null);
+        this.peer = 0;
+    }
+
+    public NativeCell(NativeAllocator allocator,
+                      OpOrder.Group writeOp,
+                      Cell cell)
+    {
+        this(allocator,
+             writeOp,
+             cell.column(),
+             cell.timestamp(),
+             cell.ttl(),
+             cell.localDeletionTime(),
+             cell.value(),
+             cell.path());
+    }
+
+    public NativeCell(NativeAllocator allocator,
+                      OpOrder.Group writeOp,
+                      ColumnDefinition column,
+                      long timestamp,
+                      int ttl,
+                      int localDeletionTime,
+                      ByteBuffer value,
+                      CellPath path)
+    {
+        super(column);
+        long size = simpleSize(value.remaining());
+
+        assert value.order() == ByteOrder.BIG_ENDIAN;
+        assert column.isComplex() == (path != null);
+        if (path != null)
+        {
+            assert path.size() == 1;
+            size += 4 + path.get(0).remaining();
+        }
+
+        if (size > Integer.MAX_VALUE)
+            throw new IllegalStateException();
+
+        // cellpath? : timestamp : ttl : localDeletionTime : length : <data> : [cell path length] : [<cell path data>]
+        peer = allocator.allocate((int) size, writeOp);
+        MemoryUtil.setByte(peer + HAS_CELLPATH, (byte)(path == null ? 0 : 1));
+        MemoryUtil.setLong(peer + TIMESTAMP, timestamp);
+        MemoryUtil.setInt(peer + TTL, ttl);
+        MemoryUtil.setInt(peer + DELETION, localDeletionTime);
+        MemoryUtil.setInt(peer + LENGTH, value.remaining());
+        MemoryUtil.setBytes(peer + VALUE, value);
+
+        if (path != null)
+        {
+            ByteBuffer pathbuffer = path.get(0);
+            assert pathbuffer.order() == ByteOrder.BIG_ENDIAN;
+
+            long offset = peer + VALUE + value.remaining();
+            MemoryUtil.setInt(offset, pathbuffer.remaining());
+            MemoryUtil.setBytes(offset + 4, pathbuffer);
+        }
+    }
+
+    private static long simpleSize(int length)
+    {
+        return VALUE + length;
+    }
+
+    public long timestamp()
+    {
+        return MemoryUtil.getLong(peer + TIMESTAMP);
+    }
+
+    public int ttl()
+    {
+        return MemoryUtil.getInt(peer + TTL);
+    }
+
+    public int localDeletionTime()
+    {
+        return MemoryUtil.getInt(peer + DELETION);
+    }
+
+    public ByteBuffer value()
+    {
+        int length = MemoryUtil.getInt(peer + LENGTH);
+        return MemoryUtil.getByteBuffer(peer + VALUE, length, ByteOrder.BIG_ENDIAN);
+    }
+
+    public CellPath path()
+    {
+        if (MemoryUtil.getByte(peer+ HAS_CELLPATH) == 0)
+            return null;
+
+        long offset = peer + VALUE + MemoryUtil.getInt(peer + LENGTH);
+        int size = MemoryUtil.getInt(offset);
+        return CellPath.create(MemoryUtil.getByteBuffer(offset + 4, size, ByteOrder.BIG_ENDIAN));
+    }
+
+    public Cell withUpdatedValue(ByteBuffer newValue)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/RowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterator.java b/src/java/org/apache/cassandra/db/rows/RowIterator.java
index f0b4499..0cc4a3c 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterator.java
@@ -17,11 +17,6 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.util.Iterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-
 /**
  * An iterator over rows belonging to a partition.
  *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 9aa7cc4..52b346a 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -36,7 +36,6 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.IMergeIterator;
 import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * Static methods to work with atom iterators.
@@ -175,32 +174,6 @@ public abstract class UnfilteredRowIterators
         return MoreRows.extend(iter1, new Extend());
     }
 
-    public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator)
-    {
-        class Cloner extends Transformation
-        {
-            private final Row.Builder builder = allocator.cloningBTreeRowBuilder();
-
-            public Row applyToStatic(Row row)
-            {
-                return Rows.copy(row, builder).build();
-            }
-
-            @Override
-            public Row applyToRow(Row row)
-            {
-                return Rows.copy(row, builder).build();
-            }
-
-            @Override
-            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
-            {
-                return marker.copy(allocator);
-            }
-        }
-        return Transformation.apply(iterator, new Cloner());
-    }
-
     /**
      * Validate that the data of the provided iterator is valid, that is that the values
      * it contains are valid for the type they represent, and more generally that the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/transform/BaseRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java b/src/java/org/apache/cassandra/db/transform/BaseRows.java
index 78526e8..825db44 100644
--- a/src/java/org/apache/cassandra/db/transform/BaseRows.java
+++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java
@@ -13,11 +13,13 @@ implements BaseRowIterator<R>
 {
 
     private Row staticRow;
+    private DecoratedKey partitionKey;
 
     public BaseRows(I input)
     {
         super(input);
         staticRow = input.staticRow();
+        partitionKey = input.partitionKey();
     }
 
     // swap parameter order to avoid casting errors
@@ -25,6 +27,7 @@ implements BaseRowIterator<R>
     {
         super(copyFrom);
         staticRow = copyFrom.staticRow;
+        partitionKey = copyFrom.partitionKey();
     }
 
     public CFMetaData metadata()
@@ -84,6 +87,7 @@ implements BaseRowIterator<R>
         // transform any existing data
         staticRow = transformation.applyToStatic(staticRow);
         next = applyOne(next, transformation);
+        partitionKey = transformation.applyToPartitionKey(partitionKey);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/transform/Transformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java b/src/java/org/apache/cassandra/db/transform/Transformation.java
index 29e2e15..230be5f 100644
--- a/src/java/org/apache/cassandra/db/transform/Transformation.java
+++ b/src/java/org/apache/cassandra/db/transform/Transformation.java
@@ -1,5 +1,6 @@
 package org.apache.cassandra.db.transform;
 
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
@@ -62,6 +63,11 @@ public abstract class Transformation<I extends BaseRowIterator<?>>
     }
 
     /**
+     * Applied to the partition key of any rows/unfiltered iterator we are applied to
+     */
+    protected DecoratedKey applyToPartitionKey(DecoratedKey key) { return key; }
+
+    /**
      * Applied to the static row of any rows iterator.
      *
      * NOTE that this is only applied to the first iterator in any sequence of iterators filled by a MoreContents;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
index 23705b9..4774be4 100644
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java
@@ -37,16 +37,7 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Conflicts;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.partitions.AbstractBTreePartition;
 import org.apache.cassandra.db.rows.BufferCell;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/index/internal/IndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/IndexEntry.java b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
index 6f94ace..834dd6a 100644
--- a/src/java/org/apache/cassandra/index/internal/IndexEntry.java
+++ b/src/java/org/apache/cassandra/index/internal/IndexEntry.java
@@ -3,6 +3,7 @@ package org.apache.cassandra.index.internal;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.DecoratedKey;
 
 /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index 0ad0891..101f2bb 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -145,7 +145,7 @@ public class KeysSearcher extends CassandraIndexSearcher
             // is the indexed name. Ans so we need to materialize the partition.
             ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator);
             iterator.close();
-            Row data = result.getRow(new Clustering(index.getIndexedColumn().name.bytes));
+            Row data = result.getRow(Clustering.make(index.getIndexedColumn().name.bytes));
 
             // for thrift tables, we need to compare the index entry against the compact value column,
             // not the column actually designated as the indexed column so we don't use the index function
@@ -155,7 +155,7 @@ public class KeysSearcher extends CassandraIndexSearcher
             {
                 // Index is stale, remove the index entry and ignore
                 index.deleteStaleEntry(index.getIndexCfs().decorateKey(indexedValue),
-                                         new Clustering(index.getIndexedColumn().name.bytes),
+                                         Clustering.make(index.getIndexedColumn().name.bytes),
                                          new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec),
                                          writeOp);
                 return null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 1c93f58..3de0e5a 100644
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -17,14 +17,11 @@
  */
 package org.apache.cassandra.io.sstable.metadata;
 
-import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import com.google.common.collect.Maps;
 
@@ -35,7 +32,6 @@ import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.ActiveRepairService;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
index e009528..3daa4c4 100644
--- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java
@@ -71,6 +71,6 @@ public class MemoryInputStream extends RebufferingInputStream implements DataInp
 
     private static ByteBuffer getByteBuffer(long offset, int length)
     {
-        return MemoryUtil.getByteBuffer(offset, length).order(ByteOrder.BIG_ENDIAN);
+        return MemoryUtil.getByteBuffer(offset, length, ByteOrder.BIG_ENDIAN);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 03d7cd4..9bda3a0 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -40,10 +40,8 @@ import org.apache.cassandra.cache.*;
 import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.filter.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 61d9b5f..378a051 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -360,7 +360,7 @@ public class CassandraServer implements Cassandra.Iface
     private ClusteringIndexFilter toInternalFilter(CFMetaData metadata, ColumnParent parent, SliceRange range)
     {
         if (metadata.isSuper() && parent.isSetSuper_column())
-            return new ClusteringIndexNamesFilter(FBUtilities.singleton(new Clustering(parent.bufferForSuper_column()), metadata.comparator), range.reversed);
+            return new ClusteringIndexNamesFilter(FBUtilities.singleton(Clustering.make(parent.bufferForSuper_column()), metadata.comparator), range.reversed);
         else
             return new ClusteringIndexSliceFilter(makeSlices(metadata, range), range.reversed);
     }
@@ -384,13 +384,13 @@ public class CassandraServer implements Cassandra.Iface
                 {
                     if (parent.isSetSuper_column())
                     {
-                        return new ClusteringIndexNamesFilter(FBUtilities.singleton(new Clustering(parent.bufferForSuper_column()), metadata.comparator), false);
+                        return new ClusteringIndexNamesFilter(FBUtilities.singleton(Clustering.make(parent.bufferForSuper_column()), metadata.comparator), false);
                     }
                     else
                     {
                         NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator);
                         for (ByteBuffer bb : predicate.column_names)
-                            clusterings.add(new Clustering(bb));
+                            clusterings.add(Clustering.make(bb));
                         return new ClusteringIndexNamesFilter(clusterings, false);
                     }
                 }
@@ -460,7 +460,7 @@ public class CassandraServer implements Cassandra.Iface
             // We only want to include the static columns that are selected by the slices
             for (ColumnDefinition def : columns.statics)
             {
-                if (slices.selects(new Clustering(def.name.bytes)))
+                if (slices.selects(Clustering.make(def.name.bytes)))
                     builder.add(def);
             }
             columns = builder.build();
@@ -617,7 +617,7 @@ public class CassandraServer implements Cassandra.Iface
                     builder.select(dynamicDef, CellPath.create(column_path.column));
                     columns = builder.build();
                 }
-                filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(new Clustering(column_path.super_column), metadata.comparator),
+                filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(Clustering.make(column_path.super_column), metadata.comparator),
                                                   false);
             }
             else
@@ -631,7 +631,7 @@ public class CassandraServer implements Cassandra.Iface
                     builder.add(cellname.column);
                     builder.add(metadata.compactValueColumn());
                     columns = builder.build();
-                    filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(new Clustering(column_path.column), metadata.comparator), false);
+                    filter = new ClusteringIndexNamesFilter(FBUtilities.singleton(Clustering.make(column_path.column), metadata.comparator), false);
                 }
                 else
                 {
@@ -1353,7 +1353,7 @@ public class CassandraServer implements Cassandra.Iface
         }
         else if (column_path.super_column != null && column_path.column == null)
         {
-            Row row = BTreeRow.emptyDeletedRow(new Clustering(column_path.super_column), Row.Deletion.regular(new DeletionTime(timestamp, nowInSec)));
+            Row row = BTreeRow.emptyDeletedRow(Clustering.make(column_path.super_column), Row.Deletion.regular(new DeletionTime(timestamp, nowInSec)));
             update = PartitionUpdate.singleRowUpdate(metadata, dk, row);
         }
         else
@@ -1611,7 +1611,7 @@ public class CassandraServer implements Cassandra.Iface
                 ClusteringIndexFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
                 DataLimits limits = getLimits(range.count, true, Integer.MAX_VALUE);
                 Clustering pageFrom = metadata.isSuper()
-                                    ? new Clustering(start_column)
+                                    ? Clustering.make(start_column)
                                     : LegacyLayout.decodeCellName(metadata, start_column).clustering;
                 PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
                                                                               0,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
index ea3fa2f..a14409e 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java
@@ -199,7 +199,7 @@ public class ThriftResultsMerger extends Transformation<UnfilteredRowIterator>
             Cell cell = staticCells.next();
 
             // Given a static cell, the equivalent row uses the column name as clustering and the value as unique cell value.
-            builder.newRow(new Clustering(cell.column().name.bytes));
+            builder.newRow(Clustering.make(cell.column().name.bytes));
             builder.addCell(new BufferCell(metadata().compactValueColumn(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), cell.path()));
             nextToMerge = builder.build();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
index 9066335..c3cac2b 100644
--- a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.utils.memory;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.Columns;
 import org.apache.cassandra.db.rows.BTreeRow;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/EnsureOnHeap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/EnsureOnHeap.java b/src/java/org/apache/cassandra/utils/memory/EnsureOnHeap.java
new file mode 100644
index 0000000..b53c2a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/EnsureOnHeap.java
@@ -0,0 +1,150 @@
+package org.apache.cassandra.utils.memory;
+
+import java.util.Iterator;
+
+import org.apache.cassandra.db.BufferDecoratedKey;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.utils.SearchIterator;
+
+public abstract class EnsureOnHeap extends Transformation
+{
+    public abstract DecoratedKey applyToPartitionKey(DecoratedKey key);
+    public abstract UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition);
+    public abstract SearchIterator<Clustering, Row> applyToPartition(SearchIterator<Clustering, Row> partition);
+    public abstract Iterator<Row> applyToPartition(Iterator<Row> partition);
+    public abstract DeletionInfo applyToDeletionInfo(DeletionInfo deletionInfo);
+    public abstract Row applyToRow(Row row);
+    public abstract Row applyToStatic(Row row);
+    public abstract RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker);
+
+    static class CloneToHeap extends EnsureOnHeap
+    {
+        protected BaseRowIterator<?> applyToPartition(BaseRowIterator partition)
+        {
+            return partition instanceof UnfilteredRowIterator
+                   ? Transformation.apply((UnfilteredRowIterator) partition, this)
+                   : Transformation.apply((RowIterator) partition, this);
+        }
+
+        public DecoratedKey applyToPartitionKey(DecoratedKey key)
+        {
+            return new BufferDecoratedKey(key.getToken(), HeapAllocator.instance.clone(key.getKey()));
+        }
+
+        public Row applyToRow(Row row)
+        {
+            if (row == null)
+                return null;
+            return Rows.copy(row, HeapAllocator.instance.cloningBTreeRowBuilder()).build();
+        }
+
+        public Row applyToStatic(Row row)
+        {
+            if (row == Rows.EMPTY_STATIC_ROW)
+                return row;
+            return applyToRow(row);
+        }
+
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            return marker.copy(HeapAllocator.instance);
+        }
+
+        public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+        {
+            return Transformation.apply(partition, this);
+        }
+
+        public SearchIterator<Clustering, Row> applyToPartition(SearchIterator<Clustering, Row> partition)
+        {
+            return new SearchIterator<Clustering, Row>()
+            {
+                public boolean hasNext()
+                {
+                    return partition.hasNext();
+                }
+
+                public Row next(Clustering key)
+                {
+                    return applyToRow(partition.next(key));
+                }
+            };
+        }
+
+        public Iterator<Row> applyToPartition(Iterator<Row> partition)
+        {
+            return new Iterator<Row>()
+            {
+                public boolean hasNext()
+                {
+                    return partition.hasNext();
+                }
+                public Row next()
+                {
+                    return applyToRow(partition.next());
+                }
+                public void remove()
+                {
+                    partition.remove();
+                }
+            };
+        }
+
+        public DeletionInfo applyToDeletionInfo(DeletionInfo deletionInfo)
+        {
+            return deletionInfo.copy(HeapAllocator.instance);
+        }
+    }
+
+    static class NoOp extends EnsureOnHeap
+    {
+        protected BaseRowIterator<?> applyToPartition(BaseRowIterator partition)
+        {
+            return partition;
+        }
+
+        public DecoratedKey applyToPartitionKey(DecoratedKey key)
+        {
+            return key;
+        }
+
+        public Row applyToRow(Row row)
+        {
+            return row;
+        }
+
+        public Row applyToStatic(Row row)
+        {
+            return row;
+        }
+
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+        {
+            return marker;
+        }
+
+        public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+        {
+            return partition;
+        }
+
+        public SearchIterator<Clustering, Row> applyToPartition(SearchIterator<Clustering, Row> partition)
+        {
+            return partition;
+        }
+
+        public Iterator<Row> applyToPartition(Iterator<Row> partition)
+        {
+            return partition;
+        }
+
+        public DeletionInfo applyToDeletionInfo(DeletionInfo deletionInfo)
+        {
+            return deletionInfo;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
index 41877f5..8333142 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
@@ -33,4 +33,9 @@ public final class HeapAllocator extends AbstractAllocator
     {
         return ByteBuffer.allocate(size);
     }
+
+    public boolean allocatingOnHeap()
+    {
+        return true;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
index 19f81be..593b443 100644
--- a/src/java/org/apache/cassandra/utils/memory/HeapPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -32,11 +32,6 @@ public class HeapPool extends MemtablePool
         super(maxOnHeapMemory, 0, cleanupThreshold, cleaner);
     }
 
-    public boolean needToCopyOnHeap()
-    {
-        return false;
-    }
-
     public MemtableAllocator newAllocator()
     {
         // TODO

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
index 7fa01d2..f205e7b 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemoryUtil.java
@@ -106,6 +106,11 @@ public abstract class MemoryUtil
         unsafe.putByte(address, b);
     }
 
+    public static void setByte(long address, int count, byte b)
+    {
+        unsafe.setMemory(address, count, b);
+    }
+
     public static void setShort(long address, short s)
     {
         unsafe.putShort(address, s);
@@ -149,13 +154,23 @@ public abstract class MemoryUtil
 
     public static ByteBuffer getByteBuffer(long address, int length)
     {
-        ByteBuffer instance = getHollowDirectByteBuffer();
+        return getByteBuffer(address, length, ByteOrder.nativeOrder());
+    }
+
+    public static ByteBuffer getByteBuffer(long address, int length, ByteOrder order)
+    {
+        ByteBuffer instance = getHollowDirectByteBuffer(order);
         setByteBuffer(instance, address, length);
         return instance;
     }
 
     public static ByteBuffer getHollowDirectByteBuffer()
     {
+        return getHollowDirectByteBuffer(ByteOrder.nativeOrder());
+    }
+
+    public static ByteBuffer getHollowDirectByteBuffer(ByteOrder order)
+    {
         ByteBuffer instance;
         try
         {
@@ -165,7 +180,7 @@ public abstract class MemoryUtil
         {
             throw new AssertionError(e);
         }
-        instance.order(ByteOrder.nativeOrder());
+        instance.order(order);
         return instance;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
index 588b433..1f1bf5e 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtableAllocator.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.utils.memory;
 
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -32,7 +31,7 @@ public abstract class MemtableAllocator
     private final SubAllocator offHeap;
     volatile LifeCycle state = LifeCycle.LIVE;
 
-    static enum LifeCycle
+    enum LifeCycle
     {
         LIVE, DISCARDING, DISCARDED;
         LifeCycle transition(LifeCycle targetState)
@@ -62,6 +61,7 @@ public abstract class MemtableAllocator
     public abstract Row.Builder rowBuilder(OpOrder.Group opGroup);
     public abstract DecoratedKey clone(DecoratedKey key, OpOrder.Group opGroup);
     public abstract DataReclaimer reclaimer();
+    public abstract EnsureOnHeap ensureOnHeap();
 
     public SubAllocator onHeap()
     {
@@ -251,4 +251,5 @@ public abstract class MemtableAllocator
         private static final AtomicLongFieldUpdater<SubAllocator> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "reclaiming");
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index bb85884..8f35042 100644
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@ -56,7 +56,6 @@ public abstract class MemtablePool
         return cleaner == null ? null : new MemtableCleanerThread<>(this, cleaner);
     }
 
-    public abstract boolean needToCopyOnHeap();
     public abstract MemtableAllocator newAllocator();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
index e5458b4..6acf6c6 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -24,10 +24,8 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.NativeDecoratedKey;
-import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class NativeAllocator extends MemtableAllocator
@@ -47,16 +45,42 @@ public class NativeAllocator extends MemtableAllocator
 
     private final AtomicReference<Region> currentRegion = new AtomicReference<>();
     private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
+    private final EnsureOnHeap.CloneToHeap cloneToHeap = new EnsureOnHeap.CloneToHeap();
 
     protected NativeAllocator(NativePool pool)
     {
         super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
     }
 
+    private static class CloningBTreeRowBuilder extends BTreeRow.Builder
+    {
+        final OpOrder.Group writeOp;
+        final NativeAllocator allocator;
+        private CloningBTreeRowBuilder(OpOrder.Group writeOp, NativeAllocator allocator)
+        {
+            super(true);
+            this.writeOp = writeOp;
+            this.allocator = allocator;
+        }
+
+        @Override
+        public void newRow(Clustering clustering)
+        {
+            if (clustering != Clustering.STATIC_CLUSTERING)
+                clustering = new NativeClustering(allocator, writeOp, clustering);
+            super.newRow(clustering);
+        }
+
+        @Override
+        public void addCell(Cell cell)
+        {
+            super.addCell(new NativeCell(allocator, writeOp, cell));
+        }
+    }
+
     public Row.Builder rowBuilder(OpOrder.Group opGroup)
     {
-        // TODO
-        throw new UnsupportedOperationException();
+        return new CloningBTreeRowBuilder(opGroup, this);
     }
 
     public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
@@ -70,6 +94,11 @@ public class NativeAllocator extends MemtableAllocator
         return NO_OP;
     }
 
+    public EnsureOnHeap ensureOnHeap()
+    {
+        return cloneToHeap;
+    }
+
     public long allocate(int size, OpOrder.Group opGroup)
     {
         assert size >= 0;
@@ -136,6 +165,7 @@ public class NativeAllocator extends MemtableAllocator
     {
         for (Region region : regions)
             MemoryUtil.free(region.peer);
+
         super.setDiscarded();
     }
 
@@ -181,12 +211,12 @@ public class NativeAllocator extends MemtableAllocator
          * Offset for the next allocation, or the sentinel value -1
          * which implies that the region is still uninitialized.
          */
-        private AtomicInteger nextFreeOffset = new AtomicInteger(0);
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
 
         /**
          * Total number of allocations satisfied from this buffer
          */
-        private AtomicInteger allocCount = new AtomicInteger();
+        private final AtomicInteger allocCount = new AtomicInteger();
 
         /**
          * Create an uninitialized region. Note that memory is not allocated yet, so

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/NativePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java
index 012867a..800c777 100644
--- a/src/java/org/apache/cassandra/utils/memory/NativePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java
@@ -26,12 +26,6 @@ public class NativePool extends MemtablePool
     }
 
     @Override
-    public boolean needToCopyOnHeap()
-    {
-        return true;
-    }
-
-    @Override
     public NativeAllocator newAllocator()
     {
         return new NativeAllocator(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
index 8defd25..ce4f41f 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
@@ -59,13 +59,20 @@ public class SlabAllocator extends MemtableBufferAllocator
 
     // this queue is used to keep references to off-heap allocated regions so that we can free them when we are discarded
     private final ConcurrentLinkedQueue<Region> offHeapRegions = new ConcurrentLinkedQueue<>();
-    private AtomicLong unslabbedSize = new AtomicLong(0);
+    private final AtomicLong unslabbedSize = new AtomicLong(0);
     private final boolean allocateOnHeapOnly;
+    private final EnsureOnHeap ensureOnHeap;
 
     SlabAllocator(SubAllocator onHeap, SubAllocator offHeap, boolean allocateOnHeapOnly)
     {
         super(onHeap, offHeap);
         this.allocateOnHeapOnly = allocateOnHeapOnly;
+        this.ensureOnHeap = allocateOnHeapOnly ? new EnsureOnHeap.NoOp() : new EnsureOnHeap.CloneToHeap();
+    }
+
+    public EnsureOnHeap ensureOnHeap()
+    {
+        return ensureOnHeap;
     }
 
     public ByteBuffer allocate(int size)
@@ -168,18 +175,18 @@ public class SlabAllocator extends MemtableBufferAllocator
         /**
          * Actual underlying data
          */
-        private ByteBuffer data;
+        private final ByteBuffer data;
 
         /**
          * Offset for the next allocation, or the sentinel value -1
          * which implies that the region is still uninitialized.
          */
-        private AtomicInteger nextFreeOffset = new AtomicInteger(0);
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
 
         /**
          * Total number of allocations satisfied from this buffer
          */
-        private AtomicInteger allocCount = new AtomicInteger();
+        private final AtomicInteger allocCount = new AtomicInteger();
 
         /**
          * Create an uninitialized region. Note that memory is not allocated yet, so

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/utils/memory/SlabPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
index c5c44e1..bd7ec1f 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
@@ -32,9 +32,4 @@ public class SlabPool extends MemtablePool
     {
         return new SlabAllocator(onHeap.newAllocator(), offHeap.newAllocator(), allocateOnHeap);
     }
-
-    public boolean needToCopyOnHeap()
-    {
-        return !allocateOnHeap;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 1dba284..eb03d17 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -3,7 +3,8 @@
 # Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
 #
 cluster_name: Test Cluster
-memtable_allocation_type: heap_buffers
+# memtable_allocation_type: heap_buffers
+memtable_allocation_type: offheap_objects
 commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0
 commitlog_segment_size_in_mb: 5

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/KeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyspaceTest.java b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
index c0c2753..6536285 100644
--- a/test/unit/org/apache/cassandra/db/KeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyspaceTest.java
@@ -129,8 +129,8 @@ public class KeyspaceTest extends CQLTester
 
     private static void assertRowsInSlice(ColumnFamilyStore cfs, String key, int sliceStart, int sliceEnd, int limit, boolean reversed, String columnValuePrefix)
     {
-        Clustering startClustering = new Clustering(ByteBufferUtil.bytes(sliceStart));
-        Clustering endClustering = new Clustering(ByteBufferUtil.bytes(sliceEnd));
+        Clustering startClustering = Clustering.make(ByteBufferUtil.bytes(sliceStart));
+        Clustering endClustering = Clustering.make(ByteBufferUtil.bytes(sliceEnd));
         Slices slices = Slices.with(cfs.getComparator(), Slice.make(startClustering, endClustering));
         ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, reversed);
         SinglePartitionReadCommand command = singlePartitionSlice(cfs, key, filter, limit);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/NativeCellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/NativeCellTest.java b/test/unit/org/apache/cassandra/db/NativeCellTest.java
new file mode 100644
index 0000000..69e615b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/NativeCellTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+import org.apache.cassandra.utils.memory.NativePool;
+
+public class NativeCellTest
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(NativeCellTest.class);
+    private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE, Integer.MAX_VALUE, 1f, null).newAllocator();
+    private static final OpOrder.Group group = new OpOrder().start();
+    private static Random rand;
+
+    @BeforeClass
+    public static void setUp()
+    {
+        long seed = System.currentTimeMillis();
+        logger.info("Seed : {}", seed);
+        rand = new Random(seed);
+    }
+
+    @Test
+    public void testCells() throws IOException
+    {
+        for (int run = 0 ; run < 1000 ; run++)
+        {
+            Row.Builder builder = BTreeRow.unsortedBuilder(1);
+            builder.newRow(rndclustering());
+            int count = 1 + rand.nextInt(10);
+            for (int i = 0 ; i < count ; i++)
+                rndcd(builder);
+            test(builder.build());
+        }
+    }
+
+    private static Clustering rndclustering()
+    {
+        int count = 1 + rand.nextInt(100);
+        ByteBuffer[] values = new ByteBuffer[count];
+        int size = rand.nextInt(65535);
+        for (int i = 0 ; i < count ; i++)
+        {
+            int twiceShare = 1 + (2 * size) / (count - i);
+            int nextSize = Math.min(size, rand.nextInt(twiceShare));
+            if (nextSize < 10 && rand.nextBoolean())
+                continue;
+
+            byte[] bytes = new byte[nextSize];
+            rand.nextBytes(bytes);
+            values[i] = ByteBuffer.wrap(bytes);
+            size -= nextSize;
+        }
+        return Clustering.make(values);
+    }
+
+    private static void rndcd(Row.Builder builder)
+    {
+        ColumnDefinition col = rndcol();
+        if (!col.isComplex())
+        {
+            builder.addCell(rndcell(col));
+        }
+        else
+        {
+            int count = 1 + rand.nextInt(100);
+            for (int i = 0 ; i < count ; i++)
+                builder.addCell(rndcell(col));
+        }
+    }
+
+    private static ColumnDefinition rndcol()
+    {
+        UUID uuid = new UUID(rand.nextLong(), rand.nextLong());
+        boolean isComplex = rand.nextBoolean();
+        return new ColumnDefinition("",
+                                    "",
+                                    ColumnIdentifier.getInterned(uuid.toString(), false),
+                                    isComplex ? new SetType<>(BytesType.instance, true) : BytesType.instance,
+                                    -1,
+                                    ColumnDefinition.Kind.REGULAR);
+    }
+
+    private static Cell rndcell(ColumnDefinition col)
+    {
+        long timestamp = rand.nextLong();
+        int ttl = rand.nextInt();
+        int localDeletionTime = rand.nextInt();
+        byte[] value = new byte[rand.nextInt(sanesize(expdecay()))];
+        rand.nextBytes(value);
+        CellPath path = null;
+        if (col.isComplex())
+        {
+            byte[] pathbytes = new byte[rand.nextInt(sanesize(expdecay()))];
+            rand.nextBytes(value);
+            path = CellPath.create(ByteBuffer.wrap(pathbytes));
+        }
+
+        return new BufferCell(col, timestamp, ttl, localDeletionTime, ByteBuffer.wrap(value), path);
+    }
+
+    private static int expdecay()
+    {
+        return 1 << Integer.numberOfTrailingZeros(Integer.lowestOneBit(rand.nextInt()));
+    }
+
+    private static int sanesize(int randomsize)
+    {
+        return Math.min(Math.max(1, randomsize), 1 << 26);
+    }
+
+    private static void test(Row row)
+    {
+        Row nrow = clone(row, nativeAllocator.rowBuilder(group));
+        Row brow = clone(row, HeapAllocator.instance.cloningBTreeRowBuilder());
+        Assert.assertEquals(row, nrow);
+        Assert.assertEquals(row, brow);
+        Assert.assertEquals(nrow, brow);
+
+        Assert.assertEquals(row.clustering(), nrow.clustering());
+        Assert.assertEquals(row.clustering(), brow.clustering());
+        Assert.assertEquals(nrow.clustering(), brow.clustering());
+
+        ClusteringComparator comparator = new ClusteringComparator(UTF8Type.instance);
+        Assert.assertTrue(comparator.compare(row.clustering(), nrow.clustering()) == 0);
+        Assert.assertTrue(comparator.compare(row.clustering(), brow.clustering()) == 0);
+        Assert.assertTrue(comparator.compare(nrow.clustering(), brow.clustering()) == 0);
+    }
+
+    private static Row clone(Row row, Row.Builder builder)
+    {
+        return Rows.copy(row, builder).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
index f40abe9..1e637b3 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneListTest.java
@@ -605,7 +605,7 @@ public class RangeTombstoneListTest
 
     private static Clustering clustering(int i)
     {
-        return new Clustering(bb(i));
+        return Clustering.make(bb(i));
     }
 
     private static ByteBuffer bb(int i)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
index d0cc890..4be46ff 100644
--- a/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
+++ b/test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
@@ -112,17 +112,17 @@ public class RangeTombstoneTest
         int nowInSec = FBUtilities.nowInSeconds();
 
         for (int i : live)
-            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
         for (int i : dead)
-            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
 
         // Queries by slices
         partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).fromIncl(7).toIncl(30).build());
 
         for (int i : new int[]{ 7, 8, 9, 11, 13, 15, 17, 28, 29, 30 })
-            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
         for (int i : new int[]{ 10, 12, 14, 16, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27 })
-            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
     }
 
     @Test
@@ -408,22 +408,22 @@ public class RangeTombstoneTest
         int nowInSec = FBUtilities.nowInSeconds();
 
         for (int i = 0; i < 5; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
         for (int i = 16; i < 20; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
         for (int i = 5; i <= 15; i++)
-            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
 
         // Compact everything and re-test
         CompactionManager.instance.performMaximal(cfs, false);
         partition = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, key).build());
 
         for (int i = 0; i < 5; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
         for (int i = 16; i < 20; i++)
-            assertTrue("Row " + i + " should be live", partition.getRow(new Clustering(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
+            assertTrue("Row " + i + " should be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(FBUtilities.nowInSeconds()));
         for (int i = 5; i <= 15; i++)
-            assertFalse("Row " + i + " shouldn't be live", partition.getRow(new Clustering(bb(i))).hasLiveData(nowInSec));
+            assertFalse("Row " + i + " shouldn't be live", partition.getRow(Clustering.make(bb(i))).hasLiveData(nowInSec));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/ReadMessageTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadMessageTest.java b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
index d801b32..6dafa37 100644
--- a/test/unit/org/apache/cassandra/db/ReadMessageTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadMessageTest.java
@@ -219,7 +219,7 @@ public class ReadMessageTest
         {
             for (PartitionUpdate upd : mutation.getPartitionUpdates())
             {
-                Row r = upd.getRow(new Clustering(ByteBufferUtil.bytes("c")));
+                Row r = upd.getRow(Clustering.make(ByteBufferUtil.bytes("c")));
                 if (r != null)
                 {
                     if (r.getCell(withCommit) != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 267e5e4..21d7b8f 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -480,7 +480,7 @@ public class RowCacheTest
         for (int i = offset; i < offset + numberOfRows; i++)
         {
             DecoratedKey key = Util.dk("key" + i);
-            Clustering cl = new Clustering(ByteBufferUtil.bytes("col" + i));
+            Clustering cl = Clustering.make(ByteBufferUtil.bytes("col" + i));
             Util.getAll(Util.cmd(store, key).build());
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
index 5503cfb..15e961b 100644
--- a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
-import java.util.*;
 import java.security.MessageDigest;
 
 import org.junit.Test;
@@ -28,7 +27,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.net.MessagingService;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
index 7637fa0..6f1985a 100644
--- a/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsMergeTest.java
@@ -364,7 +364,7 @@ public class UnfilteredRowIteratorsMergeTest
 
     private static Clustering clusteringFor(int i)
     {
-        return new Clustering(Int32Type.instance.decompose(i));
+        return Clustering.make(Int32Type.instance.decompose(i));
     }
 
     static Row emptyRowAt(int pos, Function<Integer, Integer> timeGenerator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index d38276f..6df2d65 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -28,14 +28,10 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Directories;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class CQLSSTableWriterClientTest
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index ad7523d..72c7467 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -142,7 +142,7 @@ public class SSTableLoaderTest
 
         assertEquals(1, partitions.size());
         assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
-        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(new Clustering(ByteBufferUtil.bytes("col1")))
+        assertEquals(ByteBufferUtil.bytes("100"), partitions.get(0).getRow(Clustering.make(ByteBufferUtil.bytes("col1")))
                                                                    .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
                                                                    .value());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java b/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
index ba82e85..f4ac377 100644
--- a/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
+++ b/test/unit/org/apache/cassandra/service/pager/PagingStateTest.java
@@ -49,7 +49,7 @@ public class PagingStateTest
         ByteBuffer pk = ByteBufferUtil.bytes("someKey");
 
         ColumnDefinition def = metadata.getColumnDefinition(new ColumnIdentifier("myCol", false));
-        Clustering c = new Clustering(ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes(42));
+        Clustering c = Clustering.make(ByteBufferUtil.bytes("c1"), ByteBufferUtil.bytes(42));
         Row row = BTreeRow.singleCellRow(c, BufferCell.live(metadata, def, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER));
         PagingState.RowMark mark = PagingState.RowMark.create(metadata, row, protocolVersion);
         return new PagingState(pk, mark, 10, 0);


[2/2] cassandra git commit: 9472: Reintroduce Off-Heap Memtables

Posted by be...@apache.org.
9472: Reintroduce Off-Heap Memtables

patch by benedict and stefania


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f412431
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f412431
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f412431

Branch: refs/heads/trunk
Commit: 2f41243191c381193a3bf6ec3730ff6555325d06
Parents: a0901b8
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Wed Nov 4 17:03:38 2015 +0000
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Wed Jan 27 10:49:26 2016 +0000

----------------------------------------------------------------------
 conf/cassandra.yaml                             |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   3 +-
 .../db/AbstractBufferClusteringPrefix.java      |  72 ++++++
 .../cassandra/db/AbstractClusteringPrefix.java  |  44 ----
 .../apache/cassandra/db/BufferClustering.java   |  44 ++++
 src/java/org/apache/cassandra/db/CBuilder.java  |   6 +-
 .../org/apache/cassandra/db/Clustering.java     | 150 ++++++------
 .../cassandra/db/ClusteringComparator.java      |   3 -
 .../apache/cassandra/db/ClusteringPrefix.java   |   4 +-
 .../org/apache/cassandra/db/LegacyLayout.java   |   8 +-
 src/java/org/apache/cassandra/db/Memtable.java  |   5 +-
 .../org/apache/cassandra/db/MultiCBuilder.java  |   2 +-
 .../apache/cassandra/db/NativeClustering.java   | 125 ++++++++++
 .../apache/cassandra/db/NativeDecoratedKey.java |   6 +-
 .../apache/cassandra/db/RowUpdateBuilder.java   |   2 -
 .../org/apache/cassandra/db/Serializers.java    |   6 +-
 .../db/SinglePartitionReadCommand.java          |  18 +-
 src/java/org/apache/cassandra/db/Slice.java     |   2 +-
 .../apache/cassandra/db/filter/RowFilter.java   |   4 +-
 .../db/partitions/AbstractBTreePartition.java   |  25 +-
 .../db/partitions/AtomicBTreePartition.java     |  66 +++++-
 .../db/partitions/FilteredPartition.java        |   2 +-
 .../apache/cassandra/db/rows/AbstractCell.java  |  79 +++++++
 .../org/apache/cassandra/db/rows/BTreeRow.java  |  33 ++-
 .../apache/cassandra/db/rows/BufferCell.java    | 232 -------------------
 src/java/org/apache/cassandra/db/rows/Cell.java | 161 ++++++++++++-
 .../apache/cassandra/db/rows/NativeCell.java    | 151 ++++++++++++
 .../apache/cassandra/db/rows/RowIterator.java   |   5 -
 .../db/rows/UnfilteredRowIterators.java         |  27 ---
 .../apache/cassandra/db/transform/BaseRows.java |   4 +
 .../cassandra/db/transform/Transformation.java  |   6 +
 .../apache/cassandra/db/view/TemporalRow.java   |  11 +-
 .../cassandra/index/internal/IndexEntry.java    |   1 +
 .../index/internal/keys/KeysSearcher.java       |   4 +-
 .../io/sstable/metadata/MetadataCollector.java  |   4 -
 .../cassandra/io/util/MemoryInputStream.java    |   2 +-
 .../apache/cassandra/service/CacheService.java  |   2 -
 .../cassandra/thrift/CassandraServer.java       |  16 +-
 .../cassandra/thrift/ThriftResultsMerger.java   |   2 +-
 .../utils/memory/AbstractAllocator.java         |   1 -
 .../cassandra/utils/memory/EnsureOnHeap.java    | 150 ++++++++++++
 .../cassandra/utils/memory/HeapAllocator.java   |   5 +
 .../apache/cassandra/utils/memory/HeapPool.java |   5 -
 .../cassandra/utils/memory/MemoryUtil.java      |  19 +-
 .../utils/memory/MemtableAllocator.java         |   5 +-
 .../cassandra/utils/memory/MemtablePool.java    |   1 -
 .../cassandra/utils/memory/NativeAllocator.java |  46 +++-
 .../cassandra/utils/memory/NativePool.java      |   6 -
 .../cassandra/utils/memory/SlabAllocator.java   |  15 +-
 .../apache/cassandra/utils/memory/SlabPool.java |   5 -
 test/conf/cassandra.yaml                        |   3 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   |   4 +-
 .../org/apache/cassandra/db/NativeCellTest.java | 171 ++++++++++++++
 .../cassandra/db/RangeTombstoneListTest.java    |   2 +-
 .../apache/cassandra/db/RangeTombstoneTest.java |  20 +-
 .../apache/cassandra/db/ReadMessageTest.java    |   2 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |   2 +-
 .../rows/DigestBackwardCompatibilityTest.java   |   2 -
 .../rows/UnfilteredRowIteratorsMergeTest.java   |   2 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |   4 -
 .../cassandra/io/sstable/SSTableLoaderTest.java |   2 +-
 .../service/pager/PagingStateTest.java          |   2 +-
 62 files changed, 1265 insertions(+), 547 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e29a6d3..a9749f2 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -398,6 +398,7 @@ concurrent_materialized_view_writes: 32
 # Options are:
 #   heap_buffers:    on heap nio buffers
 #   offheap_buffers: off heap (direct) nio buffers
+#   offheap_objects: off heap objects
 memtable_allocation_type: heap_buffers
 
 # Total space to use for commit logs on disk.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2a2719a..b09605f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1847,8 +1847,7 @@ public class DatabaseDescriptor
                 }
                 return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
             case offheap_objects:
-                throw new ConfigurationException("offheap_objects are not available in 3.0. They should be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details");
-                // return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+                return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
             default:
                 throw new AssertionError();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
new file mode 100644
index 0000000..95bc777
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+public abstract class AbstractBufferClusteringPrefix extends AbstractClusteringPrefix
+{
+    public static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0];
+    private static final long EMPTY_SIZE = ObjectSizes.measure(Clustering.make(EMPTY_VALUES_ARRAY));
+
+    protected final Kind kind;
+    protected final ByteBuffer[] values;
+
+    protected AbstractBufferClusteringPrefix(Kind kind, ByteBuffer[] values)
+    {
+        this.kind = kind;
+        this.values = values;
+    }
+
+    public Kind kind()
+    {
+        return kind;
+    }
+
+    public ClusteringPrefix clustering()
+    {
+        return this;
+    }
+
+    public int size()
+    {
+        return values.length;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        return values[i];
+    }
+
+    public ByteBuffer[] getRawValues()
+    {
+        return values;
+    }
+
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
index 2631b46..0b1daf7 100644
--- a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
@@ -22,48 +22,14 @@ import java.security.MessageDigest;
 import java.util.Objects;
 
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ObjectSizes;
 
 public abstract class AbstractClusteringPrefix implements ClusteringPrefix
 {
-    protected static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0];
-
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new Clustering(EMPTY_VALUES_ARRAY));
-
-    protected final Kind kind;
-    protected final ByteBuffer[] values;
-
-    protected AbstractClusteringPrefix(Kind kind, ByteBuffer[] values)
-    {
-        this.kind = kind;
-        this.values = values;
-    }
-
-    public Kind kind()
-    {
-        return kind;
-    }
-
     public ClusteringPrefix clustering()
     {
         return this;
     }
 
-    public int size()
-    {
-        return values.length;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        return values[i];
-    }
-
-    public ByteBuffer[] getRawValues()
-    {
-        return values;
-    }
-
     public int dataSize()
     {
         int size = 0;
@@ -86,16 +52,6 @@ public abstract class AbstractClusteringPrefix implements ClusteringPrefix
         FBUtilities.updateWithByte(digest, kind().ordinal());
     }
 
-    public long unsharedHeapSize()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values);
-    }
-
     @Override
     public final int hashCode()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/BufferClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferClustering.java b/src/java/org/apache/cassandra/db/BufferClustering.java
new file mode 100644
index 0000000..7c6bb20
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferClustering.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * The clustering column values for a row.
+ * <p>
+ * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have
+ * as many values as there is clustering columns in the table it is part of. It is the clustering
+ * prefix used by rows.
+ * <p>
+ * Note however that while it's size must be equal to the table clustering size, a clustering can have
+ * {@code null} values, and this mostly for thrift backward compatibility (in practice, if a value is null,
+ * all of the following ones will be too because that's what thrift allows, but it's never assumed by the
+ * code so we could start generally allowing nulls for clustering columns if we wanted to).
+ */
+public class BufferClustering extends AbstractBufferClusteringPrefix implements Clustering
+{
+    BufferClustering(ByteBuffer... values)
+    {
+        super(Kind.CLUSTERING, values);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/CBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CBuilder.java b/src/java/org/apache/cassandra/db/CBuilder.java
index 94feb93..73b575f 100644
--- a/src/java/org/apache/cassandra/db/CBuilder.java
+++ b/src/java/org/apache/cassandra/db/CBuilder.java
@@ -162,7 +162,7 @@ public abstract class CBuilder
             built = true;
 
             // Currently, only dense table can leave some clustering column out (see #7990)
-            return size == 0 ? Clustering.EMPTY : new Clustering(values);
+            return size == 0 ? Clustering.EMPTY : Clustering.make(values);
         }
 
         public Slice.Bound buildBound(boolean isStart, boolean isInclusive)
@@ -196,7 +196,7 @@ public abstract class CBuilder
 
             ByteBuffer[] newValues = Arrays.copyOf(values, type.size());
             newValues[size] = value;
-            return new Clustering(newValues);
+            return Clustering.make(newValues);
         }
 
         public Clustering buildWith(List<ByteBuffer> newValues)
@@ -207,7 +207,7 @@ public abstract class CBuilder
             for (ByteBuffer value : newValues)
                 buffers[newSize++] = value;
 
-            return new Clustering(buffers);
+            return Clustering.make(buffers);
         }
 
         public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, boolean isInclusive)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java
index a40cc1f..f5ffae4 100644
--- a/src/java/org/apache/cassandra/db/Clustering.java
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -1,53 +1,91 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
 package org.apache.cassandra.db;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.List;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
-/**
- * The clustering column values for a row.
- * <p>
- * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have
- * as many values as there is clustering columns in the table it is part of. It is the clustering
- * prefix used by rows.
- * <p>
- * Note however that while it's size must be equal to the table clustering size, a clustering can have
- * {@code null} values, and this mostly for thrift backward compatibility (in practice, if a value is null,
- * all of the following ones will be too because that's what thrift allows, but it's never assumed by the
- * code so we could start generally allowing nulls for clustering columns if we wanted to).
- */
-public class Clustering extends AbstractClusteringPrefix
+import static org.apache.cassandra.db.AbstractBufferClusteringPrefix.EMPTY_VALUES_ARRAY;
+
+public interface Clustering extends ClusteringPrefix
 {
     public static final Serializer serializer = new Serializer();
 
+    public long unsharedHeapSizeExcludingData();
+
+    public default Clustering copy(AbstractAllocator allocator)
+    {
+        // Important for STATIC_CLUSTERING (but must copy empty native clustering types).
+        if (size() == 0)
+            return kind() == Kind.STATIC_CLUSTERING ? this : new BufferClustering(EMPTY_VALUES_ARRAY);
+
+        ByteBuffer[] newValues = new ByteBuffer[size()];
+        for (int i = 0; i < size(); i++)
+        {
+            ByteBuffer val = get(i);
+            newValues[i] = val == null ? null : allocator.clone(val);
+        }
+        return new BufferClustering(newValues);
+    }
+
+    public default String toString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size(); i++)
+        {
+            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i)));
+        }
+        return sb.toString();
+    }
+
+    public default String toCQLString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size(); i++)
+        {
+            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
+        }
+        return sb.toString();
+    }
+
+    public static Clustering make(ByteBuffer... values)
+    {
+        return new BufferClustering(values);
+    }
+
     /**
      * The special cased clustering used by all static rows. It is a special case in the
      * sense that it's always empty, no matter how many clustering columns the table has.
      */
-    public static final Clustering STATIC_CLUSTERING = new Clustering(EMPTY_VALUES_ARRAY)
+    public static final Clustering STATIC_CLUSTERING = new BufferClustering(EMPTY_VALUES_ARRAY)
     {
         @Override
         public Kind kind()
@@ -69,7 +107,7 @@ public class Clustering extends AbstractClusteringPrefix
     };
 
     /** Empty clustering for tables having no clustering columns. */
-    public static final Clustering EMPTY = new Clustering(EMPTY_VALUES_ARRAY)
+    public static final Clustering EMPTY = new BufferClustering(EMPTY_VALUES_ARRAY)
     {
         @Override
         public String toString(CFMetaData metadata)
@@ -78,50 +116,6 @@ public class Clustering extends AbstractClusteringPrefix
         }
     };
 
-    public Clustering(ByteBuffer... values)
-    {
-        super(Kind.CLUSTERING, values);
-    }
-
-    public Kind kind()
-    {
-        return Kind.CLUSTERING;
-    }
-
-    public Clustering copy(AbstractAllocator allocator)
-    {
-        // Important for STATIC_CLUSTERING (but no point in being wasteful in general).
-        if (size() == 0)
-            return this;
-
-        ByteBuffer[] newValues = new ByteBuffer[size()];
-        for (int i = 0; i < size(); i++)
-            newValues[i] = values[i] == null ? null : allocator.clone(values[i]);
-        return new Clustering(newValues);
-    }
-
-    public String toString(CFMetaData metadata)
-    {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < size(); i++)
-        {
-            ColumnDefinition c = metadata.clusteringColumns().get(i);
-            sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i)));
-        }
-        return sb.toString();
-    }
-
-    public String toCQLString(CFMetaData metadata)
-    {
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < size(); i++)
-        {
-            ColumnDefinition c = metadata.clusteringColumns().get(i);
-            sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
-        }
-        return sb.toString();
-    }
-
     /**
      * Serializer for Clustering object.
      * <p>
@@ -161,7 +155,7 @@ public class Clustering extends AbstractClusteringPrefix
                 return EMPTY;
 
             ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types);
-            return new Clustering(values);
+            return new BufferClustering(values);
         }
 
         public Clustering deserialize(ByteBuffer in, int version, List<AbstractType<?>> types)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java
index f3411cf..f5f6ae8 100644
--- a/src/java/org/apache/cassandra/db/ClusteringComparator.java
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -29,8 +28,6 @@ import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FastByteOperations;
 
 import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 9477651..cacaeb5 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -74,7 +74,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
          */
         public final int comparedToClustering;
 
-        private Kind(int comparison, int comparedToClustering)
+        Kind(int comparison, int comparedToClustering)
         {
             this.comparison = comparison;
             this.comparedToClustering = comparedToClustering;
@@ -500,7 +500,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
         {
             assert nextIsRow;
             deserializeAll();
-            Clustering clustering = new Clustering(nextValues);
+            Clustering clustering = Clustering.make(nextValues);
             nextValues = null;
             return clustering;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 07778b3..4e1eab5 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -103,7 +103,7 @@ public abstract class LegacyLayout
         if (metadata.isSuper())
         {
             assert superColumnName != null;
-            return decodeForSuperColumn(metadata, new Clustering(superColumnName), cellname);
+            return decodeForSuperColumn(metadata, Clustering.make(superColumnName), cellname);
         }
 
         assert superColumnName == null;
@@ -161,7 +161,7 @@ public abstract class LegacyLayout
         {
             // If it's a compact table, it means the column is in fact a "dynamic" one
             if (metadata.isCompactTable())
-                return new LegacyCellName(new Clustering(column), metadata.compactValueColumn(), null);
+                return new LegacyCellName(Clustering.make(column), metadata.compactValueColumn(), null);
 
             if (def == null)
                 throw new UnknownColumnException(metadata, column);
@@ -298,7 +298,7 @@ public abstract class LegacyLayout
                                     ? CompositeType.splitName(value)
                                     : Collections.singletonList(value);
 
-        return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
+        return Clustering.make(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize]));
     }
 
     public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering)
@@ -1308,7 +1308,7 @@ public abstract class LegacyLayout
             ByteBuffer[] values = new ByteBuffer[bound.size()];
             for (int i = 0; i < bound.size(); i++)
                 values[i] = bound.get(i);
-            return new Clustering(values);
+            return Clustering.make(values);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 952c045..244c7b6 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
@@ -53,6 +54,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.MemtablePool;
 
@@ -60,7 +62,7 @@ public class Memtable implements Comparable<Memtable>
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
-    static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
+    public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
     private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
 
     private final MemtableAllocator allocator;
@@ -526,6 +528,7 @@ public class Memtable implements Comparable<Memtable>
             assert entry.getKey() instanceof DecoratedKey;
             DecoratedKey key = (DecoratedKey)entry.getKey();
             ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key);
+
             return filter.getUnfilteredRowIterator(columnFilter, entry.getValue());
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/MultiCBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java
index 8353703..7a4eef0 100644
--- a/src/java/org/apache/cassandra/db/MultiCBuilder.java
+++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java
@@ -239,7 +239,7 @@ public abstract class MultiCBuilder
             if (hasMissingElements)
                 return BTreeSet.empty(comparator);
 
-            return BTreeSet.of(comparator, size == 0 ? Clustering.EMPTY : new Clustering(elements));
+            return BTreeSet.of(comparator, size == 0 ? Clustering.EMPTY : Clustering.make(elements));
         }
 
         public NavigableSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java b/src/java/org/apache/cassandra/db/NativeClustering.java
new file mode 100644
index 0000000..1943b71
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeClustering.java
@@ -0,0 +1,125 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeClustering extends AbstractClusteringPrefix implements Clustering
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeClustering());
+
+    private final long peer;
+
+    private NativeClustering() { peer = 0; }
+
+    public NativeClustering(NativeAllocator allocator, OpOrder.Group writeOp, Clustering clustering)
+    {
+        int count = clustering.size();
+        int metadataSize = (count * 2) + 4;
+        int dataSize = clustering.dataSize();
+        int bitmapSize = ((count + 7) >>> 3);
+
+        assert count < 64 << 10;
+        assert dataSize < 64 << 10;
+
+        peer = allocator.allocate(metadataSize + dataSize + bitmapSize, writeOp);
+        long bitmapStart = peer + metadataSize;
+        MemoryUtil.setShort(peer, (short) count);
+        MemoryUtil.setShort(peer + (metadataSize - 2), (short) dataSize); // goes at the end of the other offsets
+
+        MemoryUtil.setByte(bitmapStart, bitmapSize, (byte) 0);
+        long dataStart = peer + metadataSize + bitmapSize;
+        int dataOffset = 0;
+        for (int i = 0 ; i < count ; i++)
+        {
+            MemoryUtil.setShort(peer + 2 + i * 2, (short) dataOffset);
+
+            ByteBuffer value = clustering.get(i);
+            if (value == null)
+            {
+                long boffset = bitmapStart + (i >>> 3);
+                int b = MemoryUtil.getByte(boffset);
+                b |= 1 << (i & 7);
+                MemoryUtil.setByte(boffset, (byte) b);
+                continue;
+            }
+
+            assert value.order() == ByteOrder.BIG_ENDIAN;
+
+            int size = value.remaining();
+            MemoryUtil.setBytes(dataStart + dataOffset, value);
+            dataOffset += size;
+        }
+    }
+
+    public Kind kind()
+    {
+        return Kind.CLUSTERING;
+    }
+
+    public int size()
+    {
+        return MemoryUtil.getShort(peer);
+    }
+
+    public ByteBuffer get(int i)
+    {
+        // offset at which we store the dataOffset
+        int size = size();
+        if (i >= size)
+            throw new IndexOutOfBoundsException();
+
+        int metadataSize = (size * 2) + 4;
+        int bitmapSize = ((size + 7) >>> 3);
+        long bitmapStart = peer + metadataSize;
+        int b = MemoryUtil.getByte(bitmapStart + (i >>> 3));
+        if ((b & (1 << (i & 7))) != 0)
+            return null;
+
+        int startOffset = MemoryUtil.getShort(peer + 2 + i * 2);
+        int endOffset = MemoryUtil.getShort(peer + 4 + i * 2);
+        return MemoryUtil.getByteBuffer(bitmapStart + bitmapSize + startOffset,
+                                        endOffset - startOffset,
+                                        ByteOrder.BIG_ENDIAN);
+    }
+
+    public ByteBuffer[] getRawValues()
+    {
+        ByteBuffer[] values = new ByteBuffer[size()];
+        for (int i = 0 ; i < values.length ; i++)
+            values[i] = get(i);
+        return values;
+    }
+
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE;
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
index ca874c3..019209e 100644
--- a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
@@ -18,9 +18,11 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.utils.memory.MemoryUtil;
 import org.apache.cassandra.utils.memory.NativeAllocator;
 
@@ -32,6 +34,8 @@ public class NativeDecoratedKey extends DecoratedKey
     {
         super(token);
         assert key != null;
+        assert key.order() == ByteOrder.BIG_ENDIAN;
+
         int size = key.remaining();
         this.peer = allocator.allocate(4 + size, writeOp);
         MemoryUtil.setInt(peer, size);
@@ -40,6 +44,6 @@ public class NativeDecoratedKey extends DecoratedKey
 
     public ByteBuffer getKey()
     {
-        return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer));
+        return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer), ByteOrder.BIG_ENDIAN);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
index 8ace988..0ceec90 100644
--- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.db;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,7 +26,6 @@ import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.marshal.SetType;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.partitions.*;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java
index 9b29d89..17f1de0 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -29,8 +29,6 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
-import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
-
 /**
  * Holds references on serializers that depend on the table definition.
  */
@@ -70,7 +68,7 @@ public class Serializers
                         return Clustering.EMPTY;
 
                     if (!metadata.isCompound())
-                        return new Clustering(bb);
+                        return Clustering.make(bb);
 
                     List<ByteBuffer> components = CompositeType.splitName(bb);
                     byte eoc = CompositeType.lastEOC(bb);
@@ -81,7 +79,7 @@ public class Serializers
                         if (components.size() > clusteringSize)
                             components = components.subList(0, clusteringSize);
 
-                        return new Clustering(components.toArray(new ByteBuffer[clusteringSize]));
+                        return Clustering.make(components.toArray(new ByteBuffer[clusteringSize]));
                     }
                     else
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 9ad9ba3..680b4b5 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -460,8 +460,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     {
         Tracing.trace("Executing single-partition query on {}", cfs.name);
 
-        boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
-        return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+        return queryMemtableAndDiskInternal(cfs);
     }
 
     @Override
@@ -470,7 +469,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         return oldestUnrepairedTombstone;
     }
 
-    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+    private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs)
     {
         /*
          * We have 2 main strategies:
@@ -484,7 +483,7 @@ public class SinglePartitionReadCommand extends ReadCommand
          *      of shards so have the same problem).
          */
         if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections())
-            return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter());
+            return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter());
 
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
@@ -502,10 +501,8 @@ public class SinglePartitionReadCommand extends ReadCommand
 
                 @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
                 UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);
-                @SuppressWarnings("resource") // same as above
-                UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter;
                 oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime);
-                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied);
+                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter);
             }
             /*
              * We can't eliminate full sstables based on the timestamp of what we've already read like
@@ -649,7 +646,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      * no collection or counters are included).
      * This method assumes the filter is a {@code ClusteringIndexNamesFilter}.
      */
-    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter)
+    private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter)
     {
         Tracing.trace("Acquiring sstable references");
         ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));
@@ -668,10 +665,7 @@ public class SinglePartitionReadCommand extends ReadCommand
                 if (iter.isEmpty())
                     continue;
 
-                UnfilteredRowIterator clonedFilter = copyOnHeap
-                                                   ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
-                                                   : iter;
-                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false);
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, false);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Slice.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java
index 7fde45e..8611470 100644
--- a/src/java/org/apache/cassandra/db/Slice.java
+++ b/src/java/org/apache/cassandra/db/Slice.java
@@ -343,7 +343,7 @@ public class Slice
      * <p>
      * This can be either a start or an end bound, and this can be either inclusive or exclusive.
      */
-    public static class Bound extends AbstractClusteringPrefix
+    public static class Bound extends AbstractBufferClusteringPrefix
     {
         public static final Serializer serializer = new Serializer();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index c234fc9..79bdbd7 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -194,11 +194,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression>
         if (metadata.isCompound())
         {
             List<ByteBuffer> values = CompositeType.splitName(name);
-            return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()]));
+            return Clustering.make(values.toArray(new ByteBuffer[metadata.comparator.size()]));
         }
         else
         {
-            return new Clustering(name);
+            return Clustering.make(name);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
index e44124f..c276c57 100644
--- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java
@@ -183,7 +183,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         if (slices.size() == 0)
         {
             DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
-            return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed);
+            return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey(), staticRow, partitionDeletion, reversed);
         }
 
         return slices.size() == 1
@@ -202,9 +202,9 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
     }
 
     private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter,
-                                                     ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
+                                              ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
     {
-        return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(),
+        return new RowAndDeletionMergeIterator(metadata, partitionKey(), current.deletionInfo.getPartitionDeletion(),
                                                selection, staticRow, reversed, current.stats,
                                                rowIter, deleteIter,
                                                canHaveShadowedData());
@@ -215,22 +215,10 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         final Holder current;
         final ColumnFilter selection;
 
-        private AbstractIterator(ColumnFilter selection, boolean isReversed)
-        {
-            this(AbstractBTreePartition.this.holder(), selection, isReversed);
-        }
-
-        private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed)
-        {
-            this(current,
-                 AbstractBTreePartition.this.staticRow(current, selection, false),
-                 selection, isReversed);
-        }
-
         private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed)
         {
             super(AbstractBTreePartition.this.metadata,
-                  AbstractBTreePartition.this.partitionKey,
+                  AbstractBTreePartition.this.partitionKey(),
                   current.deletionInfo.getPartitionDeletion(),
                   selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator
                                               // it would also be more precise to return the intersection of the selection and current.columns,
@@ -318,10 +306,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
         BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
         builder.auto(false);
         while (rows.hasNext())
-        {
-            Row row = rows.next();
-            builder.add(row);
-        }
+            builder.add(rows.next());
 
         if (reversed)
             builder.reverse();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 2be882e..c7113d4 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -26,12 +27,12 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 import org.apache.cassandra.utils.concurrent.Locks;
@@ -181,7 +182,66 @@ public class AtomicBTreePartition extends AbstractBTreePartition
             if (monitorOwned)
                 Locks.monitorExitUnsafe(this);
         }
+    }
+
+    @Override
+    public DeletionInfo deletionInfo()
+    {
+        return allocator.ensureOnHeap().applyToDeletionInfo(super.deletionInfo());
+    }
 
+    @Override
+    public Row staticRow()
+    {
+        return allocator.ensureOnHeap().applyToStatic(super.staticRow());
+    }
+
+    @Override
+    public DecoratedKey partitionKey()
+    {
+        return allocator.ensureOnHeap().applyToPartitionKey(super.partitionKey());
+    }
+
+    @Override
+    public Row getRow(Clustering clustering)
+    {
+        return allocator.ensureOnHeap().applyToRow(super.getRow(clustering));
+    }
+
+    @Override
+    public Row lastRow()
+    {
+        return allocator.ensureOnHeap().applyToRow(super.lastRow());
+    }
+
+    @Override
+    public SearchIterator<Clustering, Row> searchIterator(ColumnFilter columns, boolean reversed)
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.searchIterator(columns, reversed));
+    }
+
+    @Override
+    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed));
+    }
+
+    @Override
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator());
+    }
+
+    @Override
+    public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed)
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(current, selection, slices, reversed));
+    }
+
+    @Override
+    public Iterator<Row> iterator()
+    {
+        return allocator.ensureOnHeap().applyToPartition(super.iterator());
     }
 
     public boolean usePessimisticLocking()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 26a947b..70a4678 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -65,7 +65,7 @@ public class FilteredPartition extends ImmutableBTreePartition
 
             public DecoratedKey partitionKey()
             {
-                return partitionKey;
+                return FilteredPartition.this.partitionKey();
             }
 
             public Row staticRow()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
index 882c0e0..1ea9713 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java
@@ -17,15 +17,19 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
 
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * Base abstract class for {@code Cell} implementations.
@@ -40,6 +44,81 @@ public abstract class AbstractCell extends Cell
         super(column);
     }
 
+    public boolean isCounterCell()
+    {
+        return !isTombstone() && column.cellValueType().isCounter();
+    }
+
+    public boolean isLive(int nowInSec)
+    {
+        return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && nowInSec < localDeletionTime());
+    }
+
+    public boolean isTombstone()
+    {
+        return localDeletionTime() != NO_DELETION_TIME && ttl() == NO_TTL;
+    }
+
+    public boolean isExpiring()
+    {
+        return ttl() != NO_TTL;
+    }
+
+    public Cell markCounterLocalToBeCleared()
+    {
+        if (!isCounterCell())
+            return this;
+
+        ByteBuffer value = value();
+        ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value);
+        return marked == value ? this : new BufferCell(column, timestamp(), ttl(), localDeletionTime(), marked, path());
+    }
+
+    public Cell purge(DeletionPurger purger, int nowInSec)
+    {
+        if (!isLive(nowInSec))
+        {
+            if (purger.shouldPurge(timestamp(), localDeletionTime()))
+                return null;
+
+            // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
+            // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
+            // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
+            // so hopefully not too surprising and 2) we want to this and purging at the same places, so it's simpler/more efficient
+            // to do both here.
+            if (isExpiring())
+            {
+                // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+                // we'll fulfil our responsibility to repair. See discussion at
+                // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+                return BufferCell.tombstone(column, timestamp(), localDeletionTime() - ttl());
+            }
+        }
+        return this;
+    }
+
+    public Cell copy(AbstractAllocator allocator)
+    {
+        CellPath path = path();
+        return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), allocator.clone(value()), path == null ? null : path.copy(allocator));
+    }
+
+    // note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied)
+    public Cell updateAllTimestamp(long newTimestamp)
+    {
+        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl(), localDeletionTime(), value(), path());
+    }
+
+    public int dataSize()
+    {
+        CellPath path = path();
+        return TypeSizes.sizeof(timestamp())
+               + TypeSizes.sizeof(ttl())
+               + TypeSizes.sizeof(localDeletionTime())
+               + value().remaining()
+               + (path == null ? 0 : path.dataSize());
+    }
+
     public void digest(MessageDigest digest)
     {
         digest.update(value().duplicate());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index a0912ae..47cfd58 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -62,7 +62,11 @@ public class BTreeRow extends AbstractRow
     // no expiring cells, this will be Integer.MAX_VALUE;
     private final int minLocalDeletionTime;
 
-    private BTreeRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree, int minLocalDeletionTime)
+    private BTreeRow(Clustering clustering,
+                     LivenessInfo primaryKeyLivenessInfo,
+                     Deletion deletion,
+                     Object[] btree,
+                     int minLocalDeletionTime)
     {
         assert !deletion.isShadowedBy(primaryKeyLivenessInfo);
         this.clustering = clustering;
@@ -78,7 +82,10 @@ public class BTreeRow extends AbstractRow
     }
 
     // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases.
-    public static BTreeRow create(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree)
+    public static BTreeRow create(Clustering clustering,
+                                  LivenessInfo primaryKeyLivenessInfo,
+                                  Deletion deletion,
+                                  Object[] btree)
     {
         int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time()));
         if (minDeletionTime != Integer.MIN_VALUE)
@@ -87,6 +94,15 @@ public class BTreeRow extends AbstractRow
                 minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd));
         }
 
+        return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+    }
+
+    public static BTreeRow create(Clustering clustering,
+                                  LivenessInfo primaryKeyLivenessInfo,
+                                  Deletion deletion,
+                                  Object[] btree,
+                                  int minDeletionTime)
+    {
         return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
     }
 
@@ -113,7 +129,11 @@ public class BTreeRow extends AbstractRow
     public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo)
     {
         assert !primaryKeyLivenessInfo.isEmpty();
-        return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo));
+        return new BTreeRow(clustering,
+                            primaryKeyLivenessInfo,
+                            Deletion.LIVE,
+                            BTree.empty(),
+                            minDeletionTime(primaryKeyLivenessInfo));
     }
 
     private static int minDeletionTime(Cell cell)
@@ -368,7 +388,7 @@ public class BTreeRow extends AbstractRow
             return null;
 
         int minDeletionTime = minDeletionTime(transformed, info, deletion.time());
-        return new BTreeRow(clustering, info, deletion, transformed, minDeletionTime);
+        return BTreeRow.create(clustering, info, deletion, transformed, minDeletionTime);
     }
 
     public int dataSize()
@@ -594,7 +614,7 @@ public class BTreeRow extends AbstractRow
                 return new ComplexColumnData(column, btree, deletion);
             }
 
-        };
+        }
         protected Clustering clustering;
         protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
         protected Deletion deletion = Deletion.LIVE;
@@ -680,10 +700,9 @@ public class BTreeRow extends AbstractRow
                 deletion = Deletion.LIVE;
 
             int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion.time());
-            Row row = new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
+            Row row = BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
             reset();
             return row;
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java
index 4176ba6..cac63ac 100644
--- a/src/java/org/apache/cassandra/db/rows/BufferCell.java
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -17,16 +17,11 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.marshal.ByteType;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.FBUtilities;
@@ -88,26 +83,6 @@ public class BufferCell extends AbstractCell
         return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
     }
 
-    public boolean isCounterCell()
-    {
-        return !isTombstone() && column.cellValueType().isCounter();
-    }
-
-    public boolean isLive(int nowInSec)
-    {
-        return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && nowInSec < localDeletionTime);
-    }
-
-    public boolean isTombstone()
-    {
-        return localDeletionTime != NO_DELETION_TIME && ttl == NO_TTL;
-    }
-
-    public boolean isExpiring()
-    {
-        return ttl != NO_TTL;
-    }
-
     public long timestamp()
     {
         return timestamp;
@@ -146,216 +121,9 @@ public class BufferCell extends AbstractCell
         return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator));
     }
 
-    public Cell markCounterLocalToBeCleared()
-    {
-        if (!isCounterCell())
-            return this;
-
-        ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value());
-        return marked == value() ? this : new BufferCell(column, timestamp, ttl, localDeletionTime, marked, path);
-    }
-
-    public Cell purge(DeletionPurger purger, int nowInSec)
-    {
-        if (!isLive(nowInSec))
-        {
-            if (purger.shouldPurge(timestamp, localDeletionTime))
-                return null;
-
-            // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is
-            // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since
-            // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones
-            // so hopefully not too surprising and 2) we want to this and purging at the same places, so it's simpler/more efficient
-            // to do both here.
-            if (isExpiring())
-            {
-                // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds,
-                // we'll fulfil our responsibility to repair. See discussion at
-                // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
-                return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl);
-            }
-        }
-        return this;
-    }
-
-    public Cell updateAllTimestamp(long newTimestamp)
-    {
-        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl, localDeletionTime, value, path);
-    }
-
-    public int dataSize()
-    {
-        return TypeSizes.sizeof(timestamp)
-             + TypeSizes.sizeof(ttl)
-             + TypeSizes.sizeof(localDeletionTime)
-             + value.remaining()
-             + (path == null ? 0 : path.dataSize());
-    }
-
     public long unsharedHeapSizeExcludingData()
     {
         return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path == null ? 0 : path.unsharedHeapSizeExcludingData());
     }
 
-    /**
-     * The serialization format for cell is:
-     *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ path ][ value size ][ value ]
-     *     [   1b  ][ 8b (vint) ][   4b (vint)   ][ 4b (vint) ][ 4b (vint) ][  arb ][  4b (vint) ][  arb  ]
-     *
-     * where not all field are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following
-     * meaning:
-     *   - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
-     *   - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_TIMESTAMP_MASK.
-     *   - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
-     *       or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
-     *   - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the
-     *       USE_ROW_TTL_MASK.
-     *   - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
-     *       for columns of this type have a fixed length.
-     *   - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
-     *   - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
-     *   - [ path ]: the cell path if the column this is a cell of is complex.
-     */
-    static class Serializer implements Cell.Serializer
-    {
-        private final static int IS_DELETED_MASK             = 0x01; // Whether the cell is a tombstone or not.
-        private final static int IS_EXPIRING_MASK            = 0x02; // Whether the cell is expiring.
-        private final static int HAS_EMPTY_VALUE_MASK        = 0x04; // Wether the cell has an empty value. This will be the case for tombstone in particular.
-        private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Wether the cell has the same timestamp than the row this is a cell of.
-        private final static int USE_ROW_TTL_MASK            = 0x10; // Wether the cell has the same ttl than the row this is a cell of.
-
-        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
-        {
-            assert cell != null;
-            boolean hasValue = cell.value().hasRemaining();
-            boolean isDeleted = cell.isTombstone();
-            boolean isExpiring = cell.isExpiring();
-            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
-            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
-            int flags = 0;
-            if (!hasValue)
-                flags |= HAS_EMPTY_VALUE_MASK;
-
-            if (isDeleted)
-                flags |= IS_DELETED_MASK;
-            else if (isExpiring)
-                flags |= IS_EXPIRING_MASK;
-
-            if (useRowTimestamp)
-                flags |= USE_ROW_TIMESTAMP_MASK;
-            if (useRowTTL)
-                flags |= USE_ROW_TTL_MASK;
-
-            out.writeByte((byte)flags);
-
-            if (!useRowTimestamp)
-                header.writeTimestamp(cell.timestamp(), out);
-
-            if ((isDeleted || isExpiring) && !useRowTTL)
-                header.writeLocalDeletionTime(cell.localDeletionTime(), out);
-            if (isExpiring && !useRowTTL)
-                header.writeTTL(cell.ttl(), out);
-
-            if (cell.column().isComplex())
-                cell.column().cellPathSerializer().serialize(cell.path(), out);
-
-            if (hasValue)
-                header.getType(cell.column()).writeValue(cell.value(), out);
-        }
-
-        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
-        {
-            int flags = in.readUnsignedByte();
-            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
-            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
-            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
-            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
-            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
-
-            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
-
-            int localDeletionTime = useRowTTL
-                                  ? rowLiveness.localExpirationTime()
-                                  : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
-
-            int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
-
-            CellPath path = column.isComplex()
-                          ? column.cellPathSerializer().deserialize(in)
-                          : null;
-
-            boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
-
-            ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
-            if (hasValue)
-            {
-                if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
-                {
-                    header.getType(column).skipValue(in);
-                }
-                else
-                {
-                    value = header.getType(column).readValue(in);
-                    if (isCounter)
-                        value = helper.maybeClearCounterValue(value);
-                }
-            }
-
-            return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
-        }
-
-        public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
-        {
-            long size = 1; // flags
-            boolean hasValue = cell.value().hasRemaining();
-            boolean isDeleted = cell.isTombstone();
-            boolean isExpiring = cell.isExpiring();
-            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
-            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
-
-            if (!useRowTimestamp)
-                size += header.timestampSerializedSize(cell.timestamp());
-
-            if ((isDeleted || isExpiring) && !useRowTTL)
-                size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
-            if (isExpiring && !useRowTTL)
-                size += header.ttlSerializedSize(cell.ttl());
-
-            if (cell.column().isComplex())
-                size += cell.column().cellPathSerializer().serializedSize(cell.path());
-
-            if (hasValue)
-                size += header.getType(cell.column()).writtenLength(cell.value());
-
-            return size;
-        }
-
-        // Returns if the skipped cell was an actual cell (i.e. it had its presence flag).
-        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
-        {
-            int flags = in.readUnsignedByte();
-            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
-            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
-            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
-            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
-            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
-
-            if (!useRowTimestamp)
-                header.skipTimestamp(in);
-
-            if (!useRowTTL && (isDeleted || isExpiring))
-                header.skipLocalDeletionTime(in);
-
-            if (!useRowTTL && isExpiring)
-                header.skipTTL(in);
-
-            if (column.isComplex())
-                column.cellPathSerializer().skip(in);
-
-            if (hasValue)
-                header.getType(column).skipValue(in);
-
-            return true;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java
index 73d9e44..ad1c39a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
@@ -141,15 +142,165 @@ public abstract class Cell extends ColumnData
     // Overrides super type to provide a more precise return type.
     public abstract Cell purge(DeletionPurger purger, int nowInSec);
 
-    public interface Serializer
+    /**
+     * The serialization format for cell is:
+     *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ path ][ value size ][ value ]
+     *     [   1b  ][ 8b (vint) ][   4b (vint)   ][ 4b (vint) ][ 4b (vint) ][  arb ][  4b (vint) ][  arb  ]
+     *
+     * where not all field are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following
+     * meaning:
+     *   - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants)
+     *   - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_TIMESTAMP_MASK.
+     *   - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK)
+     *       or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK.
+     *   - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the
+     *       USE_ROW_TTL_MASK.
+     *   - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
+     *       for columns of this type have a fixed length.
+     *   - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column.
+     *   - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
+     *   - [ path ]: the cell path if the column this is a cell of is complex.
+     */
+    static class Serializer
     {
-        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException;
+        private final static int IS_DELETED_MASK             = 0x01; // Whether the cell is a tombstone or not.
+        private final static int IS_EXPIRING_MASK            = 0x02; // Whether the cell is expiring.
+        private final static int HAS_EMPTY_VALUE_MASK        = 0x04; // Wether the cell has an empty value. This will be the case for tombstone in particular.
+        private final static int USE_ROW_TIMESTAMP_MASK      = 0x08; // Wether the cell has the same timestamp than the row this is a cell of.
+        private final static int USE_ROW_TTL_MASK            = 0x10; // Wether the cell has the same ttl than the row this is a cell of.
+
+        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException
+        {
+            assert cell != null;
+            boolean hasValue = cell.value().hasRemaining();
+            boolean isDeleted = cell.isTombstone();
+            boolean isExpiring = cell.isExpiring();
+            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+            int flags = 0;
+            if (!hasValue)
+                flags |= HAS_EMPTY_VALUE_MASK;
+
+            if (isDeleted)
+                flags |= IS_DELETED_MASK;
+            else if (isExpiring)
+                flags |= IS_EXPIRING_MASK;
+
+            if (useRowTimestamp)
+                flags |= USE_ROW_TIMESTAMP_MASK;
+            if (useRowTTL)
+                flags |= USE_ROW_TTL_MASK;
+
+            out.writeByte((byte)flags);
+
+            if (!useRowTimestamp)
+                header.writeTimestamp(cell.timestamp(), out);
+
+            if ((isDeleted || isExpiring) && !useRowTTL)
+                header.writeLocalDeletionTime(cell.localDeletionTime(), out);
+            if (isExpiring && !useRowTTL)
+                header.writeTTL(cell.ttl(), out);
+
+            if (cell.column().isComplex())
+                cell.column().cellPathSerializer().serialize(cell.path(), out);
+
+            if (hasValue)
+                header.getType(cell.column()).writeValue(cell.value(), out);
+        }
+
+        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException
+        {
+            int flags = in.readUnsignedByte();
+            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in);
+
+            int localDeletionTime = useRowTTL
+                                    ? rowLiveness.localExpirationTime()
+                                    : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME);
+
+            int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL);
 
-        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException;
+            CellPath path = column.isComplex()
+                            ? column.cellPathSerializer().deserialize(in)
+                            : null;
 
-        public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header);
+            boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter();
+
+            ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            if (hasValue)
+            {
+                if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path)))
+                {
+                    header.getType(column).skipValue(in);
+                }
+                else
+                {
+                    value = header.getType(column).readValue(in);
+                    if (isCounter)
+                        value = helper.maybeClearCounterValue(value);
+                }
+            }
+
+            return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
+        }
+
+        public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header)
+        {
+            long size = 1; // flags
+            boolean hasValue = cell.value().hasRemaining();
+            boolean isDeleted = cell.isTombstone();
+            boolean isExpiring = cell.isExpiring();
+            boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp();
+            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime();
+
+            if (!useRowTimestamp)
+                size += header.timestampSerializedSize(cell.timestamp());
+
+            if ((isDeleted || isExpiring) && !useRowTTL)
+                size += header.localDeletionTimeSerializedSize(cell.localDeletionTime());
+            if (isExpiring && !useRowTTL)
+                size += header.ttlSerializedSize(cell.ttl());
+
+            if (cell.column().isComplex())
+                size += cell.column().cellPathSerializer().serializedSize(cell.path());
+
+            if (hasValue)
+                size += header.getType(cell.column()).writtenLength(cell.value());
+
+            return size;
+        }
 
         // Returns if the skipped cell was an actual cell (i.e. it had its presence flag).
-        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException;
+        public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException
+        {
+            int flags = in.readUnsignedByte();
+            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+            if (!useRowTimestamp)
+                header.skipTimestamp(in);
+
+            if (!useRowTTL && (isDeleted || isExpiring))
+                header.skipLocalDeletionTime(in);
+
+            if (!useRowTTL && isExpiring)
+                header.skipTTL(in);
+
+            if (column.isComplex())
+                column.cellPathSerializer().skip(in);
+
+            if (hasValue)
+                header.getType(column).skipValue(in);
+
+            return true;
+        }
     }
 }