You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/29 14:05:08 UTC

[6/7] Push more of memtable data off-heap

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferDecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferDecoratedKey.java b/src/java/org/apache/cassandra/db/BufferDecoratedKey.java
new file mode 100644
index 0000000..d375162
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferDecoratedKey.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.dht.Token;
+
+public class BufferDecoratedKey extends DecoratedKey
+{
+    private final ByteBuffer key;
+
+    public BufferDecoratedKey(Token token, ByteBuffer key)
+    {
+        super(token);
+        assert key != null;
+        this.key = key;
+    }
+
+    public ByteBuffer getKey()
+    {
+        return key;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferDeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferDeletedCell.java b/src/java/org/apache/cassandra/db/BufferDeletedCell.java
new file mode 100644
index 0000000..a6518de
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferDeletedCell.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferDeletedCell extends BufferCell implements DeletedCell
+{
+    public BufferDeletedCell(CellName name, int localDeletionTime, long timestamp)
+    {
+        this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
+    }
+
+    public BufferDeletedCell(CellName name, ByteBuffer value, long timestamp)
+    {
+        super(name, value, timestamp);
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName)
+    {
+        return new BufferDeletedCell(newName, value, timestamp);
+    }
+
+    @Override
+    public Cell withUpdatedTimestamp(long newTimestamp)
+    {
+        return new BufferDeletedCell(name, value, newTimestamp);
+    }
+
+    @Override
+    public boolean isMarkedForDelete(long now)
+    {
+        return true;
+    }
+
+    @Override
+    public long getMarkedForDeleteAt()
+    {
+        return timestamp;
+    }
+
+    @Override
+    public int getLocalDeletionTime()
+    {
+       return value().getInt(value.position());
+    }
+
+    @Override
+    public Cell reconcile(Cell cell)
+    {
+        if (cell instanceof DeletedCell)
+            return super.reconcile(cell);
+        return cell.reconcile(this);
+    }
+
+    @Override
+    public DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+    {
+        return new BufferDeletedCell(name.copy(metadata, allocator), allocator.clone(value), timestamp);
+    }
+
+    @Override
+    public DeletedCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+    {
+        return allocator.clone(this, metadata, opGroup);
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.DELETION_MASK;
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException
+    {
+        validateName(metadata);
+        if (value().remaining() != 4)
+            throw new MarshalException("A tombstone value should be 4 bytes long");
+        if (getLocalDeletionTime() < 0)
+            throw new MarshalException("The local deletion time should not be negative");
+    }
+
+    public boolean equals(Cell cell)
+    {
+        return timestamp() == cell.timestamp() && getLocalDeletionTime() == cell.getLocalDeletionTime() && name().equals(cell.name());
+    }
+
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        digest.update(name().toByteBuffer().duplicate());
+
+        FBUtilities.updateWithLong(digest, timestamp());
+        FBUtilities.updateWithByte(digest, serializationFlags());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/BufferExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferExpiringCell.java b/src/java/org/apache/cassandra/db/BufferExpiringCell.java
new file mode 100644
index 0000000..95ed45a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/BufferExpiringCell.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+
+public class BufferExpiringCell extends BufferCell implements ExpiringCell
+{
+    private final int localExpirationTime;
+    private final int timeToLive;
+
+    public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive)
+    {
+        this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
+    }
+
+    public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
+    {
+        super(name, value, timestamp);
+        assert timeToLive > 0 : timeToLive;
+        assert localExpirationTime > 0 : localExpirationTime;
+        this.timeToLive = timeToLive;
+        this.localExpirationTime = localExpirationTime;
+    }
+
+    public int getTimeToLive()
+    {
+        return timeToLive;
+    }
+
+    @Override
+    public Cell withUpdatedName(CellName newName)
+    {
+        return new BufferExpiringCell(newName, value(), timestamp(), timeToLive, localExpirationTime);
+    }
+
+    @Override
+    public Cell withUpdatedTimestamp(long newTimestamp)
+    {
+        return new BufferExpiringCell(name(), value(), newTimestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
+    public int cellDataSize()
+    {
+        return super.cellDataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);
+    }
+
+    @Override
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
+    {
+        /*
+         * An expired column adds to a Cell :
+         *    4 bytes for the localExpirationTime
+         *  + 4 bytes for the timeToLive
+        */
+        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
+    }
+
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        super.updateDigest(digest);
+        FBUtilities.updateWithInt(digest, timeToLive);
+    }
+
+    @Override
+    public int getLocalDeletionTime()
+    {
+        return localExpirationTime;
+    }
+
+    @Override
+    public ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+    {
+        return new BufferExpiringCell(name.copy(metadata, allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
+    }
+
+    @Override
+    public ExpiringCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+    {
+        return allocator.clone(this, metadata, opGroup);
+    }
+
+    @Override
+    public String getString(CellNameType comparator)
+    {
+        return String.format("%s!%d", super.getString(comparator), timeToLive);
+    }
+
+    @Override
+    public boolean isMarkedForDelete(long now)
+    {
+        return (int) (now / 1000) >= getLocalDeletionTime();
+    }
+
+    @Override
+    public long getMarkedForDeleteAt()
+    {
+        return timestamp;
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.EXPIRATION_MASK;
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException
+    {
+        super.validateFields(metadata);
+
+        if (timeToLive <= 0)
+            throw new MarshalException("A column TTL should be > 0");
+        if (localExpirationTime < 0)
+            throw new MarshalException("The local expiration time should not be negative");
+    }
+
+    @Override
+    public boolean equals(Cell cell)
+    {
+        return cell instanceof ExpiringCell && equals((ExpiringCell) cell);
+    }
+
+    public boolean equals(ExpiringCell cell)
+    {
+        // super.equals() returns false if o is not a CounterCell
+        return super.equals(cell)
+               && getLocalDeletionTime() == cell.getLocalDeletionTime()
+               && getTimeToLive() == cell.getTimeToLive();
+    }
+
+    /** @return Either a DeletedCell, or an ExpiringCell. */
+    public static Cell create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
+    {
+        if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
+            return new BufferExpiringCell(name, value, timestamp, timeToLive, localExpirationTime);
+        // The column is now expired, we can safely return a simple tombstone. Note that
+        // as long as the expiring column and the tombstone put together live longer than GC grace seconds,
+        // we'll fulfil our responsibility to repair.  See discussion at
+        // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+        return new BufferDeletedCell(name, localExpirationTime - timeToLive, timestamp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java
index f853d17..dfe49ee 100644
--- a/src/java/org/apache/cassandra/db/CFRowAdder.java
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -52,7 +52,7 @@ public class CFRowAdder
 
         // If a CQL3 table, add the row marker
         if (cf.metadata().isCQL3Table() && !prefix.isStatic())
-            cf.addColumn(new Cell(cf.getComparator().rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
+            cf.addColumn(new BufferCell(cf.getComparator().rowMarker(prefix), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
     }
 
     public CFRowAdder add(String cql3ColumnName, Object value)
@@ -96,14 +96,14 @@ public class CFRowAdder
     {
         if (value == null)
         {
-            cf.addColumn(new DeletedCell(name, ldt, timestamp));
+            cf.addColumn(new BufferDeletedCell(name, ldt, timestamp));
         }
         else
         {
             AbstractType valueType = def.type.isCollection()
                                    ? ((CollectionType) def.type).valueComparator()
                                    : def.type;
-            cf.addColumn(new Cell(name, value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value), timestamp));
+            cf.addColumn(new BufferCell(name, value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value), timestamp));
         }
         return this;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Cell.java b/src/java/org/apache/cassandra/db/Cell.java
index 8db9770..c19b5dd 100644
--- a/src/java/org/apache/cassandra/db/Cell.java
+++ b/src/java/org/apache/cassandra/db/Cell.java
@@ -17,261 +17,58 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.DataInput;
-import java.io.IOError;
-import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Iterator;
-
-import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.db.marshal.*;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 /**
  * Cell is immutable, which prevents all kinds of confusion in a multithreaded environment.
  */
-public class Cell implements OnDiskAtom
+public interface Cell extends OnDiskAtom
 {
     public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
 
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new Cell(CellNames.simpleDense(ByteBuffer.allocate(1))));
-
-    public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
-                                                      final ColumnSerializer.Flag flag,
-                                                      final int expireBefore,
-                                                      final Descriptor.Version version,
-                                                      final CellNameType type)
-    {
-        return new AbstractIterator<OnDiskAtom>()
-        {
-            protected OnDiskAtom computeNext()
-            {
-                OnDiskAtom atom;
-                try
-                {
-                    atom = type.onDiskAtomSerializer().deserializeFromSSTable(in, flag, expireBefore, version);
-                }
-                catch (IOException e)
-                {
-                    throw new IOError(e);
-                }
-                if (atom == null)
-                    return endOfData();
-
-                return atom;
-            }
-        };
-    }
-
-    protected final CellName name;
-    protected final ByteBuffer value;
-    protected final long timestamp;
-
-    Cell(CellName name)
-    {
-        this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER);
-    }
-
-    public Cell(CellName name, ByteBuffer value)
-    {
-        this(name, value, 0);
-    }
+    public Cell withUpdatedName(CellName newName);
 
-    public Cell(CellName name, ByteBuffer value, long timestamp)
-    {
-        assert name != null;
-        assert value != null;
-        this.name = name;
-        this.value = value;
-        this.timestamp = timestamp;
-    }
+    public Cell withUpdatedTimestamp(long newTimestamp);
 
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new Cell(newName, value, timestamp);
-    }
-
-    public Cell withUpdatedTimestamp(long newTimestamp)
-    {
-        return new Cell(name, value, newTimestamp);
-    }
-
-    public CellName name()
-    {
-        return name;
-    }
-
-    public ByteBuffer value()
-    {
-        return value;
-    }
+    @Override
+    public CellName name();
 
-    public long timestamp()
-    {
-        return timestamp;
-    }
+    public ByteBuffer value();
 
-    public boolean isMarkedForDelete(long now)
-    {
-        return false;
-    }
+    public boolean isMarkedForDelete(long now);
 
-    public boolean isLive(long now)
-    {
-        return !isMarkedForDelete(now);
-    }
+    public boolean isLive(long now);
 
     // Don't call unless the column is actually marked for delete.
-    public long getMarkedForDeleteAt()
-    {
-        return Long.MAX_VALUE;
-    }
+    public long getMarkedForDeleteAt();
 
-    public int dataSize()
-    {
-        return name.dataSize() + value.remaining() + TypeSizes.NATIVE.sizeof(timestamp);
-    }
+    public int cellDataSize();
 
     // returns the size of the Cell and all references on the heap, excluding any costs associated with byte arrays
     // that would be allocated by a localCopy, as these will be accounted for by the allocator
-    public long excessHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE + name.excessHeapSizeExcludingData() + ObjectSizes.sizeOnHeapExcludingData(value);
-    }
-
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        /*
-         * Size of a column is =
-         *   size of a name (short + length of the string)
-         * + 1 byte to indicate if the column has been deleted
-         * + 8 bytes for timestamp
-         * + 4 bytes which basically indicates the size of the byte array
-         * + entire byte array.
-        */
-        int valueSize = value.remaining();
-        return ((int)type.cellSerializer().serializedSize(name, typeSizes)) + 1 + typeSizes.sizeof(timestamp) + typeSizes.sizeof(valueSize) + valueSize;
-    }
-
-    public int serializationFlags()
-    {
-        return 0;
-    }
-
-    public Cell diff(Cell cell)
-    {
-        if (timestamp() < cell.timestamp())
-            return cell;
-        return null;
-    }
-
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name.toByteBuffer().duplicate());
-        digest.update(value.duplicate());
-
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            buffer.writeLong(timestamp);
-            buffer.writeByte(serializationFlags());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        digest.update(buffer.getData(), 0, buffer.getLength());
-    }
-
-    public int getLocalDeletionTime()
-    {
-        return Integer.MAX_VALUE;
-    }
-
-    public Cell reconcile(Cell cell)
-    {
-        // tombstones take precedence.  (if both are tombstones, then it doesn't matter which one we use.)
-        if (isMarkedForDelete(System.currentTimeMillis()))
-            return timestamp() < cell.timestamp() ? cell : this;
-        if (cell.isMarkedForDelete(System.currentTimeMillis()))
-            return timestamp() > cell.timestamp() ? this : cell;
-        // break ties by comparing values.
-        if (timestamp() == cell.timestamp())
-            return value().compareTo(cell.value()) < 0 ? cell : this;
-        // neither is tombstoned and timestamps are different
-        return timestamp() < cell.timestamp() ? cell : this;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o)
-            return true;
-
-        if (o == null || getClass() != o.getClass())
-            return false;
-
-        Cell cell = (Cell)o;
-
-        return timestamp == cell.timestamp && name.equals(cell.name) && value.equals(cell.value);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = name != null ? name.hashCode() : 0;
-        result = 31 * result + (value != null ? value.hashCode() : 0);
-        result = 31 * result + (int)(timestamp ^ (timestamp >>> 32));
-        return result;
-    }
+    public long excessHeapSizeExcludingData();
 
-    public Cell localCopy(AbstractAllocator allocator)
-    {
-        return new Cell(name.copy(allocator), allocator.clone(value), timestamp);
-    }
+    public int serializedSize(CellNameType type, TypeSizes typeSizes);
+    public int serializationFlags();
 
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s:%b:%d@%d",
-                             comparator.getString(name),
-                             isMarkedForDelete(System.currentTimeMillis()),
-                             value.remaining(),
-                             timestamp);
-    }
+    public Cell diff(Cell cell);
 
