You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/29 14:07:52 UTC

[7/8] git commit: Push more of memtable data off-heap

Push more of memtable data off-heap

patch by benedict & xedin; reviewed by benedict, iamaleksey & xedin for CASSANDRA-6694


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8541cca7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8541cca7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8541cca7

Branch: refs/heads/trunk
Commit: 8541cca718fc324c2545831fc945247a4aeb3437
Parents: d402cf6
Author: Pavel Yaskevich <xe...@apache.org>
Authored: Mon Apr 14 14:16:08 2014 -0700
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Apr 29 13:58:20 2014 +0200

----------------------------------------------------------------------
 conf/cassandra.yaml                             |   1 +
 .../org/apache/cassandra/cache/RowCacheKey.java |   2 +-
 .../org/apache/cassandra/config/CFMetaData.java |   5 +-
 .../org/apache/cassandra/config/Config.java     |   3 +-
 .../cassandra/config/DatabaseDescriptor.java    |   7 +-
 .../org/apache/cassandra/config/Schema.java     |   2 +-
 .../apache/cassandra/cql/QueryProcessor.java    |  12 +-
 .../apache/cassandra/cql3/ColumnIdentifier.java |   2 +-
 .../apache/cassandra/cql3/UpdateParameters.java |   4 +-
 .../cql3/statements/ModificationStatement.java  |   7 +-
 .../cql3/statements/SelectStatement.java        |   8 +-
 .../org/apache/cassandra/db/AbstractCell.java   | 265 ++++++++
 .../apache/cassandra/db/AbstractNativeCell.java | 647 +++++++++++++++++++
 .../cassandra/db/ArrayBackedSortedColumns.java  |   6 +-
 .../apache/cassandra/db/AtomicBTreeColumns.java | 204 +++---
 .../org/apache/cassandra/db/BufferCell.java     | 103 +++
 .../apache/cassandra/db/BufferCounterCell.java  | 181 ++++++
 .../cassandra/db/BufferCounterUpdateCell.java   |  93 +++
 .../apache/cassandra/db/BufferDecoratedKey.java |  39 ++
 .../apache/cassandra/db/BufferDeletedCell.java  | 123 ++++
 .../apache/cassandra/db/BufferExpiringCell.java | 170 +++++
 .../org/apache/cassandra/db/CFRowAdder.java     |   6 +-
 src/java/org/apache/cassandra/db/Cell.java      | 245 +------
 .../cassandra/db/CollationController.java       |   6 +-
 .../org/apache/cassandra/db/ColumnFamily.java   |  16 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  29 +-
 .../apache/cassandra/db/ColumnSerializer.java   |  10 +-
 .../org/apache/cassandra/db/CounterCell.java    | 217 +------
 .../apache/cassandra/db/CounterMutation.java    |   8 +-
 .../apache/cassandra/db/CounterUpdateCell.java  |  65 +-
 src/java/org/apache/cassandra/db/DataRange.java |   2 +-
 .../org/apache/cassandra/db/DecoratedKey.java   |  35 +-
 .../org/apache/cassandra/db/DefsTables.java     |   4 +-
 .../org/apache/cassandra/db/DeletedCell.java    | 100 +--
 .../org/apache/cassandra/db/ExpiringCell.java   | 159 +----
 .../cassandra/db/HintedHandOffManager.java      |   4 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   6 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  47 +-
 src/java/org/apache/cassandra/db/Mutation.java  |   2 +-
 .../org/apache/cassandra/db/NativeCell.java     |  88 +++
 .../apache/cassandra/db/NativeCounterCell.java  | 190 ++++++
 .../apache/cassandra/db/NativeDecoratedKey.java |  45 ++
 .../apache/cassandra/db/NativeDeletedCell.java  | 125 ++++
 .../apache/cassandra/db/NativeExpiringCell.java | 173 +++++
 src/java/org/apache/cassandra/db/Row.java       |   4 +-
 .../apache/cassandra/db/RowIteratorFactory.java |   4 +-
 .../org/apache/cassandra/db/RowPosition.java    |  24 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   6 +-
 .../compaction/AbstractCompactionStrategy.java  |   2 +-
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/CompactionManager.java        |  12 +-
 .../db/compaction/LazilyCompactedRow.java       |   4 +-
 .../db/compaction/LeveledManifest.java          |  12 +-
 .../cassandra/db/compaction/Scrubber.java       |   6 +-
 .../db/composites/AbstractCellNameType.java     |  17 +-
 .../db/composites/AbstractComposite.java        |   1 -
 .../db/composites/BoundedComposite.java         |  12 +-
 .../cassandra/db/composites/CellName.java       |   6 +-
 .../cassandra/db/composites/CellNameType.java   |   3 +-
 .../cassandra/db/composites/CellNames.java      |  16 +
 .../cassandra/db/composites/Composite.java      |   5 +-
 .../cassandra/db/composites/Composites.java     |  10 +-
 .../db/composites/CompoundComposite.java        |  12 +-
 .../db/composites/CompoundDenseCellName.java    |   5 +-
 .../composites/CompoundDenseCellNameType.java   |   3 +-
 .../db/composites/CompoundSparseCellName.java   |  15 +-
 .../composites/CompoundSparseCellNameType.java  |  15 +-
 .../db/composites/SimpleComposite.java          |  10 +-
 .../db/composites/SimpleDenseCellName.java      |   5 +-
 .../db/composites/SimpleDenseCellNameType.java  |   3 +-
 .../db/composites/SimpleSparseCellName.java     |  11 +-
 .../db/composites/SimpleSparseCellNameType.java |   5 +-
 .../SimpleSparseInternedCellName.java           |  11 +-
 .../apache/cassandra/db/filter/ColumnSlice.java |  14 +-
 .../cassandra/db/filter/ExtendedFilter.java     |   6 +-
 .../AbstractSimplePerColumnSecondaryIndex.java  |  23 +-
 .../cassandra/db/index/SecondaryIndex.java      |  10 +-
 .../db/index/SecondaryIndexManager.java         |  16 +-
 .../CompositesIndexOnClusteringKey.java         |   2 +-
 .../CompositesIndexOnCollectionKey.java         |   2 +-
 .../CompositesIndexOnCollectionValue.java       |   2 +-
 .../composites/CompositesIndexOnRegular.java    |   2 +-
 .../db/index/composites/CompositesSearcher.java |   8 +-
 .../cassandra/db/index/keys/KeysSearcher.java   |  12 +-
 .../db/marshal/LocalByPartionerType.java        |   2 +-
 .../apache/cassandra/dht/AbstractBounds.java    |   3 +-
 .../dht/AbstractByteOrderedPartitioner.java     |   3 +-
 .../apache/cassandra/dht/LocalPartitioner.java  |   3 +-
 .../cassandra/dht/Murmur3Partitioner.java       |   3 +-
 .../dht/OrderPreservingPartitioner.java         |   3 +-
 .../apache/cassandra/dht/RandomPartitioner.java |   3 +-
 src/java/org/apache/cassandra/dht/Token.java    |   7 +-
 .../hadoop/ColumnFamilyRecordReader.java        |   5 +-
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   3 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  10 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |   2 +-
 .../io/sstable/IndexSummaryBuilder.java         |   9 +-
 .../apache/cassandra/io/sstable/SSTable.java    |   5 +-
 .../cassandra/io/sstable/SSTableReader.java     |  18 +-
 .../cassandra/io/sstable/SSTableScanner.java    |   4 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   4 +-
 .../cassandra/io/sstable/SSTableWriter.java     |  12 +-
 .../repair/RepairMessageVerbHandler.java        |   2 +-
 .../org/apache/cassandra/repair/Validator.java  |  10 +-
 .../cassandra/service/ActiveRepairService.java  |   2 +-
 .../cassandra/service/RowDataResolver.java      |   2 +-
 .../cassandra/service/StorageService.java       |   2 +-
 .../service/pager/RangeNamesQueryPager.java     |   2 +-
 .../service/pager/RangeSliceQueryPager.java     |   2 +-
 .../cassandra/thrift/CassandraServer.java       |  12 +-
 .../cassandra/thrift/ThriftValidation.java      |   6 +-
 .../apache/cassandra/tools/SSTableExport.java   |   6 +-
 .../apache/cassandra/tools/SSTableImport.java   |   4 +-
 .../org/apache/cassandra/tracing/Tracing.java   |   6 +-
 .../org/apache/cassandra/utils/FBUtilities.java |  35 +-
 .../apache/cassandra/utils/PureJavaCrc32.java   |  33 +-
 .../utils/memory/ContextAllocator.java          |  25 +-
 .../apache/cassandra/utils/memory/HeapPool.java |  58 +-
 .../cassandra/utils/memory/MemoryUtil.java      | 305 +++++++++
 .../utils/memory/MemtableAllocator.java         | 245 +++++++
 .../utils/memory/MemtableBufferAllocator.java   |  70 ++
 .../utils/memory/MemtableCleanerThread.java     |  73 +++
 .../cassandra/utils/memory/MemtablePool.java    | 199 ++++++
 .../cassandra/utils/memory/NativeAllocator.java | 258 ++++++++
 .../cassandra/utils/memory/NativePool.java      |  39 ++
 .../org/apache/cassandra/utils/memory/Pool.java | 199 ------
 .../cassandra/utils/memory/PoolAllocator.java   | 225 -------
 .../utils/memory/PoolCleanerThread.java         |  73 ---
 .../cassandra/utils/memory/SlabAllocator.java   |  13 +-
 .../apache/cassandra/utils/memory/SlabPool.java |   7 +-
 test/conf/cassandra.yaml                        |   2 +-
 .../cassandra/db/LongFlushMemtableTest.java     |   2 +-
 .../apache/cassandra/db/LongKeyspaceTest.java   |   2 +-
 .../db/compaction/LongCompactionsTest.java      |   2 +-
 .../LongLeveledCompactionStrategyTest.java      |   2 +-
 test/unit/org/apache/cassandra/Util.java        |   8 +-
 .../org/apache/cassandra/config/DefsTest.java   |  17 +-
 .../db/ArrayBackedSortedColumnsTest.java        |  20 +-
 .../cassandra/db/CollationControllerTest.java   |  12 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     |  86 +--
 .../apache/cassandra/db/CounterCacheTest.java   |   4 +-
 .../apache/cassandra/db/CounterCellTest.java    |  80 +--
 .../org/apache/cassandra/db/KeyCacheTest.java   |   4 +-
 .../apache/cassandra/db/KeyCollisionTest.java   |  10 +-
 .../org/apache/cassandra/db/KeyspaceTest.java   |  38 +-
 .../org/apache/cassandra/db/MultitableTest.java |   4 +-
 .../org/apache/cassandra/db/NativeCellTest.java | 171 +++++
 .../apache/cassandra/db/RangeTombstoneTest.java |   9 +-
 .../apache/cassandra/db/ReadMessageTest.java    |  10 +-
 .../cassandra/db/RecoveryManager3Test.java      |   4 +-
 .../cassandra/db/RecoveryManagerTest.java       |  10 +-
 .../org/apache/cassandra/db/RemoveCellTest.java |   8 +-
 .../cassandra/db/RemoveColumnFamilyTest.java    |   4 +-
 .../db/RemoveColumnFamilyWithFlush1Test.java    |   4 +-
 .../db/RemoveColumnFamilyWithFlush2Test.java    |   4 +-
 .../apache/cassandra/db/RemoveSubCellTest.java  |  10 +-
 .../org/apache/cassandra/db/RowCacheTest.java   |   2 +-
 .../apache/cassandra/db/RowIterationTest.java   |   8 +-
 test/unit/org/apache/cassandra/db/RowTest.java  |   2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   4 +-
 .../db/SecondaryIndexCellSizeTest.java          |  15 +-
 .../apache/cassandra/db/SerializationsTest.java |  31 +-
 .../org/apache/cassandra/db/TimeSortTest.java   |   8 +-
 .../db/compaction/AntiCompactionTest.java       |   6 +-
 .../compaction/BlacklistingCompactionsTest.java |   2 +-
 .../db/compaction/CompactionsPurgeTest.java     |  34 +-
 .../db/compaction/CompactionsTest.java          |  18 +-
 .../LeveledCompactionStrategyTest.java          |   8 +-
 .../db/compaction/OneCompactionTest.java        |   2 +-
 .../SizeTieredCompactionStrategyTest.java       |   4 +-
 .../cassandra/db/compaction/TTLExpiryTest.java  |  16 +-
 .../db/context/CounterContextTest.java          |  13 -
 .../db/index/PerRowSecondaryIndexTest.java      |   8 +-
 .../io/sstable/IndexSummaryManagerTest.java     |   6 +-
 .../cassandra/io/sstable/IndexSummaryTest.java  |  10 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   2 +-
 .../io/sstable/SSTableMetadataTest.java         |  30 +-
 .../cassandra/io/sstable/SSTableReaderTest.java |  13 +-
 .../io/sstable/SSTableScannerTest.java          |   8 +-
 .../cassandra/io/sstable/SSTableUtils.java      |   2 +-
 .../apache/cassandra/repair/ValidatorTest.java  |   3 +-
 .../cassandra/service/QueryPagerTest.java       |   4 +-
 .../service/pager/AbstractQueryPagerTest.java   |   2 +-
 .../streaming/StreamingTransferTest.java        |  20 +-
 .../cassandra/tools/SSTableExportTest.java      |   6 +-
 .../cassandra/tools/SSTableImportTest.java      |   6 +-
 .../cassandra/triggers/TriggerExecutorTest.java |  14 +-
 .../apache/cassandra/triggers/TriggersTest.java |   8 +-
 188 files changed, 4529 insertions(+), 2129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2176bf9..6e47918 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -283,6 +283,7 @@ memtable_cleanup_threshold: 0.4
 # Options are:
 #   heap_buffers:    on heap nio buffers
 #   offheap_buffers: off heap (direct) nio buffers
+#   offheap_objects: native memory, eliminating nio buffer heap overhead
 memtable_allocation_type: heap_buffers
 
 # Total space to use for commitlogs.  Since commitlog segments are

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index bbd8591..aebb129 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -37,7 +37,7 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
 
     public RowCacheKey(UUID cfId, DecoratedKey key)
     {
-        this(cfId, key.key);
+        this(cfId, key.getKey());
     }
 
     public RowCacheKey(UUID cfId, ByteBuffer key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index b8c1fae..b4b3fbe 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.CFStatement;
 import org.apache.cassandra.cql3.statements.CreateTableStatement;
+import org.apache.cassandra.db.AbstractCell;
 import org.apache.cassandra.db.AtomDeserializer;
 import org.apache.cassandra.db.CFRowAdder;
 import org.apache.cassandra.db.Cell;
@@ -1328,7 +1329,7 @@ public final class CFMetaData
      */
     public ColumnDefinition getColumnDefinition(CellName cellName)
     {
-        ColumnIdentifier id = cellName.cql3ColumnName();
+        ColumnIdentifier id = cellName.cql3ColumnName(this);
         ColumnDefinition def = id == null
                              ? getColumnDefinition(cellName.toByteBuffer())  // Means a dense layout, try the full column name
                              : getColumnDefinition(id);
@@ -1409,7 +1410,7 @@ public final class CFMetaData
 
     public Iterator<OnDiskAtom> getOnDiskIterator(DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version)
     {
-        return Cell.onDiskIterator(in, flag, expireBefore, version, comparator);
+        return AbstractCell.onDiskIterator(in, flag, expireBefore, version, comparator);
     }
 
     public AtomDeserializer getOnDiskDeserializer(DataInput in, Descriptor.Version version)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 7ab7a8c..97ceb7f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -290,7 +290,8 @@ public class Config
     {
         unslabbed_heap_buffers,
         heap_buffers,
-        offheap_buffers
+        offheap_buffers,
+        offheap_objects
     }
 
     public static enum DiskFailurePolicy

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4b0043c..cdcceb1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -66,7 +66,8 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.memory.HeapPool;
-import org.apache.cassandra.utils.memory.Pool;
+import org.apache.cassandra.utils.memory.NativePool;
+import org.apache.cassandra.utils.memory.MemtablePool;
 import org.apache.cassandra.utils.memory.SlabPool;
 
 public class DatabaseDescriptor
@@ -1476,7 +1477,7 @@ public class DatabaseDescriptor
         return conf.inter_dc_tcp_nodelay;
     }
 
-    public static Pool getMemtableAllocatorPool()
+    public static MemtablePool getMemtableAllocatorPool()
     {
         long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
         long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20;
@@ -1493,6 +1494,8 @@ public class DatabaseDescriptor
                     System.exit(-1);
                 }
                 return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
+            case offheap_objects:
+                return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily());
             default:
                 throw new AssertionError();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index b1e0f2f..106ad9b 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -416,7 +416,7 @@ public class Schema
     {
         try
         {
-            return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.key));
+            return systemKeyspaceNames.contains(ByteBufferUtil.string(row.key.getKey()));
         }
         catch (CharacterCodingException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql/QueryProcessor.java b/src/java/org/apache/cassandra/cql/QueryProcessor.java
index baf89b2..3b35555 100644
--- a/src/java/org/apache/cassandra/cql/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql/QueryProcessor.java
@@ -171,7 +171,7 @@ public class QueryProcessor
                                     ? select.getKeyFinish().getByteBuffer(keyType,variables)
                                     : null;
 
-        RowPosition startKey = RowPosition.forKey(startKeyBytes, p), finishKey = RowPosition.forKey(finishKeyBytes, p);
+        RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p), finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
         if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p))
         {
             if (p instanceof RandomPartitioner)
@@ -213,7 +213,7 @@ public class QueryProcessor
         // if start key was set and relation was "greater than"
         if (select.getKeyStart() != null && !select.includeStartKey() && !rows.isEmpty())
         {
-            if (rows.get(0).key.key.equals(startKeyBytes))
+            if (rows.get(0).key.getKey().equals(startKeyBytes))
                 rows.remove(0);
         }
 
@@ -221,7 +221,7 @@ public class QueryProcessor
         if (select.getKeyFinish() != null && !select.includeFinishKey() && !rows.isEmpty())
         {
             int lastIndex = rows.size() - 1;
-            if (rows.get(lastIndex).key.key.equals(finishKeyBytes))
+            if (rows.get(lastIndex).key.getKey().equals(finishKeyBytes))
                 rows.remove(lastIndex);
         }
 
@@ -459,7 +459,7 @@ public class QueryProcessor
                         {
                             // prepend key
                             ByteBuffer keyName = ByteBufferUtil.bytes(metadata.getCQL2KeyName());
-                            thriftColumns.add(new Column(keyName).setValue(row.key.key).setTimestamp(-1));
+                            thriftColumns.add(new Column(keyName).setValue(row.key.getKey()).setTimestamp(-1));
                             result.schema.name_types.put(keyName, TypeParser.getShortName(AsciiType.instance));
                             result.schema.value_types.put(keyName, TypeParser.getShortName(metadata.getKeyValidator()));
                         }
@@ -491,7 +491,7 @@ public class QueryProcessor
                             {
                                 // preserve case of key as it was requested
                                 ByteBuffer requestedKey = ByteBufferUtil.bytes(term.getText());
-                                thriftColumns.add(new Column(requestedKey).setValue(row.key.key).setTimestamp(-1));
+                                thriftColumns.add(new Column(requestedKey).setValue(row.key.getKey()).setTimestamp(-1));
                                 result.schema.name_types.put(requestedKey, TypeParser.getShortName(AsciiType.instance));
                                 result.schema.value_types.put(requestedKey, TypeParser.getShortName(metadata.getKeyValidator()));
                                 continue;
@@ -524,7 +524,7 @@ public class QueryProcessor
 
                     // Create a new row, add the columns to it, and then add it to the list of rows
                     CqlRow cqlRow = new CqlRow();
-                    cqlRow.key = row.key.key;
+                    cqlRow.key = row.key.getKey();
                     cqlRow.columns = thriftColumns;
                     if (select.isColumnsReversed())
                         Collections.reverse(cqlRow.columns);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
index 3bf3acc..1b232cf 100644
--- a/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
+++ b/src/java/org/apache/cassandra/cql3/ColumnIdentifier.java
@@ -50,7 +50,7 @@ public class ColumnIdentifier implements Selectable, IMeasurableMemory
         this.text = type.getString(bytes);
     }
 
-    private ColumnIdentifier(ByteBuffer bytes, String text)
+    public ColumnIdentifier(ByteBuffer bytes, String text)
     {
         this.bytes = bytes;
         this.text = text;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/UpdateParameters.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
index 87ce22e..fad8fae 100644
--- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java
+++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java
@@ -55,13 +55,13 @@ public class UpdateParameters
     public Cell makeColumn(CellName name, ByteBuffer value) throws InvalidRequestException
     {
         QueryProcessor.validateCellName(name, metadata.comparator);
-        return Cell.create(name, value, timestamp, ttl, metadata);
+        return AbstractCell.create(name, value, timestamp, ttl, metadata);
     }
 
     public Cell makeTombstone(CellName name) throws InvalidRequestException
     {
         QueryProcessor.validateCellName(name, metadata.comparator);
-        return new DeletedCell(name, localDeletionTime, timestamp);
+        return new BufferDeletedCell(name, localDeletionTime, timestamp);
     }
 
     public RangeTombstone makeRangeTombstone(ColumnSlice slice) throws InvalidRequestException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index acc4802..4741b9a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -32,12 +32,9 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.CBuilder;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.SliceQueryFilter;
-import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.db.marshal.BooleanType;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.service.CASConditions;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
@@ -449,10 +446,10 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF
             if (row.cf == null || row.cf.isEmpty())
                 continue;
 
-            Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(now).group(row.cf.getSortedColumns().iterator());
+            Iterator<CQL3Row> iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(row.cf.getSortedColumns().iterator());
             if (iter.hasNext())
             {
-                map.put(row.key.key, iter.next());
+                map.put(row.key.getKey(), iter.next());
                 // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key)
                 assert !iter.hasNext();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index f28d570..d79bd5b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -400,8 +400,8 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables);
             ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables);
 
-            RowPosition startKey = RowPosition.forKey(startKeyBytes, p);
-            RowPosition finishKey = RowPosition.forKey(finishKeyBytes, p);
+            RowPosition startKey = RowPosition.ForKey.get(startKeyBytes, p);
+            RowPosition finishKey = RowPosition.ForKey.get(finishKeyBytes, p);
 
             if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p))
                 return null;