-    protected void validateName(CFMetaData metadata) throws MarshalException
-    {
-        metadata.comparator.validate(name());
-    }
+    public Cell reconcile(Cell cell);
 
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
+    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator);
 
-        AbstractType<?> valueValidator = metadata.getValueValidator(name());
-        if (valueValidator != null)
-            valueValidator.validate(value());
-    }
+    public Cell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
 
-    public static Cell create(CellName name, ByteBuffer value, long timestamp, int ttl, CFMetaData metadata)
-    {
-        if (ttl <= 0)
-            ttl = metadata.getDefaultTimeToLive();
+    public String getString(CellNameType comparator);
 
-        return ttl > 0
-               ? new ExpiringCell(name, value, timestamp, ttl)
-               : new Cell(name, value, timestamp);
-    }
+    void validateName(CFMetaData metadata) throws MarshalException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 36a9ebf..f88c1a7 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -86,7 +86,7 @@ public class CollationController
                     {
                         OnDiskAtom atom = iter.next();
                         if (copyOnHeap)
-                            atom = ((Cell) atom).localCopy(HeapAllocator.instance);
+                            atom = ((Cell) atom).localCopy(cfs.metadata, HeapAllocator.instance);
                         container.addAtom(atom);
                     }
                 }
@@ -147,7 +147,7 @@ public class CollationController
                 && cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy)
             {
                 Tracing.trace("Defragmenting requested data");
-                Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.key, returnCF.cloneMe());
+                Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.getKey(), returnCF.cloneMe());
                 // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
                 Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
             }
@@ -204,7 +204,7 @@ public class CollationController
                         ColumnFamily newCf = cf.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
                         for (Cell cell : cf)
                         {
-                            newCf.addColumn(cell.localCopy(HeapAllocator.instance));
+                            newCf.addColumn(cell.localCopy(cfs.metadata, HeapAllocator.instance));
                         }
                         cf = newCf;
                         iter = filter.getColumnFamilyIterator(cf);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ColumnFamily.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java
index 4f85610..a261d73 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamily.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamily.java
@@ -128,23 +128,23 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
     public void addColumn(CellName name, ByteBuffer value, long timestamp, int timeToLive)
     {
         assert !metadata().isCounter();
-        Cell cell = Cell.create(name, value, timestamp, timeToLive, metadata());
+        Cell cell = AbstractCell.create(name, value, timestamp, timeToLive, metadata());
         addColumn(cell);
     }
 
     public void addCounter(CellName name, long value)
     {
-        addColumn(new CounterUpdateCell(name, value, System.currentTimeMillis()));
+        addColumn(new BufferCounterUpdateCell(name, value, System.currentTimeMillis()));
     }
 
     public void addTombstone(CellName name, ByteBuffer localDeletionTime, long timestamp)
     {
-        addColumn(new DeletedCell(name, localDeletionTime, timestamp));
+        addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
     }
 
     public void addTombstone(CellName name, int localDeletionTime, long timestamp)
     {
-        addColumn(new DeletedCell(name, localDeletionTime, timestamp));
+        addColumn(new BufferDeletedCell(name, localDeletionTime, timestamp));
     }
 
     public void addAtom(OnDiskAtom atom)
@@ -325,7 +325,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
     {
         long size = 0;
         for (Cell cell : this)
-            size += cell.dataSize();
+            size += cell.cellDataSize();
         return size;
     }
 
@@ -424,8 +424,8 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
             int deletionTime = cell.getLocalDeletionTime();
             if (deletionTime < Integer.MAX_VALUE)
                 tombstones.update(deletionTime);
-            minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name, metadata.comparator);
-            maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name, metadata.comparator);
+            minColumnNamesSeen = ColumnNameHelper.minComponents(minColumnNamesSeen, cell.name(), metadata.comparator);
+            maxColumnNamesSeen = ColumnNameHelper.maxComponents(maxColumnNamesSeen, cell.name(), metadata.comparator);
             if (cell instanceof CounterCell)
                 hasLegacyCounterShards = hasLegacyCounterShards || ((CounterCell) cell).hasLegacyShards();
         }
@@ -474,7 +474,7 @@ public abstract class ColumnFamily implements Iterable<Cell>, IRowCacheEntry
     {
         ImmutableMap.Builder<CellName, ByteBuffer> builder = ImmutableMap.builder();
         for (Cell cell : this)
-            builder.put(cell.name, cell.value);
+            builder.put(cell.name(), cell.value());
         return builder.build();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 6559b40..3b1c67e 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -75,6 +75,7 @@ import org.apache.cassandra.streaming.StreamLockfile;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
@@ -843,12 +844,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         for (SecondaryIndex index : indexManager.getIndexes())
         {
-            if (index.getAllocator() != null)
+            if (index.getIndexCfs() != null)
             {
-                onHeapRatio += index.getAllocator().onHeap().ownershipRatio();
-                offHeapRatio += index.getAllocator().offHeap().ownershipRatio();
-                onHeapTotal += index.getAllocator().onHeap().owns();
-                offHeapTotal += index.getAllocator().offHeap().owns();
+                MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+                onHeapRatio += allocator.onHeap().ownershipRatio();
+                offHeapRatio += allocator.offHeap().ownershipRatio();
+                onHeapTotal += allocator.onHeap().owns();
+                offHeapTotal += allocator.offHeap().owns();
             }
         }
 
@@ -1095,10 +1097,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 for (SecondaryIndex index : cfs.indexManager.getIndexes())
                 {
-                    if (index.getAllocator() != null)
+                    if (index.getIndexCfs() != null)
                     {
-                        onHeap += index.getAllocator().onHeap().ownershipRatio();
-                        offHeap += index.getAllocator().offHeap().ownershipRatio();
+                        MemtableAllocator allocator = index.getIndexCfs().getDataTracker().getView().getCurrentMemtable().getAllocator();
+                        onHeap += allocator.onHeap().ownershipRatio();
+                        offHeap += allocator.offHeap().ownershipRatio();
                     }
                 }
 
@@ -1213,7 +1216,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     // 2. if it has been re-added since then, this particular column was inserted before the last drop
     private static boolean isDroppedColumn(Cell c, CFMetaData meta)
     {
-        Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName());
+        Long droppedAt = meta.getDroppedColumns().get(c.name().cql3ColumnName(meta));
         return droppedAt != null && c.timestamp() <= droppedAt;
     }
 
@@ -1869,7 +1872,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         ColumnFamily columns;
         try (OpOrder.Group op = readOrdering.start())
         {
-            columns = controller.getTopLevelColumns(Memtable.memoryPool.needToCopyOnHeap());
+            columns = controller.getTopLevelColumns(Memtable.MEMORY_POOL.needToCopyOnHeap());
         }
         metric.updateSSTableIterated(controller.getSstablesIterated());
         return columns;
@@ -1882,7 +1885,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
         {
             DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
-            if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
+            if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
                 invalidateCachedRow(dk);
         }
 
@@ -1891,7 +1894,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
             {
                 DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
-                if (key.cfId == metadata.cfId && !Range.isInRanges(dk.token, ranges))
+                if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
                     CacheService.instance.counterCache.remove(key);
             }
         }
@@ -1939,7 +1942,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     return computeNext();
 
                 if (logger.isTraceEnabled())
-                    logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
+                    logger.trace("scanned {}", metadata.getKeyValidator().getString(key.getKey()));
 
                 return current;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ColumnSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnSerializer.java b/src/java/org/apache/cassandra/db/ColumnSerializer.java