@@ -1007,7 +1007,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
             if (row.cf == null)
                 continue;
 
-            processColumnFamily(row.key.key, row.cf, variables, now, result);
+            processColumnFamily(row.key.getKey(), row.cf, variables, now, result);
         }
 
         ResultSet cqlRows = result.build();
@@ -1042,7 +1042,7 @@ public class SelectStatement implements CQLStatement, MeasurableForPreparedCache
         if (sliceRestriction != null)
             cells = applySliceRestriction(cells, variables);
 
-        CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(now).group(cells);
+        CQL3Row.RowIterator iter = cfm.comparator.CQL3RowBuilder(cfm, now).group(cells);
 
         // If there is static columns but there is no non-static row, then provided the select was a full
         // partition selection (i.e. not a 2ndary index search and there was no condition on clustering columns)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractCell.java b/src/java/org/apache/cassandra/db/AbstractCell.java
new file mode 100644
index 0000000..1075278
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractCell.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Iterator;
+
+import com.google.common.collect.AbstractIterator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+public abstract class AbstractCell implements Cell
+{
+    public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
+                                                      final ColumnSerializer.Flag flag,
+                                                      final int expireBefore,
+                                                      final Descriptor.Version version,
+                                                      final CellNameType type)
+    {
+        return new AbstractIterator<OnDiskAtom>()
+        {
+            protected OnDiskAtom computeNext()
+            {
+                OnDiskAtom atom;
+                try
+                {
+                    atom = type.onDiskAtomSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+                if (atom == null)
+                    return endOfData();
+
+                return atom;
+            }
+        };
+    }
+
+    @Override
+    public boolean isMarkedForDelete(long now)
+    {
+        return false;
+    }
+
+    @Override
+    public boolean isLive(long now)
+    {
+        return !isMarkedForDelete(now);
+    }
+
+    // Don't call unless the column is actually marked for delete.
+    @Override
+    public long getMarkedForDeleteAt()
+    {
+        return Long.MAX_VALUE;
+    }
+
+    @Override
+    public int cellDataSize()
+    {
+        return name().dataSize() + value().remaining() + TypeSizes.NATIVE.sizeof(timestamp());
+    }
+
+    @Override
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
+    {
+        /*
+         * Size of a column is =
+         *   size of a name (short + length of the string)
+         * + 1 byte to indicate if the column has been deleted
+         * + 8 bytes for timestamp
+         * + 4 bytes which basically indicates the size of the byte array
+         * + entire byte array.
+        */
+        int valueSize = value().remaining();
+        return ((int)type.cellSerializer().serializedSize(name(), typeSizes)) + 1 + typeSizes.sizeof(timestamp()) + typeSizes.sizeof(valueSize) + valueSize;
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return 0;
+    }
+
+    @Override
+    public Cell diff(Cell cell)
+    {
+        if (timestamp() < cell.timestamp())
+            return cell;
+        return null;
+    }
+
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        digest.update(name().toByteBuffer().duplicate());
+        digest.update(value().duplicate());
+
+        FBUtilities.updateWithLong(digest, timestamp());
+        FBUtilities.updateWithByte(digest, serializationFlags());
+    }
+
+    @Override
+    public int getLocalDeletionTime()
+    {
+        return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public Cell reconcile(Cell cell)
+    {
+        // tombstones take precedence.  (if both are tombstones, then it doesn't matter which one we use.)
+        if (isMarkedForDelete(System.currentTimeMillis()))
+            return timestamp() < cell.timestamp() ? cell : this;
+        if (cell.isMarkedForDelete(System.currentTimeMillis()))
+            return timestamp() > cell.timestamp() ? this : cell;
+        // break ties by comparing values.
+        if (timestamp() == cell.timestamp())
+            return value().compareTo(cell.value()) < 0 ? cell : this;
+        // neither is tombstoned and timestamps are different
+        return timestamp() < cell.timestamp() ? cell : this;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        return this == o || (o instanceof Cell && equals((Cell) o));
+    }
+
+    public boolean equals(Cell cell)
+    {
+        return timestamp() == cell.timestamp() && name().equals(cell.name()) && value().equals(cell.value());
+    }
+
+    public int hashCode()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getString(CellNameType comparator)
+    {
+        return String.format("%s:%b:%d@%d",
+                comparator.getString(name()),
+                isMarkedForDelete(System.currentTimeMillis()),
+                value().remaining(),
+                timestamp());
+    }
+
+    @Override
+    public void validateName(CFMetaData metadata) throws MarshalException
+    {
+        metadata.comparator.validate(name());
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException
+    {
+        validateName(metadata);
+
+        AbstractType<?> valueValidator = metadata.getValueValidator(name());
+        if (valueValidator != null)
+            valueValidator.validate(value());
+    }
+
+    public static Cell create(CellName name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
+    {
+        if (ttl <= 0)
+            ttl = metadata.getDefaultTimeToLive();
+
+        return ttl > 0
+                ? new BufferExpiringCell(name, value, timestamp, ttl)
+                : new BufferCell(name, value, timestamp);
+    }
+
+    public static Cell diff(CounterCell a, Cell b)
+    {
+        if (a.timestamp() < b.timestamp())
+            return b;
+
+        // Note that at this point, b can't be a tombstone. Indeed,
+        // b is the result of merging us with other nodes' results, and
+        // merging a CounterCell with a tombstone never returns a tombstone
+        // unless that tombstone's timestamp is greater than the CounterCell
+        // one.
+        assert b instanceof CounterCell : "Wrong class type: " + b.getClass();
+
+        if (a.timestampOfLastDelete() < ((CounterCell) b).timestampOfLastDelete())
+            return b;
+
+        CounterContext.Relationship rel = CounterCell.contextManager.diff(b.value(), a.value());
+        return (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT) ? b : null;
+    }
+
+    /** This is temporary until we start creating Cells of the different type (buffer vs. native) */
+    public static Cell reconcile(CounterCell a, Cell b)
+    {
+        assert (b instanceof CounterCell) || (b instanceof DeletedCell) : "Wrong class type: " + b.getClass();
+
+        // live + tombstone: track last tombstone
+        if (b.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
+        {
+            // live < tombstone
+            if (a.timestamp() < b.timestamp())
+                return b;
+
+            // live last delete >= tombstone
+            if (a.timestampOfLastDelete() >= b.timestamp())
+                return a;
+
+            // live last delete < tombstone
+            return new BufferCounterCell(a.name(), a.value(), a.timestamp(), b.timestamp());
+        }
+
+        assert b instanceof CounterCell : "Wrong class type: " + b.getClass();
+
+        // live < live last delete
+        if (a.timestamp() < ((CounterCell) b).timestampOfLastDelete())
+            return b;
+
+        // live last delete > live
+        if (a.timestampOfLastDelete() > b.timestamp())
+            return a;
+
+        // live + live. return one of the cells if its context is a superset of the other's, or merge them otherwise
+        ByteBuffer context = CounterCell.contextManager.merge(a.value(), b.value());
+        if (context == a.value() && a.timestamp() >= b.timestamp() && a.timestampOfLastDelete() >= ((CounterCell) b).timestampOfLastDelete())
+            return a;
+        else if (context == b.value() && b.timestamp() >= a.timestamp() && ((CounterCell) b).timestampOfLastDelete() >= a.timestampOfLastDelete())
+            return b;
+        else // merge clocks and timestamps.
+            return new BufferCounterCell(a.name(),
+                                         context,
+                                         Math.max(a.timestamp(), b.timestamp()),
+                                         Math.max(a.timestampOfLastDelete(), ((CounterCell) b).timestampOfLastDelete()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/AbstractNativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractNativeCell.java b/src/java/org/apache/cassandra/db/AbstractNativeCell.java
new file mode 100644
index 0000000..d21171f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractNativeCell.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.*;
+
+
+/**
+ * Packs a CellName AND a Cell into one off-heap representation.
+ * Layout is:
+ *
+ * Note we store the ColumnIdentifier in full as bytes. This seems an okay tradeoff for now, as we just
+ * look it back up again when we need to, and in the near future we hope to switch to ints, longs or
+ * UUIDs representing column identifiers on disk, at which point we can switch that here as well.
+ *
+ * [size][timestamp][value offset][name size][name extra][name offset deltas][cell names][value][Descendants]
+ * [ 4b ][   8b    ][     4b     ][    2b   ][    1b    ][     each 2b      ][ arb < 64k][ arb ][ arbitrary ]
+ *
+ * descendants: any overriding classes will put their state here
+ * name offsets are deltas from their base offset, and don't include the first offset, or the end position of the final entry,
+ * i.e. there will be size - 1 entries, and each is a delta that is added to the offset of the position of the first name
+ * (which is always CELL_NAME_OFFSETS_OFFSET + (2 * (size - 1))). The length of the final name fills up any remaining
+ * space up to the value offset
+ * name extra:  lowest 2 bits indicate the clustering size delta (i.e. how many name items are NOT part of the clustering key)
+ *              the next 3 bits hold the NameType (see the enum below; its bits are pre-shifted by 2),
+ *              which also encodes whether a compound sparse name is static
+ */
+public abstract class AbstractNativeCell extends AbstractCell implements CellName
+{
+    static final int TIMESTAMP_OFFSET = 4;
+    private static final int VALUE_OFFSET_OFFSET = 12;
+    private static final int CELL_NAME_SIZE_OFFSET = 16;
+    private static final int CELL_NAME_EXTRA_OFFSET = 18;
+    private static final int CELL_NAME_OFFSETS_OFFSET = 19;
+    private static final int CELL_NAME_SIZE_DELTA_MASK = 3;
+    private static final int CELL_NAME_TYPE_SHIFT = 2;
+    private static final int CELL_NAME_TYPE_MASK = 7;
+
+    private static enum NameType
+    {
+        COMPOUND_DENSE(0 << 2), COMPOUND_SPARSE(1 << 2), COMPOUND_SPARSE_STATIC(2 << 2), SIMPLE_DENSE(3 << 2), SIMPLE_SPARSE(4 << 2);
+        static final NameType[] TYPES = NameType.values();
+        final int bits;
+
+        NameType(int bits)
+        {
+            this.bits = bits;
+        }
+
+        static NameType typeOf(CellName name)
+        {
+            if (name instanceof CompoundDenseCellName)
+            {
+                assert !name.isStatic();
+                return COMPOUND_DENSE;
+            }
+
+            if (name instanceof CompoundSparseCellName)
+                return name.isStatic() ? COMPOUND_SPARSE_STATIC : COMPOUND_SPARSE;
+
+            if (name instanceof SimpleDenseCellName)
+            {
+                assert !name.isStatic();
+                return SIMPLE_DENSE;
+            }
+
+            if (name instanceof SimpleSparseCellName)
+            {
+                assert !name.isStatic();
+                return SIMPLE_SPARSE;
+            }
+
+            if (name instanceof NativeCell)
+                return ((NativeCell) name).nametype();
+
+            throw new AssertionError();
+        }
+    }
+
+    private final long peer; // address returned by the allocator for this cell's memory; -1 when constructed without an allocation
+
+    AbstractNativeCell()
+    {
+        peer = -1;
+    }
+
+    public AbstractNativeCell(NativeAllocator allocator, OpOrder.Group writeOp, Cell copyOf)
+    {
+        int size = sizeOf(copyOf);
+        peer = allocator.allocate(size, writeOp);
+
+        MemoryUtil.setInt(peer, size);
+        construct(copyOf);
+    }
+
+    protected int sizeOf(Cell cell)
+    {
+        int size = CELL_NAME_OFFSETS_OFFSET + Math.max(0, cell.name().size() - 1) * 2 + cell.value().remaining();
+        CellName name = cell.name();
+        for (int i = 0; i < name.size(); i++)
+            size += name.get(i).remaining();
+        return size;
+    }
+
+    protected void construct(Cell from)
+    {
+        setLong(TIMESTAMP_OFFSET, from.timestamp());
+        CellName name = from.name();
+        int nameSize = name.size();
+        int offset = CELL_NAME_SIZE_OFFSET;
+        setShort(offset, (short) nameSize);
+        assert nameSize - name.clusteringSize() <= 2;
+        byte cellNameExtraBits = (byte) ((nameSize - name.clusteringSize()) | NameType.typeOf(name).bits);
+        setByte(offset += 2, cellNameExtraBits);
+        offset += 1;
+        short cellNameDelta = 0;
+        for (int i = 1; i < nameSize; i++)
+        {
+            cellNameDelta += name.get(i - 1).remaining();
+            setShort(offset, cellNameDelta);
+            offset += 2;
+        }
+        for (int i = 0; i < nameSize; i++)
+        {
+            ByteBuffer bb = name.get(i);
+            setBytes(offset, bb);
+            offset += bb.remaining();
+        }
+        setInt(VALUE_OFFSET_OFFSET, offset);
+        setBytes(offset, from.value());
+    }
+
+    // offset of the 2-byte start delta of name component i (i >= 1); for i == size() this is where the name bytes begin
+    private int nameDeltaOffset(int i)
+    {
+        return CELL_NAME_OFFSETS_OFFSET + ((i - 1) * 2);
+    }
+
+    int valueStartOffset()
+    {
+        return getInt(VALUE_OFFSET_OFFSET);
+    }
+
+    private int valueEndOffset()
+    {
+        return (int) (internalSize() - postfixSize());
+    }
+
+    protected int postfixSize()
+    {
+        return 0;
+    }
+
+    @Override
+    public ByteBuffer value()
+    {
+        long offset = valueStartOffset();
+        return getByteBuffer(offset, (int) (internalSize() - (postfixSize() + offset)));
+    }
+
+    private int clusteringSizeDelta()
+    {
+        return getByte(CELL_NAME_EXTRA_OFFSET) & CELL_NAME_SIZE_DELTA_MASK;
+    }
+
+    public boolean isStatic()
+    {
+        return nametype() == NameType.COMPOUND_SPARSE_STATIC;
+    }
+
+    NameType nametype()
+    {
+        return NameType.TYPES[(((int) this.getByte(CELL_NAME_EXTRA_OFFSET)) >> CELL_NAME_TYPE_SHIFT) & CELL_NAME_TYPE_MASK];
+    }
+
+    public long minTimestamp()
+    {
+        return timestamp();
+    }
+
+    public long maxTimestamp()
+    {
+        return timestamp();
+    }
+
+    public int clusteringSize()
+    {
+        return size() - clusteringSizeDelta();
+    }
+
+    @Override
+    public ColumnIdentifier cql3ColumnName(CFMetaData metadata)
+    {
+        switch (nametype())
+        {
+            case SIMPLE_SPARSE:
+                return getIdentifier(metadata, get(clusteringSize()));
+            case COMPOUND_SPARSE_STATIC:
+            case COMPOUND_SPARSE:
+                ByteBuffer buffer = get(clusteringSize());
+                if (buffer.remaining() == 0)
+                    return CompoundSparseCellNameType.rowMarkerId;
+
+                return getIdentifier(metadata, buffer);
+            case SIMPLE_DENSE:
+            case COMPOUND_DENSE:
+                return null;
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    public ByteBuffer collectionElement()
+    {
+        return isCollectionCell() ? get(size() - 1) : null;
+    }
+
+    // we always have a collection element if our clustering size is 2 less than our total size,
+    // and we never have one otherwise
+    public boolean isCollectionCell()
+    {
+        return clusteringSizeDelta() == 2;
+    }
+
+    public boolean isSameCQL3RowAs(CellNameType type, CellName other)
+    {
+        switch (nametype())
+        {
+            case SIMPLE_DENSE:
+            case COMPOUND_DENSE:
+                return type.compare(this, other) == 0;
+            case COMPOUND_SPARSE_STATIC:
+            case COMPOUND_SPARSE:
+                int clusteringSize = clusteringSize();
+                if (clusteringSize != other.clusteringSize() || other.isStatic() != isStatic())
+                    return false;
+                for (int i = 0; i < clusteringSize; i++)
+                    if (type.subtype(i).compare(get(i), other.get(i)) != 0)
+                        return false;
+                return true;
+            case SIMPLE_SPARSE:
+                return true;
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    public int size()
+    {
+        return getShort(CELL_NAME_SIZE_OFFSET);
+    }
+
+    public boolean isEmpty()
+    {
+        return size() == 0;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        // remember to take dense/sparse into account, and only return EOC when not dense
+        int size = size();
+        assert i >= 0 && i < size();
+        int cellNamesOffset = nameDeltaOffset(size);
+        int startDelta = i == 0 ? 0 : getShort(nameDeltaOffset(i));
+        int endDelta = i < size - 1 ? getShort(nameDeltaOffset(i + 1)) : valueStartOffset() - cellNamesOffset;
+        return getByteBuffer(cellNamesOffset + startDelta, endDelta - startDelta);
+    }
+
+    private static final ThreadLocal<byte[]> BUFFER = new ThreadLocal<byte[]>()
+    {
+        protected byte[] initialValue()
+        {
+            return new byte[256];
+        }
+    };
+
+    protected void writeComponentTo(MessageDigest digest, int i, boolean includeSize)
+    {
+        // remember to take dense/sparse into account, and only return EOC when not dense
+        int size = size();
+        assert i >= 0 && i < size();
+        int cellNamesOffset = nameDeltaOffset(size);
+        int startDelta = i == 0 ? 0 : getShort(nameDeltaOffset(i));
+        int endDelta = i < size - 1 ? getShort(nameDeltaOffset(i + 1)) : valueStartOffset() - cellNamesOffset;
+
+        int componentStart = cellNamesOffset + startDelta;
+        int count = endDelta - startDelta;
+
+        if (includeSize)
+            FBUtilities.updateWithShort(digest, count);
+
+        writeMemoryTo(digest, componentStart, count);
+    }
+
+    protected void writeMemoryTo(MessageDigest digest, int from, int count)
+    {
+        // only batch if we have more than 16 bytes remaining to transfer, otherwise fall-back to single-byte updates
+        int i = 0, batchEnd = count - 16;
+        if (i < batchEnd)
+        {
+            byte[] buffer = BUFFER.get();
+            while (i < batchEnd)
+            {
+                int transfer = Math.min(count - i, 256);
+                getBytes(from + i, buffer, 0, transfer);
+                digest.update(buffer, 0, transfer);
+                i += transfer;
+            }
+        }
+        while (i < count)
+            digest.update(getByte(from + i++));
+    }
+
+    public EOC eoc()
+    {
+        return EOC.NONE;
+    }
+
+    public Composite withEOC(EOC eoc)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Composite start()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public Composite end()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public ColumnSlice slice()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isPrefixOf(CType type, Composite c)
+    {
+        if (size() > c.size() || isStatic() != c.isStatic())
+            return false;
+
+        for (int i = 0; i < size(); i++)
+        {
+            if (type.subtype(i).compare(get(i), c.get(i)) != 0)
+                return false;
+        }
+        return true;
+    }
+
+    public ByteBuffer toByteBuffer()
+    {
+        // for simple (dense or sparse) names we just return our one name buffer
+        switch (nametype())
+        {
+            case SIMPLE_DENSE:
+            case SIMPLE_SPARSE:
+                return get(0);
+            case COMPOUND_DENSE:
+            case COMPOUND_SPARSE_STATIC:
+            case COMPOUND_SPARSE:
+                // This is the legacy format of composites.
+                // See org.apache.cassandra.db.marshal.CompositeType for details.
+                ByteBuffer result = ByteBuffer.allocate(dataSize()); // exact name size; cellDataSize() would also count value and timestamp
+                if (isStatic())
+                    ByteBufferUtil.writeShortLength(result, CompositeType.STATIC_MARKER);
+
+                for (int i = 0; i < size(); i++)
+                {
+                    ByteBuffer bb = get(i);
+                    ByteBufferUtil.writeShortLength(result, bb.remaining());
+                    result.put(bb);
+                    result.put((byte) 0);
+                }
+                result.flip();
+                return result;
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    protected void updateWithName(MessageDigest digest)
+    {
+        // for simple sparse we just return our one name buffer
+        switch (nametype())
+        {
+            case SIMPLE_DENSE:
+            case SIMPLE_SPARSE:
+                writeComponentTo(digest, 0, false);
+                break;
+
+            case COMPOUND_DENSE:
+            case COMPOUND_SPARSE_STATIC:
+            case COMPOUND_SPARSE:
+                // This is the legacy format of composites.
+                // See org.apache.cassandra.db.marshal.CompositeType for details.
+                if (isStatic())
+                    FBUtilities.updateWithShort(digest, CompositeType.STATIC_MARKER);
+
+                for (int i = 0; i < size(); i++)
+                {
+                    writeComponentTo(digest, i, true);
+                    digest.update((byte) 0);
+                }
+                break;
+
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    protected void updateWithValue(MessageDigest digest)
+    {
+        int offset = valueStartOffset();
+        int length = valueEndOffset() - offset;
+        writeMemoryTo(digest, offset, length);
+    }
+
+    @Override // this is the NAME dataSize, only!
+    public int dataSize()
+    {
+        switch (nametype())
+        {
+            case SIMPLE_DENSE:
+            case SIMPLE_SPARSE:
+                return valueStartOffset() - nameDeltaOffset(size());
+            case COMPOUND_DENSE:
+            case COMPOUND_SPARSE_STATIC:
+            case COMPOUND_SPARSE:
+                int size = size();
+                return valueStartOffset() - nameDeltaOffset(size) + 3 * size + (isStatic() ? 2 : 0);
+            default:
+                throw new AssertionError();
+        }
+    }
+
+    public boolean equals(Object obj)
+    {
+        if (obj == this)
+            return true;
+        if (obj instanceof CellName)
+            return equals((CellName) obj);
+        if (obj instanceof Cell)
+            return equals((Cell) obj);
+        return false;
+    }
+
+    public boolean equals(CellName that)
+    {
+        int size = this.size();
+        if (size != that.size())
+            return false;
+
+        for (int i = 0 ; i < size ; i++)
+            if (!get(i).equals(that.get(i)))
+                return false;
+        return true;
+    }
+
+    private static final ByteBuffer[] EMPTY = new ByteBuffer[0];
+
+    @Override
+    public CellName copy(CFMetaData cfm, AbstractAllocator allocator)
+    {
+        ByteBuffer[] r;
+        switch (nametype())
+        {
+            case SIMPLE_DENSE:
+                return CellNames.simpleDense(allocator.clone(get(0)));
+
+            case COMPOUND_DENSE:
+                r = new ByteBuffer[size()];
+                for (int i = 0; i < r.length; i++)
+                    r[i] = allocator.clone(get(i));
+                return CellNames.compositeDense(r);
+
+            case COMPOUND_SPARSE_STATIC:
+            case COMPOUND_SPARSE:
+                int clusteringSize = clusteringSize();
+                r = clusteringSize == 0 ? EMPTY : new ByteBuffer[clusteringSize()];
+                for (int i = 0; i < clusteringSize; i++)
+                    r[i] = allocator.clone(get(i));
+
+                ByteBuffer nameBuffer = get(r.length);
+                ColumnIdentifier name;
+
+                if (nameBuffer.remaining() == 0)
+                {
+                    name = CompoundSparseCellNameType.rowMarkerId;
+                }
+                else
+                {
+                    name = getIdentifier(cfm, nameBuffer);
+                }
+
+                if (clusteringSizeDelta() == 2)
+                {
+                    ByteBuffer element = allocator.clone(get(size() - 1));
+                    return CellNames.compositeSparseWithCollection(r, element, name, isStatic());
+                }
+                return CellNames.compositeSparse(r, name, isStatic());
+
+            case SIMPLE_SPARSE:
+                return CellNames.simpleSparse(getIdentifier(cfm, get(0)));
+        }
+        throw new IllegalStateException();
+    }
+
+    private static ColumnIdentifier getIdentifier(CFMetaData cfMetaData, ByteBuffer name)
+    {
+        ColumnDefinition def = cfMetaData.getColumnDefinition(name);
+        if (def != null)
+        {
+            return def.name;
+        }
+        else
+        {
+            // it's safe to simply grab based on clusteringPrefixSize() as we are only called if not a dense type
+            AbstractType<?> type = cfMetaData.comparator.subtype(cfMetaData.comparator.clusteringPrefixSize());
+            return new ColumnIdentifier(HeapAllocator.instance.clone(name), type);
+        }
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Cell withUpdatedTimestamp(long newTimestamp)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    protected long internalSize()
+    {
+        return MemoryUtil.getInt(peer);
+    }
+
+    private void checkPosition(long offset, long size)
+    {
+        assert size >= 0;
+        assert peer > 0 : "Memory was freed";
+        assert offset >= 0 && offset + size <= internalSize() : String.format("Illegal range: [%d..%d), size: %s", offset, offset + size, internalSize());
+    }
+
+    protected final void setByte(long offset, byte b)
+    {
+        checkPosition(offset, 1);
+        MemoryUtil.setByte(peer + offset, b);
+    }
+
+    protected final void setShort(long offset, short s)
+    {
+        checkPosition(offset, 2); // a short is two bytes wide: bounds-check both of them, as getShort does
+        MemoryUtil.setShort(peer + offset, s);
+    }
+
+    protected final void setInt(long offset, int l)
+    {
+        checkPosition(offset, 4);
+        MemoryUtil.setInt(peer + offset, l);
+    }
+
+    protected final void setLong(long offset, long l)
+    {
+        checkPosition(offset, 8);
+        MemoryUtil.setLong(peer + offset, l);
+    }
+
+    protected final void setBytes(long offset, ByteBuffer buffer)
+    {
+        int start = buffer.position();
+        int count = buffer.limit() - start;
+        if (count == 0)
+            return;
+
+        checkPosition(offset, count);
+        MemoryUtil.setBytes(peer + offset, buffer);
+    }
+
+    protected final byte getByte(long offset)
+    {
+        checkPosition(offset, 1);
+        return MemoryUtil.getByte(peer + offset);
+    }
+
+    protected final void getBytes(long offset, byte[] trg, int trgOffset, int count)
+    {
+        checkPosition(offset, count);
+        MemoryUtil.getBytes(peer + offset, trg, trgOffset, count);
+    }
+
+    protected final int getShort(long offset)
+    {
+        checkPosition(offset, 2);
+        return MemoryUtil.getShort(peer + offset);
+    }
+
+    protected final int getInt(long offset)
+    {
+        checkPosition(offset, 4);
+        return MemoryUtil.getInt(peer + offset);
+    }
+
+    protected final long getLong(long offset)
+    {
+        checkPosition(offset, 8);
+        return MemoryUtil.getLong(peer + offset);
+    }
+
+    protected final ByteBuffer getByteBuffer(long offset, int length)
+    {
+        checkPosition(offset, length);
+        return MemoryUtil.getByteBuffer(peer + offset, length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
index e04867a..c9cff77 100644
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
@@ -90,7 +90,7 @@ public class ArrayBackedSortedColumns extends ColumnFamily
     {
         ArrayBackedSortedColumns copy = new ArrayBackedSortedColumns(original.metadata, false, new Cell[original.getColumnCount()], 0, 0);
         for (Cell cell : original)
-            copy.internalAdd(cell.localCopy(allocator));
+            copy.internalAdd(cell.localCopy(original.metadata, allocator));
         copy.sortedSize = copy.size; // internalAdd doesn't update sortedSize.
         copy.delete(original);
         return copy;
@@ -138,7 +138,7 @@ public class ArrayBackedSortedColumns extends ColumnFamily
         Arrays.sort(cells, sortedSize, size, comparator);
 
         // Determine the merge start position for that segment
-        int pos = binarySearch(0, sortedSize, cells[sortedSize].name, internalComparator());
+        int pos = binarySearch(0, sortedSize, cells[sortedSize].name(), internalComparator());
         if (pos < 0)
             pos = -pos - 1;
 
@@ -420,7 +420,7 @@ public class ArrayBackedSortedColumns extends ColumnFamily
         {
             public CellName apply(Cell cell)
             {
-                return cell.name;
+                return cell.name();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
index 72038b6..20fe64c 100644
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
@@ -19,7 +19,6 @@ package org.apache.cassandra.db;
 
 import java.util.AbstractCollection;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -28,17 +27,19 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
-import com.google.common.collect.*;
+import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.BTreeSet;
 import org.apache.cassandra.utils.btree.UpdateFunction;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
 
@@ -53,14 +54,14 @@ import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
  */
 public class AtomicBTreeColumns extends ColumnFamily
 {
-    static final long HEAP_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
+    static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null))
             + ObjectSizes.measure(new Holder(null, null));
 
     private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>()
     {
         public CellName apply(Cell column)
         {
-            return column.name;
+            return column.name();
         }
     };
 
@@ -155,110 +156,40 @@ public class AtomicBTreeColumns extends ColumnFamily
         }
     }
 
-    // the function we provide to the btree utilities to perform any column replacements
-    private static final class ColumnUpdater implements UpdateFunction<Cell>
-    {
-        final AtomicBTreeColumns updating;
-        final Holder ref;
-        final Function<Cell, Cell> transform;
-        final Updater indexer;
-        final Delta delta;
-
-        private ColumnUpdater(AtomicBTreeColumns updating, Holder ref, Function<Cell, Cell> transform, Updater indexer, Delta delta)
-        {
-            this.updating = updating;
-            this.ref = ref;
-            this.transform = transform;
-            this.indexer = indexer;
-            this.delta = delta;
-        }
-
-        public Cell apply(Cell inserted)
-        {
-            indexer.insert(inserted);
-            delta.insert(inserted);
-            return transform.apply(inserted);
-        }
-
-        public Cell apply(Cell existing, Cell update)
-        {
-            Cell reconciled = update.reconcile(existing);
-            indexer.update(existing, reconciled);
-            if (existing != reconciled)
-                delta.swap(existing, reconciled);
-            else
-                delta.abort(update);
-            return transform.apply(reconciled);
-        }
-
-        public boolean abortEarly()
-        {
-            return updating.ref != ref;
-        }
-
-        public void allocated(long heapSize)
-        {
-            delta.addHeapSize(heapSize);
-        }
-    }
-
-    private static Collection<Cell> transform(Comparator<Cell> cmp, ColumnFamily cf, Function<Cell, Cell> transformation, boolean sort)
-    {
-        Cell[] tmp = new Cell[cf.getColumnCount()];
-
-        int i = 0;
-        for (Cell c : cf)
-            tmp[i++] = transformation.apply(c);
-
-        if (sort)
-            Arrays.sort(tmp, cmp);
-
-        return Arrays.asList(tmp);
-    }
-
     /**
      * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it.
      *
      * @return the difference in size seen after merging the given columns
      */
-    public Delta addAllWithSizeDelta(final ColumnFamily cm, Function<Cell, Cell> transformation, Updater indexer, Delta delta)
+    public long addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) // returns updater.dataSize as accumulated during the attempt whose compare-and-set succeeded
     {
-        boolean transformed = false;
-        Collection<Cell> insert = cm.getSortedColumns();
-
+        ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer); // a single updater is created up front and reused by every iteration of the retry loop
         while (true)
         {
             Holder current = ref;
+            updater.ref = current; // abortEarly() compares updating.ref against this snapshot
+            updater.reset(); // zeroes dataSize/heapSize, aborts cells recorded in updater.inserted by an earlier iteration, and cancels the reclaimer
 
-            delta.reset();
             DeletionInfo deletionInfo;
             if (cm.deletionInfo().mayModify(current.deletionInfo))
             {
                 deletionInfo = current.deletionInfo.copy().add(cm.deletionInfo());
-                delta.addHeapSize(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
+                updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); // adds the change in deletion-info heap size to updater.heapSize
             }
             else
             {
                 deletionInfo = current.deletionInfo;
             }
 
-            ColumnUpdater updater = new ColumnUpdater(this, current, transformation, indexer, delta);
-            Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(), insert, true, updater);
+            Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(), cm, cm.getColumnCount(), true, updater); // a null result fails the check below and the loop starts over
 
             if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo)))
             {
                 indexer.updateRowLevelIndexes();
-                return updater.delta;
-            }
-
-            if (!transformed)
-            {
-                // After failing once, transform Columns into a new collection to avoid repeatedly allocating Slab space
-                insert = transform(metadata.comparator.columnComparator(), cm, transformation, false);
-                transformed = true;
+                updater.finish(); // only reached after a successful swap: charges heapSize to allocator.onHeap() and commits the reclaimer
+                return updater.dataSize;
             }
         }
-
     }
 
     // no particular reason not to implement these next methods, we just haven't needed them yet
@@ -290,7 +221,7 @@ public class AtomicBTreeColumns extends ColumnFamily
         {
             public int compare(Object o1, Object o2)
             {
-                return cmp.compare((CellName) o1, ((Cell) o2).name);
+                return cmp.compare((CellName) o1, ((Cell) o2).name());
             }
         };
     }
@@ -352,7 +283,7 @@ public class AtomicBTreeColumns extends ColumnFamily
         return false;
     }
 
-    private static class Holder
+    private static final class Holder
     {
         final DeletionInfo deletionInfo;
         // the btree of columns
@@ -375,69 +306,96 @@ public class AtomicBTreeColumns extends ColumnFamily
         }
     }
 
-    // TODO: create a stack-allocation-friendly list to help optimise garbage for updates to rows with few columns
-
-    /**
-     * tracks the size changes made while merging a new group of cells in
-     */
-    public static final class Delta
+    // the function we provide to the btree utilities to perform any column replacements; it also tracks size deltas and the cells it copied so that a failed attempt can be undone by reset()
+    private static final class ColumnUpdater implements UpdateFunction<Cell>
     {
-        private long dataSize;
-        private long heapSize;
+        final AtomicBTreeColumns updating; // the column family being modified; abortEarly() reads its ref field
+        final CFMetaData metadata; // passed to Cell.localCopy when copying cells
+        final MemtableAllocator allocator; // used for cell copies and, in finish(), for the on-heap charge
+        final OpOrder.Group writeOp; // passed along to every allocator call made by this updater
+        final Updater indexer; // receives insert/update notifications from the two apply methods
+        Holder ref; // snapshot of updating.ref for the current attempt; assigned by addAllWithSizeDelta before each attempt
+        long dataSize; // net change in cellDataSize() accumulated during the current attempt
+        long heapSize; // net change in heap size accumulated during the current attempt; charged in finish()
+        final MemtableAllocator.DataReclaimer reclaimer; // obtained from allocator.reclaimer(); see abort(), discard(), reset() and finish()
+        List<Cell> inserted; // cells produced by localCopy during the current attempt, lazily created; TODO: replace with walk of aborted BTree
 
-        // we track the discarded cells (cells that were in the btree, but replaced by new ones)
-        // separately from aborted ones (were part of an update but older than existing cells)
-        // since we need to reset the former when we race on the btree update, but not the latter
-        private List<Cell> discarded = new ArrayList<>();
-        private List<Cell> aborted;
+        private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) // ref, dataSize, heapSize and inserted are left at their defaults
+        {
+            this.updating = updating;
+            this.allocator = allocator;
+            this.writeOp = writeOp;
+            this.indexer = indexer;
+            this.metadata = metadata;
+            this.reclaimer = allocator.reclaimer();
+        }
 
-        protected void reset()
+        public Cell apply(Cell insert) // single-argument form; presumably invoked by BTree.update for a cell with no existing counterpart -- TODO confirm against UpdateFunction
         {
-            this.dataSize = 0;
-            this.heapSize = 0;
-            discarded.clear();
+            indexer.insert(insert); // the indexer is given the caller's cell, before it is copied
+            insert = insert.localCopy(metadata, allocator, writeOp); // from here on 'insert' refers to the allocator-made copy
+            this.dataSize += insert.cellDataSize();
+            this.heapSize += insert.excessHeapSizeExcludingData();
+            if (inserted == null)
+                inserted = new ArrayList<>();
+            inserted.add(insert); // remembered so reset() can abort it if this attempt is thrown away
+            return insert;
         }
 
-        protected void addHeapSize(long heapSize)
+        public Cell apply(Cell existing, Cell update) // two-argument form: reconciles an existing cell with an incoming one and returns the cell to keep
         {
-            this.heapSize += heapSize;
+            Cell reconciled = existing.reconcile(update);
+            indexer.update(existing, reconciled); // called whether or not reconciliation changed anything, and before reconciled is copied
+            if (existing != reconciled) // reconcile returned something other than the existing instance
+            {
+                reconciled = reconciled.localCopy(metadata, allocator, writeOp);
+                dataSize += reconciled.cellDataSize() - existing.cellDataSize(); // record the difference between the new copy and the cell it replaces
+                heapSize += reconciled.excessHeapSizeExcludingData() - existing.excessHeapSizeExcludingData();
+                if (inserted == null)
+                    inserted = new ArrayList<>();
+                inserted.add(reconciled);
+                discard(existing); // passes the replaced cell to reclaimer.reclaim
+            }
+            return reconciled; // the existing instance itself when nothing changed
         }
 
-        protected void swap(Cell old, Cell updated)
+        protected void reset() // returns the updater to a clean state before an attempt; called by addAllWithSizeDelta at the top of each loop iteration
         {
-            dataSize += updated.dataSize() - old.dataSize();
-            heapSize += updated.excessHeapSizeExcludingData() - old.excessHeapSizeExcludingData();
-            discarded.add(old);
+            this.dataSize = 0;
+            this.heapSize = 0;
+            if (inserted != null)
+            {
+                for (Cell cell : inserted)
+                    abort(cell); // each copy made so far goes to reclaimer.reclaimImmediately
+                inserted.clear();
+            }
+            reclaimer.cancel(); // invoked on every reset, including the first one when nothing has been recorded yet
         }
 
-        protected void insert(Cell insert)
+        protected void abort(Cell abort) // forwards the cell to reclaimer.reclaimImmediately
         {
-            this.dataSize += insert.dataSize();
-            this.heapSize += insert.excessHeapSizeExcludingData();
+            reclaimer.reclaimImmediately(abort);
         }
 
-        private void abort(Cell neverUsed)
+        protected void discard(Cell discard) // forwards the cell to reclaimer.reclaim; reset() calls reclaimer.cancel() and finish() calls reclaimer.commit()
         {
-            if (aborted == null)
-                aborted = new ArrayList<>();
-            aborted.add(neverUsed);
+            reclaimer.reclaim(discard);
         }
 
-        public long dataSize()
+        public boolean abortEarly() // true once updating.ref no longer equals the snapshot held in this.ref
         {
-            return dataSize;
+            return updating.ref != ref;
         }
 
-        public long excessHeapSize()
+        public void allocated(long heapSize) // adds the given amount to the running heapSize total
         {
-            return heapSize;
+            this.heapSize += heapSize;
         }
 
-        public Iterable<Cell> reclaimed()
+        protected void finish() // called by addAllWithSizeDelta after its compare-and-set succeeds
         {
-            if (aborted == null)
-                return discarded;
-            return Iterables.concat(discarded, aborted);
+            allocator.onHeap().allocate(heapSize, writeOp); // charge the accumulated heap delta to the on-heap side of the allocator
+            reclaimer.commit();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCell.java b/src/java/org/apache/cassandra/db/BufferCell.java
new file mode 100644
index 0000000..93251c8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferCell.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferCell extends AbstractCell // cell implementation that holds its name, value and timestamp in final fields
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(CellNames.simpleDense(ByteBuffer.allocate(1)))); // measured once on a sample cell (1-byte name buffer, empty value); base term of excessHeapSizeExcludingData()
+
+    protected final CellName name;
+    protected final ByteBuffer value;
+    protected final long timestamp;
+
+    BufferCell(CellName name) // package-private; value is ByteBufferUtil.EMPTY_BYTE_BUFFER and timestamp is 0 via the constructor chain
+    {
+        this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+    }
+
+    public BufferCell(CellName name, ByteBuffer value) // timestamp defaults to 0
+    {
+        this(name, value, 0);
+    }
+
+    public BufferCell(CellName name, ByteBuffer value, long timestamp) // stores the arguments as given; no defensive copy of value is made
+    {
+        assert name != null; // assert-based check, so only enforced when assertions are enabled
+        assert value != null;
+
+        this.name = name;
+        this.value = value;
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName) // new BufferCell sharing this cell's value buffer and timestamp
+    {
+        return new BufferCell(newName, value, timestamp);
+    }
+
+    @Override
+    public Cell withUpdatedTimestamp(long newTimestamp) // new BufferCell sharing this cell's name and value buffer
+    {
+        return new BufferCell(name, value, newTimestamp);
+    }
+
+    @Override
+    public CellName name() { // returns the stored name field
+        return name;
+    }
+
+    @Override
+    public ByteBuffer value() { // returns the stored buffer itself, not a duplicate
+        return value;
+    }
+
+    @Override
+    public long timestamp() { // returns the stored timestamp field
+        return timestamp;
+    }
+
+    @Override
+    public long excessHeapSizeExcludingData() // EMPTY_SIZE plus the name's own excess plus ObjectSizes.sizeOnHeapExcludingData(value)
+    {
+        return EMPTY_SIZE + name.excessHeapSizeExcludingData() + ObjectSizes.sizeOnHeapExcludingData(value);
+    }
+
+    @Override
+    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator) // builds a new BufferCell from name.copy(metadata, allocator) and allocator.clone(value); timestamp is carried over
+    {
+        return new BufferCell(name.copy(metadata, allocator), allocator.clone(value), timestamp);
+    }
+
+    @Override
+    public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) // delegates entirely to allocator.clone; the concrete type returned is chosen by the allocator
+    {
+        return allocator.clone(this, metadata, opGroup);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferCounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCounterCell.java b/src/java/org/apache/cassandra/db/BufferCounterCell.java
new file mode 100644
index 0000000..a70e274
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferCounterCell.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferCounterCell extends BufferCell implements CounterCell // BufferCell plus a timestampOfLastDelete field. NOTE(review): withUpdatedTimestamp is not overridden in this class, so the inherited BufferCell version (which constructs a plain BufferCell) applies -- confirm this is intended
+{
+    private final long timestampOfLastDelete;
+
+    public BufferCounterCell(CellName name, ByteBuffer value, long timestamp) // timestampOfLastDelete defaults to Long.MIN_VALUE
+    {
+        this(name, value, timestamp, Long.MIN_VALUE);
+    }
+
+    public BufferCounterCell(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete) // name/value/timestamp go to the BufferCell constructor
+    {
+        super(name, value, timestamp);
+        this.timestampOfLastDelete = timestampOfLastDelete;
+    }
+
+    public static CounterCell create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag) // factory that may rewrite value depending on flag before constructing the cell
+    {
+        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value))) // always for FROM_REMOTE; for LOCAL only when shouldClearLocal(value) says so
+            value = contextManager.clearAllLocal(value);
+        return new BufferCounterCell(name, value, timestamp, timestampOfLastDelete);
+    }
+
+    // For use by tests of compatibility with pre-2.1 counter only.
+    public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete) // the value buffer is produced by contextManager.createLocal(value)
+    {
+        return new BufferCounterCell(name, contextManager.createLocal(value), timestamp, timestampOfLastDelete);
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName) // overridden so the result is still a BufferCounterCell carrying timestampOfLastDelete
+    {
+        return new BufferCounterCell(newName, value, timestamp, timestampOfLastDelete);
+    }
+
+    @Override
+    public long timestampOfLastDelete() // returns the stored field
+    {
+        return timestampOfLastDelete;
+    }
+
+    @Override
+    public long total() // delegates to contextManager.total on the value buffer
+    {
+        return contextManager.total(value);
+    }
+
+    @Override
+    public int cellDataSize() // superclass size plus the native size of the extra long
+    {
+        // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
+        return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
+    }
+
+    @Override
+    public int serializedSize(CellNameType type, TypeSizes typeSizes) // superclass size plus typeSizes.sizeof(timestampOfLastDelete)
+    {
+        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
+    }
+
+    @Override
+    public Cell diff(Cell cell) // forwards to a two-argument diff(this, cell) declared outside this class body
+    {
+        return diff(this, cell);
+    }
+
+    /*
+     * We have to special case digest creation for counter column because
+     * we don't want to include the information about which shard of the
+     * context is a delta or not, since this information differs from node to
+     * node.
+     */
+    @Override
+    public void updateDigest(MessageDigest digest) // feeds name, value (via contextManager), timestamp, serialization flags and timestampOfLastDelete into digest, in that order
+    {
+        digest.update(name().toByteBuffer().duplicate()); // duplicate() so that consuming the buffer does not move the original's position
+        // We don't take the deltas into account in a digest
+        contextManager.updateDigest(digest, value());
+
+        FBUtilities.updateWithLong(digest, timestamp);
+        FBUtilities.updateWithByte(digest, serializationFlags());
+        FBUtilities.updateWithLong(digest, timestampOfLastDelete);
+    }
+
+    @Override
+    public Cell reconcile(Cell cell) // forwards to a two-argument reconcile(this, cell) declared outside this class body
+    {
+        return reconcile(this, cell);
+    }
+
+    @Override
+    public boolean hasLegacyShards() // delegates to contextManager.hasLegacyShards on the value buffer
+    {
+        return contextManager.hasLegacyShards(value);
+    }
+
+    @Override
+    public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator) // new BufferCounterCell built from name.copy(metadata, allocator) and allocator.clone(value); both timestamps are carried over
+    {
+        return new BufferCounterCell(name.copy(metadata, allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
+    }
+
+    @Override
+    public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) // delegates entirely to allocator.clone
+    {
+        return allocator.clone(this, metadata, opGroup);
+    }
+
+    @Override
+    public String getString(CellNameType comparator) // format: name, the literal "false", the context string, then timestamp and timestampOfLastDelete
+    {
+        return String.format("%s:false:%s@%d!%d",
+                             comparator.getString(name()),
+                             contextManager.toString(value()),
+                             timestamp(),
+                             timestampOfLastDelete);
+    }
+
+    @Override
+    public int serializationFlags() // always ColumnSerializer.COUNTER_MASK
+    {
+        return ColumnSerializer.COUNTER_MASK;
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException // validates the name, then the counter context held in the value
+    {
+        validateName(metadata);
+        // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
+        // which is not the internal representation of counters
+        contextManager.validateContext(value());
+    }
+
+    @Override
+    public Cell markLocalToBeCleared() // returns this when contextManager hands back the same buffer instance, otherwise a new cell wrapping the marked buffer
+    {
+        ByteBuffer marked = contextManager.markLocalToBeCleared(value());
+        return marked == value() ? this : new BufferCounterCell(name(), marked, timestamp(), timestampOfLastDelete); // reference comparison, not content comparison
+    }
+
+    @Override
+    public boolean equals(Cell cell) // false for any cell that is not a CounterCell; otherwise defers to equals(CounterCell)
+    {
+        return cell instanceof CounterCell && equals((CounterCell) cell);
+    }
+
+    public boolean equals(CounterCell cell) // superclass equality plus matching timestampOfLastDelete
+    {
+        return super.equals(cell) && timestampOfLastDelete == cell.timestampOfLastDelete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java b/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
new file mode 100644
index 0000000..44ab83e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferCounterUpdateCell extends BufferCell implements CounterUpdateCell // BufferCell whose value buffer is read back as a long delta
+{
+    public BufferCounterUpdateCell(CellName name, long value, long timestamp) // encodes the long with ByteBufferUtil.bytes and delegates to the ByteBuffer constructor
+    {
+        this(name, ByteBufferUtil.bytes(value), timestamp);
+    }
+
+    public BufferCounterUpdateCell(CellName name, ByteBuffer value, long timestamp) // passes all three arguments straight to the BufferCell constructor
+    {
+        super(name, value, timestamp);
+    }
+
+    public long delta() // reads the long stored at the value buffer's current position
+    {
+        return value().getLong(value.position()); // absolute get: the buffer's position is not advanced
+    }
+
+    @Override
+    public Cell diff(Cell cell) // always throws UnsupportedOperationException
+    {
+        // Diff is used during reads, but we should never read those columns
+        throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateCell.");
+    }
+
+    @Override
+    public Cell reconcile(Cell cell) // against a tombstone the newer timestamp wins (the tombstone on ties); otherwise returns a new cell summing both deltas
+    {
+        // The only time this could happen is if a batchAdd ships two
+        // increments for the same cell. Hence we simply sum the deltas.
+
+        // tombstones take precedence
+        if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
+            return timestamp > cell.timestamp() ? this : cell; // strict comparison: on equal timestamps the tombstone is returned
+
+        // neither is tombstoned
+        assert cell instanceof CounterUpdateCell : "Wrong class type."; // assert-based check; with assertions disabled a wrong type surfaces as a ClassCastException on the next line
+        CounterUpdateCell c = (CounterUpdateCell) cell;
+        return new BufferCounterUpdateCell(name, delta() + c.delta(), Math.max(timestamp, c.timestamp())); // the merged cell takes the larger of the two timestamps
+    }
+
+    @Override
+    public int serializationFlags() // always ColumnSerializer.COUNTER_UPDATE_MASK
+    {
+        return ColumnSerializer.COUNTER_UPDATE_MASK;
+    }
+
+    @Override
+    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator) // not supported for this cell type: always throws
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup) // not supported for this cell type: always throws
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String getString(CellNameType comparator) // format: name, the value decoded as a long, then the timestamp
+    {
+        return String.format("%s:%s@%d", comparator.getString(name()), ByteBufferUtil.toLong(value), timestamp());
+    }
+}