index 8c22d71..8e7026c 100644
--- a/src/java/org/apache/cassandra/db/ColumnSerializer.java
+++ b/src/java/org/apache/cassandra/db/ColumnSerializer.java
@@ -116,7 +116,7 @@ public class ColumnSerializer implements ISerializer<Cell>
             long timestampOfLastDelete = in.readLong();
             long ts = in.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
-            return CounterCell.create(name, value, ts, timestampOfLastDelete, flag);
+            return BufferCounterCell.create(name, value, ts, timestampOfLastDelete, flag);
         }
         else if ((mask & EXPIRATION_MASK) != 0)
         {
@@ -124,17 +124,17 @@ public class ColumnSerializer implements ISerializer<Cell>
             int expiration = in.readInt();
             long ts = in.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
-            return ExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
+            return BufferExpiringCell.create(name, value, ts, ttl, expiration, expireBefore, flag);
         }
         else
         {
             long ts = in.readLong();
             ByteBuffer value = ByteBufferUtil.readWithLength(in);
             return (mask & COUNTER_UPDATE_MASK) != 0
-                   ? new CounterUpdateCell(name, value, ts)
+                   ? new BufferCounterUpdateCell(name, value, ts)
                    : ((mask & DELETION_MASK) == 0
-                      ? new Cell(name, value, ts)
-                      : new DeletedCell(name, value, ts));
+                      ? new BufferCell(name, value, ts)
+                      : new BufferDeletedCell(name, value, ts));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index fc4ac3f..cda1200 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -17,223 +17,28 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 /**
  * A column that represents a partitioned counter.
  */
-public class CounterCell extends Cell
+public interface CounterCell extends Cell
 {
-    protected static final CounterContext contextManager = CounterContext.instance();
-
-    private final long timestampOfLastDelete;
-
-    public CounterCell(CellName name, ByteBuffer value, long timestamp)
-    {
-        this(name, value, timestamp, Long.MIN_VALUE);
-    }
-
-    public CounterCell(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete)
-    {
-        super(name, value, timestamp);
-        this.timestampOfLastDelete = timestampOfLastDelete;
-    }
-
-    public static CounterCell create(CellName name, ByteBuffer value, long timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
-    {
-        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value)))
-            value = contextManager.clearAllLocal(value);
-        return new CounterCell(name, value, timestamp, timestampOfLastDelete);
-    }
-
-    // For use by tests of compatibility with pre-2.1 counter only.
-    public static CounterCell createLocal(CellName name, long value, long timestamp, long timestampOfLastDelete)
-    {
-        return new CounterCell(name, contextManager.createLocal(value), timestamp, timestampOfLastDelete);
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new CounterCell(newName, value, timestamp, timestampOfLastDelete);
-    }
-
-    public long timestampOfLastDelete()
-    {
-        return timestampOfLastDelete;
-    }
-
-    public long total()
-    {
-        return contextManager.total(value);
-    }
-
-    @Override
-    public int dataSize()
-    {
-        // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
-        return super.dataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete);
-    }
-
-    @Override
-    public Cell diff(Cell cell)
-    {
-        assert (cell instanceof CounterCell) || (cell instanceof DeletedCell) : "Wrong class type: " + cell.getClass();
-
-        if (timestamp() < cell.timestamp())
-            return cell;
-
-        // Note that if at that point, cell can't be a tombstone. Indeed,
-        // cell is the result of merging us with other nodes results, and
-        // merging a CounterCell with a tombstone never return a tombstone
-        // unless that tombstone timestamp is greater that the CounterCell
-        // one.
-        assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
-
-        if (timestampOfLastDelete() < ((CounterCell) cell).timestampOfLastDelete())
-            return cell;
-        CounterContext.Relationship rel = contextManager.diff(cell.value(), value());
-        if (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT)
-            return cell;
-        return null;
-    }
-
-    /*
-     * We have to special case digest creation for counter column because
-     * we don't want to include the information about which shard of the
-     * context is a delta or not, since this information differs from node to
-     * node.
-     */
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name.toByteBuffer().duplicate());
-        // We don't take the deltas into account in a digest
-        contextManager.updateDigest(digest, value);
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            buffer.writeLong(timestamp);
-            buffer.writeByte(serializationFlags());
-            buffer.writeLong(timestampOfLastDelete);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        digest.update(buffer.getData(), 0, buffer.getLength());
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        // live + tombstone: track last tombstone
-        if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
-        {
-            // live < tombstone
-            if (timestamp() < cell.timestamp())
-            {
-                return cell;
-            }
-            // live last delete >= tombstone
-            if (timestampOfLastDelete() >= cell.timestamp())
-            {
-                return this;
-            }
-            // live last delete < tombstone
-            return new CounterCell(name(), value(), timestamp(), cell.timestamp());
-        }
-
-        assert cell instanceof CounterCell : "Wrong class type: " + cell.getClass();
-
-        // live < live last delete
-        if (timestamp() < ((CounterCell) cell).timestampOfLastDelete())
-            return cell;
-        // live last delete > live
-        if (timestampOfLastDelete() > cell.timestamp())
-            return this;
-
-        // live + live. return one of the cells if its context is a superset of the other's, or merge them otherwise
-        ByteBuffer context = contextManager.merge(value(), cell.value());
-        if (context == value() && timestamp() >= cell.timestamp() && timestampOfLastDelete() >= ((CounterCell) cell).timestampOfLastDelete())
-            return this;
-        else if (context == cell.value() && cell.timestamp() >= timestamp() && ((CounterCell) cell).timestampOfLastDelete() >= timestampOfLastDelete())
-            return cell;
-        else // merge clocks and timsestamps.
-            return new CounterCell(name(),
-                                   context,
-                                   Math.max(timestamp(), cell.timestamp()),
-                                   Math.max(timestampOfLastDelete(), ((CounterCell) cell).timestampOfLastDelete()));
-    }
-
-    public boolean hasLegacyShards()
-    {
-        return contextManager.hasLegacyShards(value);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        // super.equals() returns false if o is not a CounterCell
-        return super.equals(o) && timestampOfLastDelete == ((CounterCell)o).timestampOfLastDelete;
-    }
+    static final CounterContext contextManager = CounterContext.instance();
 
-    @Override
-    public int hashCode()
-    {
-        return 31 * super.hashCode() + (int)(timestampOfLastDelete ^ (timestampOfLastDelete >>> 32));
-    }
+    public long timestampOfLastDelete();
 
-    @Override
-    public Cell localCopy(AbstractAllocator allocator)
-    {
-        return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
-    }
+    public long total();
 
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s:false:%s@%d!%d",
-                             comparator.getString(name),
-                             contextManager.toString(value),
-                             timestamp,
-                             timestampOfLastDelete);
-    }
+    public boolean hasLegacyShards();
 
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_MASK;
-    }
+    public Cell markLocalToBeCleared();
 
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
-        // which is not the internal representation of counters
-        contextManager.validateContext(value());
-    }
+    CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
 
-    public Cell markLocalToBeCleared()
-    {
-        ByteBuffer marked = contextManager.markLocalToBeCleared(value);
-        return marked == value ? this : new CounterCell(name, marked, timestamp, timestampOfLastDelete);
-    }
+    CounterCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index c19b436..32571cc 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -200,9 +200,9 @@ public class CounterMutation implements IMutation
             long clock = currentValue.clock + 1L;
             long count = currentValue.count + update.delta();
 
-            resultCF.addColumn(new CounterCell(update.name(),
-                                               CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
-                                               update.timestamp()));
+            resultCF.addColumn(new BufferCounterCell(update.name(),
+                                                     CounterContext.instance().createGlobal(CounterId.getLocalId(), clock, count),
+                                                     update.timestamp()));
         }
 
         return resultCF;
@@ -253,7 +253,7 @@ public class CounterMutation implements IMutation
         SortedSet<CellName> names = new TreeSet<>(cfs.metadata.comparator);
         for (int i = 0; i < currentValues.length; i++)
             if (currentValues[i] == null)
-                names.add(counterUpdateCells.get(i).name);
+                names.add(counterUpdateCells.get(i).name());
 
         ReadCommand cmd = new SliceByNamesReadCommand(getKeyspaceName(), key(), cfs.metadata.cfName, Long.MIN_VALUE, new NamesQueryFilter(names));
         Row row = cmd.getRow(cfs.keyspace);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
index 27d5270..58ac365 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -17,13 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.db.composites.CellNameType;
-
 /**
  * A counter update while it hasn't been applied yet by the leader replica.
  *
@@ -31,61 +24,7 @@ import org.apache.cassandra.db.composites.CellNameType;
  * is transformed to a relevant CounterCell. This Cell is a temporary data
  * structure that should never be stored inside a memtable or an sstable.
  */
-public class CounterUpdateCell extends Cell
+public interface CounterUpdateCell extends Cell
 {
-    public CounterUpdateCell(CellName name, long value, long timestamp)
-    {
-        this(name, ByteBufferUtil.bytes(value), timestamp);
-    }
-
-    public CounterUpdateCell(CellName name, ByteBuffer value, long timestamp)
-    {
-        super(name, value, timestamp);
-    }
-
-    public long delta()
-    {
-        return value().getLong(value().position());
-    }
-
-    @Override
-    public Cell diff(Cell cell)
-    {
-        // Diff is used during reads, but we should never read those columns
-        throw new UnsupportedOperationException("This operation is unsupported on CounterUpdateCell.");
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        // The only time this could happen is if a batchAdd ships two
-        // increment for the same cell. Hence we simply sums the delta.
-
-        // tombstones take precedence
-        if (cell.isMarkedForDelete(Long.MIN_VALUE)) // can't be an expired cell, so the current time is irrelevant
-            return timestamp() > cell.timestamp() ? this : cell;
-
-        // neither is tombstoned
-        assert cell instanceof CounterUpdateCell : "Wrong class type.";
-        CounterUpdateCell c = (CounterUpdateCell) cell;
-        return new CounterUpdateCell(name(), delta() + c.delta(), Math.max(timestamp(), c.timestamp()));
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_UPDATE_MASK;
-    }
-
-    @Override
-    public Cell localCopy(AbstractAllocator allocator)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s:%s@%d", comparator.getString(name), ByteBufferUtil.toLong(value), timestamp);
-    }
+    public long delta();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataRange.java b/src/java/org/apache/cassandra/db/DataRange.java
index 91ff512..31a9370 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -163,7 +163,7 @@ public class DataRange
 
         private boolean equals(RowPosition pos, ByteBuffer rowKey)
         {
-            return pos instanceof DecoratedKey && ((DecoratedKey)pos).key.equals(rowKey);
+            return pos instanceof DecoratedKey && ((DecoratedKey)pos).getKey().equals(rowKey);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DecoratedKey.java b/src/java/org/apache/cassandra/db/DecoratedKey.java
index 8f7a22b..fb3d7ab 100644
--- a/src/java/org/apache/cassandra/db/DecoratedKey.java
+++ b/src/java/org/apache/cassandra/db/DecoratedKey.java
@@ -22,6 +22,7 @@ import java.util.Comparator;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
@@ -33,7 +34,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  * if this matters, you can subclass RP to use a stronger hash, or use a non-lossy tokenization scheme (as in the
  * OrderPreservingPartitioner classes).
  */
-public class DecoratedKey extends RowPosition
+public abstract class DecoratedKey implements RowPosition
 {
     public static final Comparator<DecoratedKey> comparator = new Comparator<DecoratedKey>()
     {
@@ -43,20 +44,18 @@ public class DecoratedKey extends RowPosition
         }
     };
 
-    public final Token token;
-    public final ByteBuffer key;
+    private final Token token;
 
-    public DecoratedKey(Token token, ByteBuffer key)
+    public DecoratedKey(Token token)
     {
-        assert token != null && key != null;
+        assert token != null;
         this.token = token;
-        this.key = key;
     }
 
     @Override
     public int hashCode()
     {
-        return key.hashCode(); // hash of key is enough
+        return getKey().hashCode(); // hash of key is enough
     }
 
     @Override
@@ -64,12 +63,11 @@ public class DecoratedKey extends RowPosition
     {
         if (this == obj)
             return true;
-        if (obj == null || this.getClass() != obj.getClass())
+        if (obj == null || !(obj instanceof DecoratedKey))
             return false;
 
         DecoratedKey other = (DecoratedKey)obj;
-
-        return ByteBufferUtil.compareUnsigned(key, other.key) == 0; // we compare faster than BB.equals for array backed BB
+        return ByteBufferUtil.compareUnsigned(getKey(), other.getKey()) == 0; // we compare faster than BB.equals for array backed BB
     }
 
     public int compareTo(RowPosition pos)
@@ -82,8 +80,8 @@ public class DecoratedKey extends RowPosition
             return -pos.compareTo(this);
 
         DecoratedKey otherKey = (DecoratedKey) pos;
-        int cmp = token.compareTo(otherKey.getToken());
-        return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.key) : cmp;
+        int cmp = getToken().compareTo(otherKey.getToken());
+        return cmp == 0 ? ByteBufferUtil.compareUnsigned(getKey(), otherKey.getKey()) : cmp;
     }
 
     public static int compareTo(IPartitioner partitioner, ByteBuffer key, RowPosition position)
@@ -94,7 +92,7 @@ public class DecoratedKey extends RowPosition
 
         DecoratedKey otherKey = (DecoratedKey) position;
         int cmp = partitioner.getToken(key).compareTo(otherKey.getToken());
-        return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.key) : cmp;
+        return cmp == 0 ? ByteBufferUtil.compareUnsigned(key, otherKey.getKey()) : cmp;
     }
 
     public boolean isMinimum(IPartitioner partitioner)
@@ -103,6 +101,11 @@ public class DecoratedKey extends RowPosition
         return false;
     }
 
+    public boolean isMinimum()
+    {
+        return isMinimum(StorageService.getPartitioner());
+    }
+
     public RowPosition.Kind kind()
     {
         return RowPosition.Kind.ROW_KEY;
@@ -111,12 +114,14 @@ public class DecoratedKey extends RowPosition
     @Override
     public String toString()
     {
-        String keystring = key == null ? "null" : ByteBufferUtil.bytesToHex(key);
-        return "DecoratedKey(" + token + ", " + keystring + ")";
+        String keystring = getKey() == null ? "null" : ByteBufferUtil.bytesToHex(getKey());
+        return "DecoratedKey(" + getToken() + ", " + keystring + ")";
     }
 
     public Token getToken()
     {
         return token;
     }
+
+    public abstract ByteBuffer getKey();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DefsTables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DefsTables.java b/src/java/org/apache/cassandra/db/DefsTables.java
index 351ee4b..6f9a270 100644
--- a/src/java/org/apache/cassandra/db/DefsTables.java
+++ b/src/java/org/apache/cassandra/db/DefsTables.java
@@ -248,7 +248,7 @@ public class DefsTables
             if (newState.hasColumns())
                 updateKeyspace(KSMetaData.fromSchema(new Row(key, newState), Collections.<CFMetaData>emptyList(), new UTMetaData()));
             else
-                keyspacesToDrop.add(AsciiType.instance.getString(key.key));
+                keyspacesToDrop.add(AsciiType.instance.getString(key.getKey()));
         }
 
         return keyspacesToDrop;
@@ -297,7 +297,7 @@ public class DefsTables
             }
             else // has modifications in the nested ColumnFamilies, need to perform nested diff to determine what was really changed
             {
-                String ksName = AsciiType.instance.getString(keyspace.key);
+                String ksName = AsciiType.instance.getString(keyspace.getKey());
 
                 Map<String, CFMetaData> oldCfDefs = new HashMap<String, CFMetaData>();
                 for (CFMetaData cfm : Schema.instance.getKSMetaData(ksName).cfMetaData().values())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
index 00788f8..998c409 100644
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -17,104 +17,14 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
-public class DeletedCell extends Cell
+public interface DeletedCell extends Cell
 {
-    public DeletedCell(CellName name, int localDeletionTime, long timestamp)
-    {
-        this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
-    }
-
-    public DeletedCell(CellName name, ByteBuffer value, long timestamp)
-    {
-        super(name, value, timestamp);
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new DeletedCell(newName, value, timestamp);
-    }
-
-    @Override
-    public Cell withUpdatedTimestamp(long newTimestamp)
-    {
-        return new DeletedCell(name, value, newTimestamp);
-    }
-
-    @Override
-    public boolean isMarkedForDelete(long now)
-    {
-        return true;
-    }
-
-    @Override
-    public long getMarkedForDeleteAt()
-    {
-        return timestamp;
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name.toByteBuffer().duplicate());
-
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            buffer.writeLong(timestamp);
-            buffer.writeByte(serializationFlags());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        digest.update(buffer.getData(), 0, buffer.getLength());
-    }
-
-    @Override
-    public int getLocalDeletionTime()
-    {
-       return value.getInt(value.position());
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        if (cell instanceof DeletedCell)
-            return super.reconcile(cell);
-        return cell.reconcile(this);
-    }
-
-    @Override
-    public Cell localCopy(AbstractAllocator allocator)
-    {
-        return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.DELETION_MASK;
-    }
+    DeletedCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
 
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        if (value().remaining() != 4)
-            throw new MarshalException("A tombstone value should be 4 bytes long");
-        if (getLocalDeletionTime() < 0)
-            throw new MarshalException("The local deletion time should not be negative");
-    }
+    DeletedCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
index 57fac5b..5fc0f94 100644
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -17,16 +17,10 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
 
 /**
  * Alternative to Cell that have an expiring time.
@@ -38,154 +32,13 @@ import org.apache.cassandra.utils.memory.AbstractAllocator;
  * we can't mix it with the timestamp field, which is client-supplied and whose resolution we
  * can't assume anything about.)
  */
-public class ExpiringCell extends Cell
+public interface ExpiringCell extends Cell
 {
     public static final int MAX_TTL = 20 * 365 * 24 * 60 * 60; // 20 years in seconds
 
-    private final int localExpirationTime;
-    private final int timeToLive;
-
-    public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive)
-    {
-      this(name, value, timestamp, timeToLive, (int) (System.currentTimeMillis() / 1000) + timeToLive);
-    }
-
-    public ExpiringCell(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime)
-    {
-        super(name, value, timestamp);
-        assert timeToLive > 0 : timeToLive;
-        assert localExpirationTime > 0 : localExpirationTime;
-        this.timeToLive = timeToLive;
-        this.localExpirationTime = localExpirationTime;
-    }
-
-    /** @return Either a DeletedCell, or an ExpiringCell. */
-    public static Cell create(CellName name, ByteBuffer value, long timestamp, int timeToLive, int localExpirationTime, int expireBefore, ColumnSerializer.Flag flag)
-    {
-        if (localExpirationTime >= expireBefore || flag == ColumnSerializer.Flag.PRESERVE_SIZE)
-            return new ExpiringCell(name, value, timestamp, timeToLive, localExpirationTime);
-        // The column is now expired, we can safely return a simple tombstone. Note that
-        // as long as the expiring column and the tombstone put together live longer than GC grace seconds,
-        // we'll fulfil our responsibility to repair.  See discussion at
-        // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
-        return new DeletedCell(name, localExpirationTime - timeToLive, timestamp);
-    }
-
-    public int getTimeToLive()
-    {
-        return timeToLive;
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new ExpiringCell(newName, value, timestamp, timeToLive, localExpirationTime);
-    }
-
-    @Override
-    public Cell withUpdatedTimestamp(long newTimestamp)
-    {
-        return new ExpiringCell(name, value, newTimestamp, timeToLive, localExpirationTime);
-    }
-
-    @Override
-    public int dataSize()
-    {
-        return super.dataSize() + TypeSizes.NATIVE.sizeof(localExpirationTime) + TypeSizes.NATIVE.sizeof(timeToLive);
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        /*
-         * An expired column adds to a Cell :
-         *    4 bytes for the localExpirationTime
-         *  + 4 bytes for the timeToLive
-        */
-        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name.toByteBuffer().duplicate());
-        digest.update(value.duplicate());
-
-        DataOutputBuffer buffer = new DataOutputBuffer();
-        try
-        {
-            buffer.writeLong(timestamp);
-            buffer.writeByte(serializationFlags());
-            buffer.writeInt(timeToLive);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-        digest.update(buffer.getData(), 0, buffer.getLength());
-    }
-
-    @Override
-    public int getLocalDeletionTime()
-    {
-        return localExpirationTime;
-    }
-
-    @Override
-    public Cell localCopy(AbstractAllocator allocator)
-    {
-        return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s!%d", super.getString(comparator), timeToLive);
-    }
-
-    @Override
-    public boolean isMarkedForDelete(long now)
-    {
-        return (int) (now / 1000) >= getLocalDeletionTime();
-    }
-
-    @Override
-    public long getMarkedForDeleteAt()
-    {
-        return timestamp;
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.EXPIRATION_MASK;
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        super.validateFields(metadata);
-        if (timeToLive <= 0)
-            throw new MarshalException("A column TTL should be > 0");
-        if (localExpirationTime < 0)
-            throw new MarshalException("The local expiration time should not be negative");
-    }
+    public int getTimeToLive();
 
-    @Override
-    public boolean equals(Object o)
-    {
-        // super.equals() returns false if o is not a CounterCell
-        return super.equals(o)
-            && localExpirationTime == ((ExpiringCell)o).localExpirationTime
-            && timeToLive == ((ExpiringCell)o).timeToLive;
-    }
+    ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator allocator);
 
-    @Override
-    public int hashCode()
-    {
-        int result = super.hashCode();
-        result = 31 * result + localExpirationTime;
-        result = 31 * result + timeToLive;
-        return result;
-    }
+    ExpiringCell localCopy(CFMetaData metaData, MemtableAllocator allocator, OpOrder.Group opGroup);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 4415e63..8c892d6 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -514,7 +514,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         List<Row> rows = hintStore.getRangeSlice(range, null, filter, Integer.MAX_VALUE, System.currentTimeMillis());
         for (Row row : rows)
         {
-            UUID hostId = UUIDGen.getUUID(row.key.key);
+            UUID hostId = UUIDGen.getUUID(row.key.getKey());
             InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
             // token may have since been removed (in which case we have just read back a tombstone)
             if (target != null)
@@ -573,7 +573,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
         for (Row row : getHintsSlice(1))
         {
             if (row.cf != null) //ignore removed rows
-                result.addFirst(tokenFactory.toString(row.key.token));
+                result.addFirst(tokenFactory.toString(row.key.getToken()));
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 31e68c1..17d1364 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -410,13 +410,13 @@ public class Keyspace
     public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set<String> idxNames)
     {
         if (logger.isDebugEnabled())
-            logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
+            logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.getKey()));
 
         try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
         {
             Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
 
-            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
+            Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.getKey(), DEFAULT_PAGE_SIZE);
             while (pager.hasNext())
             {
                 ColumnFamily cf = pager.next();
@@ -426,7 +426,7 @@ public class Keyspace
                     if (cfs.indexManager.indexes(cell.name(), indexes))
                         cf2.addColumn(cell);
                 }
-                cfs.indexManager.indexRow(key.key, cf2, opGroup);
+                cfs.indexManager.indexRow(key.getKey(), cf2, opGroup);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 4cf6654..6f4c1c7 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -46,19 +46,16 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.ContextAllocator;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-import org.apache.cassandra.utils.memory.Pool;
-import org.apache.cassandra.utils.memory.PoolAllocator;
+import org.apache.cassandra.utils.memory.*;
 
 public class Memtable
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
-    static final Pool memoryPool = DatabaseDescriptor.getMemtableAllocatorPool();
+    static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
     private static final int ROW_OVERHEAD_HEAP_SIZE;
 
-    private final PoolAllocator allocator;
+    private final MemtableAllocator allocator;
     private final AtomicLong liveDataSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
 
@@ -85,12 +82,12 @@ public class Memtable
     public Memtable(ColumnFamilyStore cfs)
     {
         this.cfs = cfs;
-        this.allocator = memoryPool.newAllocator();
+        this.allocator = MEMORY_POOL.newAllocator();
         this.initialComparator = cfs.metadata.comparator;
         this.cfs.scheduleFlush();
     }
 
-    public PoolAllocator getAllocator()
+    public MemtableAllocator getAllocator()
     {
         return allocator;
     }
@@ -177,7 +174,7 @@ public class Memtable
         if (previous == null)
         {
             AtomicBTreeColumns empty = cf.cloneMeShallow(AtomicBTreeColumns.factory, false);
-            final DecoratedKey cloneKey = new DecoratedKey(key.token, allocator.clone(key.key, opGroup));
+            final DecoratedKey cloneKey = allocator.clone(key, opGroup);
             // We'll add the columns later. This avoids wasting works if we get beaten in the putIfAbsent
             previous = rows.putIfAbsent(cloneKey, empty);
             if (previous == null)
@@ -185,27 +182,17 @@ public class Memtable
                 previous = empty;
                 // allocate the row overhead after the fact; this saves over allocating and having to free after, but
                 // means we can overshoot our declared limit.
-                int overhead = (int) (cfs.partitioner.getHeapSizeOf(key.token) + ROW_OVERHEAD_HEAP_SIZE);
-                allocator.allocate(overhead, opGroup);
+                int overhead = (int) (cfs.partitioner.getHeapSizeOf(key.getToken()) + ROW_OVERHEAD_HEAP_SIZE);
+                allocator.onHeap().allocate(overhead, opGroup);
             }
             else
             {
-                allocator.free(cloneKey.key);
+                allocator.reclaimer().reclaimImmediately(cloneKey);
             }
         }
 
-        ContextAllocator contextAllocator = allocator.wrap(opGroup);
-        AtomicBTreeColumns.Delta delta = previous.addAllWithSizeDelta(cf, contextAllocator, indexer, new AtomicBTreeColumns.Delta());
-        liveDataSize.addAndGet(delta.dataSize());
+        liveDataSize.addAndGet(previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer));
         currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount());
-
-        // allocate or free the delta in column overhead after the fact
-        for (Cell cell : delta.reclaimed())
-        {
-            cell.name.free(allocator);
-            allocator.free(cell.value);
-        }
-        allocator.allocate((int) delta.excessHeapSize(), opGroup);
     }
 
     // for debugging
@@ -256,10 +243,10 @@ public class Memtable
                 Map.Entry<? extends RowPosition, ? extends ColumnFamily> entry = iter.next();
                 // Actual stored key should be true DecoratedKey
                 assert entry.getKey() instanceof DecoratedKey;
-                if (memoryPool.needToCopyOnHeap())
+                if (MEMORY_POOL.needToCopyOnHeap())
                 {
                     DecoratedKey key = (DecoratedKey) entry.getKey();
-                    key = new DecoratedKey(key.token, HeapAllocator.instance.clone(key.key));
+                    key = new BufferDecoratedKey(key.getToken(), HeapAllocator.instance.clone(key.getKey()));
                     ColumnFamily cells = ArrayBackedSortedColumns.localCopy(entry.getValue(), HeapAllocator.instance);
                     entry = new AbstractMap.SimpleImmutableEntry<>(key, cells);
                 }
@@ -307,7 +294,7 @@ public class Memtable
             {
                 //  make sure we don't write non-sensical keys
                 assert key instanceof DecoratedKey;
-                keySize += ((DecoratedKey)key).key.remaining();
+                keySize += ((DecoratedKey)key).getKey().remaining();
             }
             estimatedSize = (long) ((keySize // index entries
                                     + keySize // keys in data file
@@ -410,16 +397,20 @@ public class Memtable
     static
     {
         // calculate row overhead
+        final OpOrder.Group group = new OpOrder().start();
         int rowOverhead;
+        MemtableAllocator allocator = MEMORY_POOL.newAllocator();
         ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
         final int count = 100000;
         final Object val = new Object();
         for (int i = 0 ; i < count ; i++)
-            rows.put(new DecoratedKey(new LongToken((long) i), ByteBufferUtil.EMPTY_BYTE_BUFFER), val);
+            rows.put(allocator.clone(new BufferDecoratedKey(new LongToken((long) i), ByteBufferUtil.EMPTY_BYTE_BUFFER), group), val);
         double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
         rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
         rowOverhead -= ObjectSizes.measureDeep(new LongToken((long) 0));
-        rowOverhead += AtomicBTreeColumns.HEAP_SIZE;
+        rowOverhead += AtomicBTreeColumns.EMPTY_SIZE;
+        allocator.setDiscarding();
+        allocator.setDiscarded();
         ROW_OVERHEAD_HEAP_SIZE = rowOverhead;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 6fae9b0..b64c675 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -64,7 +64,7 @@ public class Mutation implements IMutation
 
     public Mutation(String keyspaceName, Row row)
     {
-        this(keyspaceName, row.key.key, row.cf);
+        this(keyspaceName, row.key.getKey(), row.cf);
     }
 
     protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/NativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCell.java b/src/java/org/apache/cassandra/db/NativeCell.java
new file mode 100644
index 0000000..1c8ebd9
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeCell.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeCell extends AbstractNativeCell
+{
+    private static final long SIZE = ObjectSizes.measure(new NativeCell());
+
+    NativeCell()
+    {}
+
+    public NativeCell(NativeAllocator allocator, OpOrder.Group writeOp, Cell copyOf)
+    {
+        super(allocator, writeOp, copyOf);
+    }
+
+    @Override
+    public CellName name()
+    {
+        return this;
+    }
+
+    @Override
+    public long timestamp()
+    {
+        return getLong(TIMESTAMP_OFFSET);
+    }
+
+    @Override
+    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+    {
+        return new BufferCell(copy(metadata, allocator), allocator.clone(value()), timestamp());
+    }
+
+    @Override
+    public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+    {
+        return allocator.clone(this, metadata, opGroup);
+    }
+
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        updateWithName(digest);  // name
+        updateWithValue(digest); // value
+
+        FBUtilities.updateWithLong(digest, timestamp());
+        FBUtilities.updateWithByte(digest, serializationFlags());
+    }
+
+    @Override
+    public long excessHeapSizeExcludingData()
+    {
+        return SIZE;
+    }
+
+    @Override
+    public long unsharedHeapSize()
+    {
+        return SIZE;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/NativeCounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeCounterCell.java b/src/java/org/apache/cassandra/db/NativeCounterCell.java
new file mode 100644
index 0000000..abcf598
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeCounterCell.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.MemtableAllocator;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeCounterCell extends NativeCell implements CounterCell
+{
+    private static final long SIZE = ObjectSizes.measure(new NativeCounterCell());
+
+    private NativeCounterCell()
+    {}
+
+    public NativeCounterCell(NativeAllocator allocator, OpOrder.Group writeOp, CounterCell copyOf)
+    {
+        super(allocator, writeOp, copyOf);
+    }
+
+    @Override
+    protected void construct(Cell from)
+    {
+        super.construct(from);
+        setLong(internalSize() - 8, ((CounterCell) from).timestampOfLastDelete());
+    }
+
+    @Override
+    protected int postfixSize()
+    {
+        return 8;
+    }
+
+    @Override
+    protected int sizeOf(Cell cell)
+    {
+        return 8 + super.sizeOf(cell);
+    }
+
+    @Override
+    public long timestampOfLastDelete()
+    {
+        return getLong(internalSize() - 8);
+    }
+
+    @Override
+    public long total()
+    {
+        return contextManager.total(value());
+    }
+
+    @Override
+    public boolean hasLegacyShards()
+    {
+        return contextManager.hasLegacyShards(value());
+    }
+
+    @Override
+    public Cell markLocalToBeCleared()
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Cell diff(Cell cell)
+    {
+        return diff(this, cell);
+    }
+
+    @Override
+    public Cell reconcile(Cell cell)
+    {
+        return reconcile(this, cell);
+    }
+
+    @Override
+    public int serializationFlags()
+    {
+        return ColumnSerializer.COUNTER_MASK;
+    }
+
+    @Override
+    public int cellDataSize()
+    {
+        // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
+        return super.cellDataSize() + TypeSizes.NATIVE.sizeof(timestampOfLastDelete());
+    }
+
+    @Override
+    public int serializedSize(CellNameType type, TypeSizes typeSizes)
+    {
+        return super.serializedSize(type, typeSizes) + typeSizes.sizeof(timestampOfLastDelete());
+    }
+
+    @Override
+    public void validateFields(CFMetaData metadata) throws MarshalException
+    {
+        validateName(metadata);
+        // We cannot use the value validator as for other columns as the CounterColumnType validate a long,
+        // which is not the internal representation of counters
+        contextManager.validateContext(value());
+    }
+
+    /*
+     * We have to special case digest creation for counter column because
+     * we don't want to include the information about which shard of the
+     * context is a delta or not, since this information differs from node to
+     * node.
+     */
+    @Override
+    public void updateDigest(MessageDigest digest)
+    {
+        updateWithName(digest);
+
+        // We don't take the deltas into account in a digest
+        contextManager.updateDigest(digest, value());
+
+        FBUtilities.updateWithLong(digest, timestamp());
+        FBUtilities.updateWithByte(digest, serializationFlags());
+        FBUtilities.updateWithLong(digest, timestampOfLastDelete());
+    }
+
+    @Override
+    public String getString(CellNameType comparator)
+    {
+        return String.format("%s(%s:false:%s@%d!%d)",
+                             getClass().getSimpleName(),
+                             comparator.getString(name()),
+                             contextManager.toString(value()),
+                             timestamp(),
+                             timestampOfLastDelete());
+    }
+
+    @Override
+    public CounterCell localCopy(CFMetaData metadata, AbstractAllocator allocator)
+    {
+        return new BufferCounterCell(copy(metadata, allocator), allocator.clone(value()), timestamp(), timestampOfLastDelete());
+    }
+
+    @Override
+    public CounterCell localCopy(CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group opGroup)
+    {
+        return allocator.clone(this, metadata, opGroup);
+    }
+
+    @Override
+    public long excessHeapSizeExcludingData()
+    {
+        return SIZE;
+    }
+
+    @Override
+    public long unsharedHeapSize()
+    {
+        return SIZE;
+    }
+
+    public boolean equals(Cell cell)
+    {
+        return cell instanceof CounterCell && equals((CounterCell) this);
+    }
+
+    public boolean equals(CounterCell cell)
+    {
+        return super.equals(cell) && timestampOfLastDelete() == cell.timestampOfLastDelete();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
new file mode 100644
index 0000000..52aa50c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.memory.MemoryUtil;
+import org.apache.cassandra.utils.memory.NativeAllocator;
+
+public class NativeDecoratedKey extends DecoratedKey
+{
+    private final long peer;
+
+    public NativeDecoratedKey(Token token, NativeAllocator allocator, OpOrder.Group writeOp, ByteBuffer key)
+    {
+        super(token);
+        assert key != null;
+        int size = key.remaining();
+        this.peer = allocator.allocate(4 + size, writeOp);
+        MemoryUtil.setInt(peer, size);
+        MemoryUtil.setBytes(peer + 4, key);
+    }
+
+    public ByteBuffer getKey()
+    {
+        return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer));
+    }
+}