You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2014/04/29 14:05:05 UTC

[3/7] Push more of memtable data off-heap

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
new file mode 100644
index 0000000..1b5dcf2
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/NativeAllocator.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.lang.reflect.Field;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.CounterCell;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletedCell;
+import org.apache.cassandra.db.ExpiringCell;
+import org.apache.cassandra.db.NativeCell;
+import org.apache.cassandra.db.NativeCounterCell;
+import org.apache.cassandra.db.NativeDecoratedKey;
+import org.apache.cassandra.db.NativeDeletedCell;
+import org.apache.cassandra.db.NativeExpiringCell;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+public class NativeAllocator extends MemtableAllocator
+{
+    private static final Logger logger = LoggerFactory.getLogger(NativeAllocator.class);
+
+    private final static int REGION_SIZE = 1024 * 1024;
+    private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+
+    // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
+    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+
+    private final AtomicReference<Region> currentRegion = new AtomicReference<>();
+    private final AtomicInteger regionCount = new AtomicInteger(0);
+    private final ConcurrentLinkedQueue<Region> regions = new ConcurrentLinkedQueue<>();
+    private AtomicLong unslabbed = new AtomicLong(0);
+
+    protected NativeAllocator(NativePool pool)
+    {
+        super(pool.onHeap.newAllocator(), pool.offHeap.newAllocator());
+    }
+
+    @Override
+    public Cell clone(Cell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    {
+        return new NativeCell(this, writeOp, cell);
+    }
+
+    @Override
+    public CounterCell clone(CounterCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    {
+        return new NativeCounterCell(this, writeOp, cell);
+    }
+
+    @Override
+    public DeletedCell clone(DeletedCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    {
+        return new NativeDeletedCell(this, writeOp, cell);
+    }
+
+    @Override
+    public ExpiringCell clone(ExpiringCell cell, CFMetaData cfm, OpOrder.Group writeOp)
+    {
+        return new NativeExpiringCell(this, writeOp, cell);
+    }
+
+    public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp)
+    {
+        return new NativeDecoratedKey(key.getToken(), this, writeOp, key.getKey());
+    }
+
+    @Override
+    public MemtableAllocator.DataReclaimer reclaimer()
+    {
+        return NO_OP;
+    }
+
+    public long allocate(int size, OpOrder.Group opGroup)
+    {
+        assert size >= 0;
+        offHeap().allocate(size, opGroup);
+        // satisfy large allocations directly from JVM since they don't cause fragmentation
+        // as badly, and fill up our regions quickly
+        if (size > MAX_CLONED_SIZE)
+        {
+            unslabbed.addAndGet(size);
+            Region region = new Region(unsafe.allocateMemory(size), size);
+            regions.add(region);
+
+            long peer;
+            if ((peer = region.allocate(size)) == -1)
+                throw new AssertionError();
+
+            return peer;
+        }
+
+        while (true)
+        {
+            Region region = getRegion();
+
+            long peer;
+            if ((peer = region.allocate(size)) > 0)
+                return peer;
+
+            // not enough space!
+            currentRegion.compareAndSet(region, null);
+        }
+    }
+
+    public void setDiscarded()
+    {   // drain the queue while freeing, so that a repeated call cannot free the same region twice
+        for (Region region = regions.poll(); region != null; region = regions.poll())
+            unsafe.freeMemory(region.peer);
+        super.setDiscarded();
+    }
+
+    /**
+     * Returns the region allocations should currently be sliced from, installing a new one when none is set
+     */
+    private Region getRegion()
+    {
+        for (;;)
+        {
+            Region current = currentRegion.get();
+            if (current != null)
+                return current;
+
+            // Nothing installed: take a spare region that lost an earlier race, or allocate
+            // a fresh one, and then race the other allocating threads to CAS it in.
+            Region candidate = RACE_ALLOCATED.poll();
+            if (candidate == null)
+                candidate = new Region(unsafe.allocateMemory(REGION_SIZE), REGION_SIZE);
+
+            if (!currentRegion.compareAndSet(null, candidate))
+            {
+                // beaten to it: stash ours for later re-use, and pick up the winner's
+                // region on the next pass through the loop
+                RACE_ALLOCATED.add(candidate);
+                continue;
+            }
+            regions.add(candidate);
+            regionCount.incrementAndGet();
+            logger.trace("{} regions now allocated in {}", regionCount, this);
+            return candidate;
+        }
+    }
+
+    /**
+     * A region of memory out of which allocations are sliced.
+     *
+     * This serves two purposes:
+     *  - to provide a step between initialization and allocation, so that racing to CAS a
+     *    new region in is harmless
+     *  - encapsulates the allocation offset
+     */
+    private static class Region
+    {
+        /**
+         * Base address of the underlying memory
+         */
+        private final long peer;
+
+        private final long capacity; // number of bytes that may be sliced from peer
+
+        /**
+         * Offset, relative to peer, at which the next allocation will start;
+         * it begins at zero and is only ever advanced by allocate().
+         */
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
+
+        /**
+         * Total number of allocations satisfied from this region
+         */
+        private final AtomicInteger allocCount = new AtomicInteger();
+
+        /**
+         * Wrap memory that the caller has already allocated.
+         *
+         * @param peer base address of the memory
+         * @param capacity number of bytes usable starting at peer
+         */
+        private Region(long peer, long capacity)
+        {
+            this.peer = peer;
+            this.capacity = capacity;
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the region.
+         *
+         * @return the address of the allocation, or -1 to indicate not-enough-space
+         */
+        long allocate(int size)
+        {
+            while (true)
+            {
+                int oldOffset = nextFreeOffset.get();
+
+                if ((long) oldOffset + size > capacity) // widened so the sum cannot wrap around
+                    return -1;
+
+                // Try to atomically claim the next size bytes of this region
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
+                {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return peer + oldOffset;
+                }
+                // we raced and lost alloc, try again
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Region@" + System.identityHashCode(this) +
+                    " allocs=" + allocCount.get() + " waste=" +
+                    (capacity - nextFreeOffset.get());
+        }
+    }
+
+
+    static final Unsafe unsafe;
+    static
+    {
+        // the Unsafe instance is read reflectively from the static "theUnsafe" field
+        try
+        {
+            Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
+            theUnsafe.setAccessible(true);
+            unsafe = (Unsafe) theUnsafe.get(null);
+        }
+        catch (Exception e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/NativePool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/NativePool.java b/src/java/org/apache/cassandra/utils/memory/NativePool.java
new file mode 100644
index 0000000..012867a
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/NativePool.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+public class NativePool extends MemtablePool
+{
+    public NativePool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
+    {
+        super(maxOnHeapMemory, maxOffHeapMemory, cleanThreshold, cleaner);
+    }
+
+    @Override
+    public boolean needToCopyOnHeap()
+    {
+        return true;
+    }
+
+    @Override
+    public NativeAllocator newAllocator()
+    {
+        return new NativeAllocator(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/Pool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/Pool.java b/src/java/org/apache/cassandra/utils/memory/Pool.java
deleted file mode 100644
index aa5e05c..0000000
--- a/src/java/org/apache/cassandra/utils/memory/Pool.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-
-
-/**
- * Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through
- * child PoolAllocator objects.
- */
-public abstract class Pool
-{
-    final PoolCleanerThread<?> cleaner;
-
-    // the total memory used by this pool
-    public final SubPool onHeap;
-    public final SubPool offHeap;
-
-    final WaitQueue hasRoom = new WaitQueue();
-
-    Pool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanThreshold, Runnable cleaner)
-    {
-        this.onHeap = getSubPool(maxOnHeapMemory, cleanThreshold);
-        this.offHeap = getSubPool(maxOffHeapMemory, cleanThreshold);
-        this.cleaner = getCleaner(cleaner);
-        if (this.cleaner != null)
-            this.cleaner.start();
-    }
-
-    SubPool getSubPool(long limit, float cleanThreshold)
-    {
-        return new SubPool(limit, cleanThreshold);
-    }
-
-    PoolCleanerThread<?> getCleaner(Runnable cleaner)
-    {
-        return cleaner == null ? null : new PoolCleanerThread<>(this, cleaner);
-    }
-
-    public abstract boolean needToCopyOnHeap();
-    public abstract PoolAllocator newAllocator();
-
-    /**
-     * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
-     * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
-     * but only needs to allocate if there are none already available. This distinction is not always meaningful.
-     */
-    public class SubPool
-    {
-
-        // total memory/resource permitted to allocate
-        public final long limit;
-
-        // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean
-        public final float cleanThreshold;
-
-        // total bytes allocated and reclaiming
-        volatile long allocated;
-        volatile long reclaiming;
-
-        // a cache of the calculation determining at what allocation threshold we should next clean
-        volatile long nextClean;
-
-        public SubPool(long limit, float cleanThreshold)
-        {
-            this.limit = limit;
-            this.cleanThreshold = cleanThreshold;
-        }
-
-        /** Methods for tracking and triggering a clean **/
-
-        boolean needsCleaning()
-        {
-            // use strictly-greater-than so we don't clean when limit is 0
-            return used() > nextClean && updateNextClean();
-        }
-
-        void maybeClean()
-        {
-            if (needsCleaning() && cleaner != null)
-                cleaner.trigger();
-        }
-
-        private boolean updateNextClean()
-        {
-            while (true)
-            {
-                long current = nextClean;
-                long reclaiming = this.reclaiming;
-                long next =  reclaiming + (long) (this.limit * cleanThreshold);
-                if (current == next || nextCleanUpdater.compareAndSet(this, current, next))
-                    return used() > next;
-            }
-        }
-
-        /** Methods to allocate space **/
-
-        boolean tryAllocate(long size)
-        {
-            while (true)
-            {
-                long cur;
-                if ((cur = allocated) + size > limit)
-                    return false;
-                if (allocatedUpdater.compareAndSet(this, cur, cur + size))
-                    return true;
-            }
-        }
-
-        /**
-         * apply the size adjustment to allocated, bypassing any limits or constraints. If this reduces the
-         * allocated total, we will signal waiters
-         */
-        void adjustAllocated(long size)
-        {
-            if (size == 0)
-                return;
-            while (true)
-            {
-                long cur = allocated;
-                if (allocatedUpdater.compareAndSet(this, cur, cur + size))
-                    return;
-            }
-        }
-
-        // 'acquires' an amount of memory, and maybe also marks it allocated. This method is meant to be overridden
-        // by implementations with a separate concept of acquired/allocated. As this method stands, an acquire
-        // without an allocate is a no-op (acquisition is achieved through allocation), however a release (where size < 0)
-        // is always processed and accounted for in allocated.
-        void adjustAcquired(long size, boolean alsoAllocated)
-        {
-            if (size > 0 || alsoAllocated)
-            {
-                if (alsoAllocated)
-                    adjustAllocated(size);
-                maybeClean();
-            }
-            else if (size < 0)
-            {
-                adjustAllocated(size);
-                hasRoom.signalAll();
-            }
-        }
-
-        // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans
-        void adjustReclaiming(long reclaiming)
-        {
-            if (reclaiming == 0)
-                return;
-            reclaimingUpdater.addAndGet(this, reclaiming);
-            if (reclaiming < 0 && updateNextClean() && cleaner != null)
-                cleaner.trigger();
-        }
-
-        public long allocated()
-        {
-            return allocated;
-        }
-
-        public long used()
-        {
-            return allocated;
-        }
-
-        public PoolAllocator.SubAllocator newAllocator()
-        {
-            return new PoolAllocator.SubAllocator(this);
-        }
-
-        public WaitQueue hasRoom()
-        {
-            return hasRoom;
-        }
-    }
-
-    private static final AtomicLongFieldUpdater<SubPool> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "reclaiming");
-    private static final AtomicLongFieldUpdater<SubPool> allocatedUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "allocated");
-    private static final AtomicLongFieldUpdater<SubPool> nextCleanUpdater = AtomicLongFieldUpdater.newUpdater(SubPool.class, "nextClean");
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
deleted file mode 100644
index aa374fe..0000000
--- a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-
-public abstract class PoolAllocator extends AbstractAllocator
-{
-
-    private final SubAllocator onHeap;
-    private final SubAllocator offHeap;
-    volatile LifeCycle state = LifeCycle.LIVE;
-
-    static enum LifeCycle
-    {
-        LIVE, DISCARDING, DISCARDED;
-        LifeCycle transition(LifeCycle targetState)
-        {
-            switch (targetState)
-            {
-                case DISCARDING:
-                    assert this == LifeCycle.LIVE;
-                    return LifeCycle.DISCARDING;
-                case DISCARDED:
-                    assert this == LifeCycle.DISCARDING;
-                    return LifeCycle.DISCARDED;
-            }
-            throw new IllegalStateException();
-        }
-    }
-
-    PoolAllocator(SubAllocator onHeap, SubAllocator offHeap)
-    {
-        this.onHeap = onHeap;
-        this.offHeap = offHeap;
-    }
-
-    public SubAllocator onHeap()
-    {
-        return onHeap;
-    }
-
-    public SubAllocator offHeap()
-    {
-        return offHeap;
-    }
-
-    /**
-     * Mark this allocator reclaiming; this will permit any outstanding allocations to temporarily
-     * overshoot the maximum memory limit so that flushing can begin immediately
-     */
-    public void setDiscarding()
-    {
-        state = state.transition(LifeCycle.DISCARDING);
-        // mark the memory owned by this allocator as reclaiming
-        onHeap.markAllReclaiming();
-        offHeap.markAllReclaiming();
-    }
-
-    /**
-     * Indicate the memory and resources owned by this allocator are no longer referenced,
-     * and can be reclaimed/reused.
-     */
-    public void setDiscarded()
-    {
-        state = state.transition(LifeCycle.DISCARDED);
-        // release any memory owned by this allocator; automatically signals waiters
-        onHeap.releaseAll();
-        offHeap.releaseAll();
-    }
-
-    public boolean isLive()
-    {
-        return state == LifeCycle.LIVE;
-    }
-
-    public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
-    public abstract void free(ByteBuffer name);
-
-    /**
-     * Allocate a slice of the given length.
-     */
-    public ByteBuffer clone(ByteBuffer buffer, OpOrder.Group opGroup)
-    {
-        assert buffer != null;
-        if (buffer.remaining() == 0)
-            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
-        ByteBuffer cloned = allocate(buffer.remaining(), opGroup);
-
-        cloned.mark();
-        cloned.put(buffer.duplicate());
-        cloned.reset();
-        return cloned;
-    }
-
-    public ContextAllocator wrap(OpOrder.Group opGroup)
-    {
-        return new ContextAllocator(opGroup, this);
-    }
-
-    /** Mark the BB as unused, permitting it to be reclaimed */
-    public static final class SubAllocator
-    {
-        // the tracker we are owning memory from
-        private final Pool.SubPool parent;
-
-        // the amount of memory/resource owned by this object
-        private volatile long owns;
-        // the amount of memory we are reporting to collect; this may be inaccurate, but is close
-        // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount
-        private volatile long reclaiming;
-
-        SubAllocator(Pool.SubPool parent)
-        {
-            this.parent = parent;
-        }
-
-        // should only be called once we know we will never allocate to the object again.
-        // currently no corroboration/enforcement of this is performed.
-        void releaseAll()
-        {
-            parent.adjustAcquired(-ownsUpdater.getAndSet(this, 0), false);
-            parent.adjustReclaiming(-reclaimingUpdater.getAndSet(this, 0));
-        }
-
-        // allocate memory in the tracker, and mark ourselves as owning it
-        public void allocate(long size, OpOrder.Group opGroup)
-        {
-            while (true)
-            {
-                if (parent.tryAllocate(size))
-                {
-                    acquired(size);
-                    return;
-                }
-                WaitQueue.Signal signal = opGroup.isBlockingSignal(parent.hasRoom().register());
-                boolean allocated = parent.tryAllocate(size);
-                if (allocated || opGroup.isBlocking())
-                {
-                    signal.cancel();
-                    if (allocated) // if we allocated, take ownership
-                        acquired(size);
-                    else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking
-                        allocated(size);
-                    return;
-                }
-                else
-                    signal.awaitUninterruptibly();
-            }
-        }
-
-        // retroactively mark an amount allocated amd acquired in the tracker, and owned by us
-        void allocated(long size)
-        {
-            parent.adjustAcquired(size, true);
-            ownsUpdater.addAndGet(this, size);
-        }
-
-        // retroactively mark an amount acquired in the tracker, and owned by us
-        void acquired(long size)
-        {
-            parent.adjustAcquired(size, false);
-            ownsUpdater.addAndGet(this, size);
-        }
-
-        void release(long size)
-        {
-            parent.adjustAcquired(-size, false);
-            ownsUpdater.addAndGet(this, -size);
-        }
-
-        // mark everything we currently own as reclaiming, both here and in our parent
-        void markAllReclaiming()
-        {
-            while (true)
-            {
-                long cur = owns;
-                long prev = reclaiming;
-                if (reclaimingUpdater.compareAndSet(this, prev, cur))
-                {
-                    parent.adjustReclaiming(cur - prev);
-                    return;
-                }
-            }
-        }
-
-        public long owns()
-        {
-            return owns;
-        }
-
-        public float ownershipRatio()
-        {
-            float r = owns / (float) parent.limit;
-            if (Float.isNaN(r))
-                return 0;
-            return r;
-        }
-
-        private static final AtomicLongFieldUpdater<SubAllocator> ownsUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "owns");
-        private static final AtomicLongFieldUpdater<SubAllocator> reclaimingUpdater = AtomicLongFieldUpdater.newUpdater(SubAllocator.class, "reclaiming");
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
deleted file mode 100644
index 68b0c20..0000000
--- a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cassandra.utils.memory;
-
-import org.apache.cassandra.utils.concurrent.WaitQueue;
-
-/**
- * A thread that reclaims memory from a Pool on demand.  The actual reclaiming work is delegated to the
- * cleaner Runnable, e.g., FlushLargestColumnFamily
- */
-class PoolCleanerThread<P extends Pool> extends Thread
-{
-    /** The pool we're cleaning */
-    final P pool;
-
-    /** should ensure that at least some memory has been marked reclaiming after completion */
-    final Runnable cleaner;
-
-    /** signalled whenever needsCleaning() may return true */
-    final WaitQueue wait = new WaitQueue();
-
-    PoolCleanerThread(P pool, Runnable cleaner)
-    {
-        super(pool.getClass().getSimpleName() + "Cleaner");
-        this.pool = pool;
-        this.cleaner = cleaner;
-    }
-
-    boolean needsCleaning()
-    {
-        return pool.onHeap.needsCleaning() || pool.offHeap.needsCleaning();
-    }
-
-    // should ONLY be called when we really think it already needs cleaning
-    void trigger()
-    {
-        wait.signal();
-    }
-
-    @Override
-    public void run()
-    {
-        while (true)
-        {
-            while (!needsCleaning())
-            {
-                final WaitQueue.Signal signal = wait.register();
-                if (!needsCleaning())
-                    signal.awaitUninterruptibly();
-                else
-                    signal.cancel();
-            }
-
-            cleaner.run();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
index a90357c..19334ee 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabAllocator.java
@@ -44,7 +44,7 @@ import sun.nio.ch.DirectBuffer;
  * interleaved throughout the heap, and the old generation gets progressively
  * more fragmented until a stop-the-world compacting collection occurs.
  */
-public class SlabAllocator extends PoolAllocator
+public class SlabAllocator extends MemtableBufferAllocator
 {
     private static final Logger logger = LoggerFactory.getLogger(SlabAllocator.class);
 
@@ -54,7 +54,7 @@ public class SlabAllocator extends PoolAllocator
     // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
     private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
 
-    private final AtomicReference<Region> currentRegion = new AtomicReference<Region>();
+    private final AtomicReference<Region> currentRegion = new AtomicReference<>();
     private final AtomicInteger regionCount = new AtomicInteger(0);
 
     // this queue is used to keep references to off-heap allocated regions so that we can free them when we are discarded
@@ -106,9 +106,9 @@ public class SlabAllocator extends PoolAllocator
         }
     }
 
-    public void free(ByteBuffer name)
+    public DataReclaimer reclaimer()
     {
-        // have to assume we cannot free the memory here, and just reclaim it all when we flush
+        return NO_OP;
     }
 
     public void setDiscarded()
@@ -150,6 +150,11 @@ public class SlabAllocator extends PoolAllocator
         }
     }
 
+    protected AbstractAllocator allocator(OpOrder.Group writeOp)
+    {
+        return new ContextAllocator(writeOp, this);
+    }
+
     /**
      * A region of memory out of which allocations are sliced.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/src/java/org/apache/cassandra/utils/memory/SlabPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/SlabPool.java b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
index 7276e57..c5c44e1 100644
--- a/src/java/org/apache/cassandra/utils/memory/SlabPool.java
+++ b/src/java/org/apache/cassandra/utils/memory/SlabPool.java
@@ -18,18 +18,17 @@
  */
 package org.apache.cassandra.utils.memory;
 
-
-public class SlabPool extends Pool
+public class SlabPool extends MemtablePool
 {
-
     final boolean allocateOnHeap;
+
     public SlabPool(long maxOnHeapMemory, long maxOffHeapMemory, float cleanupThreshold, Runnable cleaner)
     {
         super(maxOnHeapMemory, maxOffHeapMemory, cleanupThreshold, cleaner);
         this.allocateOnHeap = maxOffHeapMemory == 0;
     }
 
-    public SlabAllocator newAllocator()
+    public MemtableAllocator newAllocator()
     {
         return new SlabAllocator(onHeap.newAllocator(), offHeap.newAllocator(), allocateOnHeap);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index 37b0b96..b766a64 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -3,7 +3,7 @@
 # Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
 #
 cluster_name: Test Cluster
-memtable_allocation_type: offheap_buffers
+memtable_allocation_type: offheap_objects
 in_memory_compaction_limit_in_mb: 1
 commitlog_sync: batch
 commitlog_sync_batch_window_in_ms: 1.0

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
index ba59404..4bb8fdd 100644
--- a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
+++ b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
@@ -54,7 +54,7 @@ public class LongFlushMemtableTest extends SchemaLoader
                 ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "_CF" + i);
                 // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
                 ByteBuffer value = ByteBuffer.allocate(100000);
-                cf.addColumn(new Cell(Util.cellname("c"), value));
+                cf.addColumn(new BufferCell(Util.cellname("c"), value));
                 rm.add(cf);
                 rm.applyUnsafe();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java b/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
index 327ff47..7a5b837 100644
--- a/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
+++ b/test/long/org/apache/cassandra/db/LongKeyspaceTest.java
@@ -37,7 +37,7 @@ public class LongKeyspaceTest extends SchemaLoader
 
         for (int i = 1; i < 5000; i += 100)
         {
-            Mutation rm = new Mutation("Keyspace1", Util.dk("key" + i).key);
+            Mutation rm = new Mutation("Keyspace1", Util.dk("key" + i).getKey());
             ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "Standard1");
             for (int j = 0; j < i; j++)
                 cf.addColumn(column("c" + j, "v" + j, 1L));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index 35c2b5e..94bc09f 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -127,7 +127,7 @@ public class LongCompactionsTest extends SchemaLoader
         for (int j = 0; j < SSTABLES; j++) {
             for (int i = 0; i < ROWS_PER_SSTABLE; i++) {
                 DecoratedKey key = Util.dk(String.valueOf(i % 2));
-                Mutation rm = new Mutation(KEYSPACE1, key.key);
+                Mutation rm = new Mutation(KEYSPACE1, key.getKey());
                 long timestamp = j * ROWS_PER_SSTABLE + i;
                 rm.add("Standard1", Util.cellname(String.valueOf(i / 2)),
                        ByteBufferUtil.EMPTY_BYTE_BUFFER,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
index 90e7123..b071001 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
@@ -54,7 +54,7 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader
         for (int r = 0; r < rows; r++)
         {
             DecoratedKey key = Util.dk(String.valueOf(r));
-            Mutation rm = new Mutation(ksname, key.key);
+            Mutation rm = new Mutation(ksname, key.getKey());
             for (int c = 0; c < columns; c++)
             {
                 rm.add(cfname, Util.cellname("column" + c), value, 0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index fe80009..d2fe949 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -77,7 +77,7 @@ public class Util
 
     public static RowPosition rp(String key, IPartitioner partitioner)
     {
-        return RowPosition.forKey(ByteBufferUtil.bytes(key), partitioner);
+        return RowPosition.ForKey.get(ByteBufferUtil.bytes(key), partitioner);
     }
 
     public static CellName cellname(ByteBuffer... bbs)
@@ -108,17 +108,17 @@ public class Util
 
     public static Cell column(String name, String value, long timestamp)
     {
-        return new Cell(cellname(name), ByteBufferUtil.bytes(value), timestamp);
+        return new BufferCell(cellname(name), ByteBufferUtil.bytes(value), timestamp);
     }
 
     public static Cell expiringColumn(String name, String value, long timestamp, int ttl)
     {
-        return new ExpiringCell(cellname(name), ByteBufferUtil.bytes(value), timestamp, ttl);
+        return new BufferExpiringCell(cellname(name), ByteBufferUtil.bytes(value), timestamp, ttl);
     }
 
     public static Cell counterColumn(String name, long value, long timestamp)
     {
-        return new CounterUpdateCell(cellname(name), value, timestamp);
+        return new BufferCounterUpdateCell(cellname(name), value, timestamp);
     }
 
     public static Token token(String key)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/config/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DefsTest.java b/test/unit/org/apache/cassandra/config/DefsTest.java
index 2e1876f..94738ac 100644
--- a/test/unit/org/apache/cassandra/config/DefsTest.java
+++ b/test/unit/org/apache/cassandra/config/DefsTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.config;
 
 import java.io.File;
 import java.nio.ByteBuffer;
-import java.util.*;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
@@ -170,7 +169,7 @@ public class DefsTest extends SchemaLoader
         // now read and write to it.
         CellName col0 = cellname("col0");
         DecoratedKey dk = Util.dk("key0");
-        Mutation rm = new Mutation(ks, dk.key);
+        Mutation rm = new Mutation(ks, dk.getKey());
         rm.add(cf, col0, ByteBufferUtil.bytes("value0"), 1L);
         rm.apply();
         ColumnFamilyStore store = Keyspace.open(ks).getColumnFamilyStore(cf);
@@ -194,7 +193,7 @@ public class DefsTest extends SchemaLoader
         assert cfm != null;
 
         // write some data, force a flush, then verify that files exist on disk.
-        Mutation rm = new Mutation(ks.name, dk.key);
+        Mutation rm = new Mutation(ks.name, dk.getKey());
         for (int i = 0; i < 100; i++)
             rm.add(cfm.cfName, cellname("col" + i), ByteBufferUtil.bytes("anyvalue"), 1L);
         rm.apply();
@@ -208,7 +207,7 @@ public class DefsTest extends SchemaLoader
         assert !Schema.instance.getKSMetaData(ks.name).cfMetaData().containsKey(cfm.cfName);
 
         // any write should fail.
-        rm = new Mutation(ks.name, dk.key);
+        rm = new Mutation(ks.name, dk.getKey());
         boolean success = true;
         try
         {
@@ -244,7 +243,7 @@ public class DefsTest extends SchemaLoader
 
         // test reads and writes.
         CellName col0 = cellname("col0");
-        Mutation rm = new Mutation(newCf.ksName, dk.key);
+        Mutation rm = new Mutation(newCf.ksName, dk.getKey());
         rm.add(newCf.cfName, col0, ByteBufferUtil.bytes("value0"), 1L);
         rm.apply();
         ColumnFamilyStore store = Keyspace.open(newCf.ksName).getColumnFamilyStore(newCf.cfName);
@@ -268,7 +267,7 @@ public class DefsTest extends SchemaLoader
         assert cfm != null;
 
         // write some data, force a flush, then verify that files exist on disk.
-        Mutation rm = new Mutation(ks.name, dk.key);
+        Mutation rm = new Mutation(ks.name, dk.getKey());
         for (int i = 0; i < 100; i++)
             rm.add(cfm.cfName, cellname("col" + i), ByteBufferUtil.bytes("anyvalue"), 1L);
         rm.apply();
@@ -282,7 +281,7 @@ public class DefsTest extends SchemaLoader
         assert Schema.instance.getKSMetaData(ks.name) == null;
 
         // write should fail.
-        rm = new Mutation(ks.name, dk.key);
+        rm = new Mutation(ks.name, dk.getKey());
         boolean success = true;
         try
         {
@@ -319,7 +318,7 @@ public class DefsTest extends SchemaLoader
         assert cfm != null;
 
         // write some data
-        Mutation rm = new Mutation(ks.name, dk.key);
+        Mutation rm = new Mutation(ks.name, dk.getKey());
         for (int i = 0; i < 100; i++)
             rm.add(cfm.cfName, cellname("col" + i), ByteBufferUtil.bytes("anyvalue"), 1L);
         rm.apply();
@@ -353,7 +352,7 @@ public class DefsTest extends SchemaLoader
         // now read and write to it.
         CellName col0 = cellname("col0");
         DecoratedKey dk = Util.dk("key0");
-        Mutation rm = new Mutation(newKs.name, dk.key);
+        Mutation rm = new Mutation(newKs.name, dk.getKey());
         rm.add(newCf.cfName, col0, ByteBufferUtil.bytes("value0"), 1L);
         rm.apply();
         ColumnFamilyStore store = Keyspace.open(newKs.name).getColumnFamilyStore(newCf.cfName);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
index a1c98f3..83a58e4 100644
--- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
@@ -54,7 +54,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
         int[] values = new int[]{ 1, 2, 2, 3 };
 
         for (int i = 0; i < values.length; ++i)
-            map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+            map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
 
         Iterator<Cell> iter = map.iterator();
         assertEquals("1st column", 1, iter.next().name().toByteBuffer().getInt(0));
@@ -76,7 +76,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
 
         int[] values = new int[]{ 1, 2, 1, 3, 4, 4, 5, 5, 1, 2, 6, 6, 6, 1, 2, 3 };
         for (int i = 0; i < values.length; ++i)
-            cells.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+            cells.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
 
         assertEquals(6, cells.getColumnCount());
 
@@ -91,7 +91,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
         // Add more values
         values = new int[]{ 11, 15, 12, 12, 12, 16, 10, 8, 8, 7, 4, 4, 5 };
         for (int i = 0; i < values.length; ++i)
-            cells.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+            cells.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
 
         assertEquals(13, cells.getColumnCount());
 
@@ -125,7 +125,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
 
         int[] values = new int[]{ -1, 20, 44, 55, 27, 27, 17, 1, 9, 89, 33, 44, 0, 9 };
         for (int i = 0; i < values.length; ++i)
-            cells.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+            cells.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
 
         for (int i : values)
             assertEquals(i, cells.getColumn(type.makeCellName(i)).name().toByteBuffer().getInt(0));
@@ -148,10 +148,10 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
         int[] values2 = new int[]{ 2, 4, 5, 6 };
 
         for (int i = 0; i < values1.length; ++i)
-            map.addColumn(new Cell(type.makeCellName(values1[reversed ? values1.length - 1 - i : i])));
+            map.addColumn(new BufferCell(type.makeCellName(values1[reversed ? values1.length - 1 - i : i])));
 
         for (int i = 0; i < values2.length; ++i)
-            map2.addColumn(new Cell(type.makeCellName(values2[reversed ? values2.length - 1 - i : i])));
+            map2.addColumn(new BufferCell(type.makeCellName(values2[reversed ? values2.length - 1 - i : i])));
 
         map2.addAll(map);
 
@@ -179,12 +179,12 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
 
         List<Cell> sorted = new ArrayList<>();
         for (int v : values)
-            sorted.add(new Cell(type.makeCellName(v)));
+            sorted.add(new BufferCell(type.makeCellName(v)));
         List<Cell> reverseSorted = new ArrayList<>(sorted);
         Collections.reverse(reverseSorted);
 
         for (int i = 0; i < values.length; ++i)
-            map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+            map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
 
         assertSame(sorted, map.getSortedColumns());
         assertSame(reverseSorted, map.getReverseSortedColumns());
@@ -205,7 +205,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
         int[] values = new int[]{ 1, 2, 3, 5, 9 };
 
         for (int i = 0; i < values.length; ++i)
-            map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+            map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
 
         assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(new ColumnSlice[]{ new ColumnSlice(type.make(3), Composites.EMPTY) }));
         assertSame(new int[]{ 3, 2, 1 }, map.reverseIterator(new ColumnSlice[]{ new ColumnSlice(type.make(4), Composites.EMPTY) }));
@@ -251,7 +251,7 @@ public class ArrayBackedSortedColumnsTest extends SchemaLoader
         int[] values = new int[]{ 1, 2, 2, 3 };
 
         for (int i = 0; i < values.length; ++i)
-            map.addColumn(new Cell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
+            map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i])));
 
         Iterator<Cell> iter = map.getReverseSortedColumns().iterator();
         assertTrue(iter.hasNext());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/CollationControllerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CollationControllerTest.java b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
index fc92aae..22c60b8 100644
--- a/test/unit/org/apache/cassandra/db/CollationControllerTest.java
+++ b/test/unit/org/apache/cassandra/db/CollationControllerTest.java
@@ -40,26 +40,26 @@ public class CollationControllerTest extends SchemaLoader
         DecoratedKey dk = Util.dk("key1");
         
         // add data
-        rm = new Mutation(keyspace.getName(), dk.key);
+        rm = new Mutation(keyspace.getName(), dk.getKey());
         rm.add(cfs.name, Util.cellname("Column1"), ByteBufferUtil.bytes("asdf"), 0);
         rm.apply();
         cfs.forceBlockingFlush();
         
         // remove
-        rm = new Mutation(keyspace.getName(), dk.key);
+        rm = new Mutation(keyspace.getName(), dk.getKey());
         rm.delete(cfs.name, 10);
         rm.apply();
         
         // add another mutation because sstable maxtimestamp isn't set
         // correctly during flush if the most recent mutation is a row delete
-        rm = new Mutation(keyspace.getName(), Util.dk("key2").key);
+        rm = new Mutation(keyspace.getName(), Util.dk("key2").getKey());
         rm.add(cfs.name, Util.cellname("Column1"), ByteBufferUtil.bytes("zxcv"), 20);
         rm.apply();
         
         cfs.forceBlockingFlush();
 
         // add yet one more mutation
-        rm = new Mutation(keyspace.getName(), dk.key);
+        rm = new Mutation(keyspace.getName(), dk.getKey());
         rm.add(cfs.name, Util.cellname("Column1"), ByteBufferUtil.bytes("foobar"), 30);
         rm.apply();
         cfs.forceBlockingFlush();
@@ -92,13 +92,13 @@ public class CollationControllerTest extends SchemaLoader
         CellName cellName = Util.cellname("Column1");
 
         // add data
-        rm = new Mutation(keyspace.getName(), dk.key);
+        rm = new Mutation(keyspace.getName(), dk.getKey());
         rm.add(cfs.name, cellName, ByteBufferUtil.bytes("asdf"), 0);
         rm.apply();
         cfs.forceBlockingFlush();
 
         // remove
-        rm = new Mutation(keyspace.getName(), dk.key);
+        rm = new Mutation(keyspace.getName(), dk.getKey());
         rm.delete(cfs.name, cellName, 0);
         rm.apply();
         cfs.forceBlockingFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index d180b82..f812849 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -202,7 +202,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                                              Util.namesFilter(cfs, "asdf"),
                                              10);
         assertEquals(1, result.size());
-        assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key2"));
+        assert result.get(0).key.getKey().equals(ByteBufferUtil.bytes("key2"));
     }
 
     @Test
@@ -243,10 +243,10 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         assert rows != null;
         assert rows.size() == 2 : StringUtils.join(rows, ",");
 
-        String key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+        String key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
         assert "k1".equals( key ) : key;
 
-        key = new String(rows.get(1).key.key.array(),rows.get(1).key.key.position(),rows.get(1).key.key.remaining());
+        key = new String(rows.get(1).key.getKey().array(), rows.get(1).key.getKey().position(), rows.get(1).key.getKey().remaining());
         assert "k3".equals(key) : key;
 
         assert ByteBufferUtil.bytes(1L).equals( rows.get(0).cf.getColumn(birthdate).value());
@@ -258,14 +258,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rows = cfs.search(range, clause, filter, 100);
 
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+        key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
         assert "k3".equals( key );
 
         // same query again, but with resultset not including the subordinate expression
         rows = cfs.search(range, clause, Util.namesFilter(cfs, "birthdate"), 100);
 
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+        key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
         assert "k3".equals( key );
 
         assert rows.get(0).cf.getColumnCount() == 1 : rows.get(0).cf;
@@ -275,7 +275,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rows = cfs.search(range, clause, emptyFilter, 100);
 
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        key = new String(rows.get(0).key.key.array(),rows.get(0).key.key.position(),rows.get(0).key.key.remaining());
+        key = new String(rows.get(0).key.getKey().array(), rows.get(0).key.getKey().position(), rows.get(0).key.getKey().remaining());
         assert "k3".equals( key );
 
         assertFalse(rows.get(0).cf.hasColumns());
@@ -334,7 +334,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Range<RowPosition> range = Util.range("", "");
         List<Row> rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        String key = ByteBufferUtil.string(rows.get(0).key.key);
+        String key = ByteBufferUtil.string(rows.get(0).key.getKey());
         assert "k1".equals( key );
 
         // delete the column directly
@@ -358,7 +358,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.apply();
         rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        key = ByteBufferUtil.string(rows.get(0).key.key);
+        key = ByteBufferUtil.string(rows.get(0).key.getKey());
         assert "k1".equals( key );
 
         // verify that row and delete w/ older timestamp does nothing
@@ -367,7 +367,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.apply();
         rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        key = ByteBufferUtil.string(rows.get(0).key.key);
+        key = ByteBufferUtil.string(rows.get(0).key.getKey());
         assert "k1".equals( key );
 
         // similarly, column delete w/ older timestamp should do nothing
@@ -376,7 +376,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.apply();
         rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        key = ByteBufferUtil.string(rows.get(0).key.key);
+        key = ByteBufferUtil.string(rows.get(0).key.getKey());
         assert "k1".equals( key );
 
         // delete the entire row (w/ newer timestamp this time)
@@ -408,7 +408,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.apply();
         rows = cfs.search(range, clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        key = ByteBufferUtil.string(rows.get(0).key.key);
+        key = ByteBufferUtil.string(rows.get(0).key.getKey());
         assert "k1".equals( key );
     }
 
@@ -438,7 +438,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         expr = new IndexExpression(ByteBufferUtil.bytes("birthdate"), IndexExpression.Operator.EQ, ByteBufferUtil.bytes(2L));
         clause = Arrays.asList(expr);
         rows = keyspace.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
-        String key = ByteBufferUtil.string(rows.get(0).key.key);
+        String key = ByteBufferUtil.string(rows.get(0).key.getKey());
         assert "k1".equals( key );
 
         // update the birthdate value with an OLDER timestamp, and test that the index ignores this
@@ -447,7 +447,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         rm.apply();
 
         rows = keyspace.getColumnFamilyStore("Indexed1").search(range, clause, filter, 100);
-        key = ByteBufferUtil.string(rows.get(0).key.key);
+        key = ByteBufferUtil.string(rows.get(0).key.getKey());
         assert "k1".equals( key );
 
     }
@@ -714,7 +714,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         IDiskAtomFilter filter = new IdentityQueryFilter();
         List<Row> rows = keyspace.getColumnFamilyStore("Indexed2").search(Util.range("", ""), clause, filter, 100);
         assert rows.size() == 1 : StringUtils.join(rows, ",");
-        assertEquals("k1", ByteBufferUtil.string(rows.get(0).key.key));
+        assertEquals("k1", ByteBufferUtil.string(rows.get(0).key.getKey()));
     }
 
     @Test
@@ -771,7 +771,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                                              Util.namesFilter(cfs, "asdf"),
                                              10);
         assertEquals(2, result.size());
-        assert result.get(0).key.key.equals(ByteBufferUtil.bytes("key1"));
+        assert result.get(0).key.getKey().equals(ByteBufferUtil.bytes("key1"));
     }
 
     @Test
@@ -786,16 +786,16 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // create an isolated sstable.
         putColsSuper(cfs, key, scfName,
-                new Cell(cellname(1L), ByteBufferUtil.bytes("val1"), 1),
-                new Cell(cellname(2L), ByteBufferUtil.bytes("val2"), 1),
-                new Cell(cellname(3L), ByteBufferUtil.bytes("val3"), 1));
+                new BufferCell(cellname(1L), ByteBufferUtil.bytes("val1"), 1),
+                new BufferCell(cellname(2L), ByteBufferUtil.bytes("val2"), 1),
+                new BufferCell(cellname(3L), ByteBufferUtil.bytes("val3"), 1));
         cfs.forceBlockingFlush();
 
         // insert, don't flush.
         putColsSuper(cfs, key, scfName,
-                new Cell(cellname(4L), ByteBufferUtil.bytes("val4"), 1),
-                new Cell(cellname(5L), ByteBufferUtil.bytes("val5"), 1),
-                new Cell(cellname(6L), ByteBufferUtil.bytes("val6"), 1));
+                new BufferCell(cellname(4L), ByteBufferUtil.bytes("val4"), 1),
+                new BufferCell(cellname(5L), ByteBufferUtil.bytes("val5"), 1),
+                new BufferCell(cellname(6L), ByteBufferUtil.bytes("val6"), 1));
 
         // verify insert.
         final SlicePredicate sp = new SlicePredicate();
@@ -807,7 +807,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         assertRowAndColCount(1, 6, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
 
         // delete
-        Mutation rm = new Mutation(keyspace.getName(), key.key);
+        Mutation rm = new Mutation(keyspace.getName(), key.getKey());
         rm.deleteRange(cfName, SuperColumns.startOf(scfName), SuperColumns.endOf(scfName), 2);
         rm.apply();
 
@@ -822,17 +822,17 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // late insert.
         putColsSuper(cfs, key, scfName,
-                new Cell(cellname(4L), ByteBufferUtil.bytes("val4"), 1L),
-                new Cell(cellname(7L), ByteBufferUtil.bytes("val7"), 1L));
+                new BufferCell(cellname(4L), ByteBufferUtil.bytes("val4"), 1L),
+                new BufferCell(cellname(7L), ByteBufferUtil.bytes("val7"), 1L));
 
         // re-verify delete.
         assertRowAndColCount(1, 0, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
 
         // make sure new writes are recognized.
         putColsSuper(cfs, key, scfName,
-                new Cell(cellname(3L), ByteBufferUtil.bytes("val3"), 3),
-                new Cell(cellname(8L), ByteBufferUtil.bytes("val8"), 3),
-                new Cell(cellname(9L), ByteBufferUtil.bytes("val9"), 3));
+                new BufferCell(cellname(3L), ByteBufferUtil.bytes("val3"), 3),
+                new BufferCell(cellname(8L), ByteBufferUtil.bytes("val8"), 3),
+                new BufferCell(cellname(9L), ByteBufferUtil.bytes("val9"), 3));
         assertRowAndColCount(1, 3, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, scfName), 100));
     }
 
@@ -861,7 +861,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.keyspace.getName(), cfs.name);
         for (Cell col : cols)
             cf.addColumn(col.withUpdatedName(CellNames.compositeDense(scfName, col.name().toByteBuffer())));
-        Mutation rm = new Mutation(cfs.keyspace.getName(), key.key, cf);
+        Mutation rm = new Mutation(cfs.keyspace.getName(), key.getKey(), cf);
         rm.apply();
     }
 
@@ -870,7 +870,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.keyspace.getName(), cfs.name);
         for (Cell col : cols)
             cf.addColumn(col);
-        Mutation rm = new Mutation(cfs.keyspace.getName(), key.key, cf);
+        Mutation rm = new Mutation(cfs.keyspace.getName(), key.getKey(), cf);
         rm.apply();
     }
 
@@ -902,7 +902,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         assertRowAndColCount(1, 4, false, cfs.getRangeSlice(Util.range("f", "g"), null, ThriftValidation.asIFilter(sp, cfs.metadata, null), 100));
 
         // delete (from sstable and memtable)
-        Mutation rm = new Mutation(keyspace.getName(), key.key);
+        Mutation rm = new Mutation(keyspace.getName(), key.getKey());
         rm.delete(cfs.name, 2);
         rm.apply();
 
@@ -973,8 +973,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         DecoratedKey key = Util.dk("slice-get-uuid-type");
 
         // Insert a row with one supercolumn and multiple subcolumns
-        putColsSuper(cfs, key, superColName, new Cell(cellname("a"), ByteBufferUtil.bytes("A"), 1),
-                                             new Cell(cellname("b"), ByteBufferUtil.bytes("B"), 1));
+        putColsSuper(cfs, key, superColName, new BufferCell(cellname("a"), ByteBufferUtil.bytes("A"), 1),
+                                             new BufferCell(cellname("b"), ByteBufferUtil.bytes("B"), 1));
 
         // Get the entire supercolumn like normal
         ColumnFamily cfGet = cfs.getColumnFamily(QueryFilter.getIdentityFilter(key, cfName, System.currentTimeMillis()));
@@ -985,7 +985,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         SortedSet<CellName> sliceColNames = new TreeSet<CellName>(cfs.metadata.comparator);
         sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("a")));
         sliceColNames.add(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b")));
-        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
+        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.getKey(), cfName, System.currentTimeMillis(), new NamesQueryFilter(sliceColNames));
         ColumnFamily cfSliced = cmd.getRow(keyspace).cf;
 
         // Make sure the slice returns the same as the straight get
@@ -1005,7 +1005,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         cfs.clearUnsafe();
 
         // Create a cell a 'high timestamp'
-        putColsStandard(cfs, key, new Cell(cname, ByteBufferUtil.bytes("a"), 2));
+        putColsStandard(cfs, key, new BufferCell(cname, ByteBufferUtil.bytes("a"), 2));
         cfs.forceBlockingFlush();
 
         // Nuke the metadata and reload that sstable
@@ -1018,10 +1018,10 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         cfs.loadNewSSTables();
 
         // Add another cell with a lower timestamp
-        putColsStandard(cfs, key, new Cell(cname, ByteBufferUtil.bytes("b"), 1));
+        putColsStandard(cfs, key, new BufferCell(cname, ByteBufferUtil.bytes("b"), 1));
 
         // Test fetching the cell by name returns the first cell
-        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.key, cfName, System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(cname, cfs.getComparator())));
+        SliceByNamesReadCommand cmd = new SliceByNamesReadCommand(keyspaceName, key.getKey(), cfName, System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(cname, cfs.getComparator())));
         ColumnFamily cf = cmd.getRow(keyspace).cf;
         Cell cell = cf.getColumn(cname);
         assert cell.value().equals(ByteBufferUtil.bytes("a")) : "expecting a, got " + ByteBufferUtil.string(cell.value());
@@ -1250,7 +1250,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
             for (Row row : rows)
             {
                 sb.append("{");
-                sb.append(ByteBufferUtil.string(row.key.key));
+                sb.append(ByteBufferUtil.string(row.key.getKey()));
                 sb.append(":");
                 if (row.cf != null && !row.cf.isEmpty())
                 {
@@ -1399,7 +1399,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Cell[] cols = new Cell[letters.length];
         for (int i = 0; i < cols.length; i++)
         {
-            cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+            cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
                     ByteBuffer.wrap(new byte[1]), 1);
         }
 
@@ -1448,7 +1448,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Cell[] cols = new Cell[letters.length];
         for (int i = 0; i < cols.length; i++)
         {
-            cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+            cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
                     ByteBuffer.wrap(new byte[1366]), 1);
         }
 
@@ -1497,7 +1497,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Cell[] cols = new Cell[letters.length];
         for (int i = 0; i < cols.length; i++)
         {
-            cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+            cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
                     ByteBuffer.wrap(new byte[1]), 1);
         }
 
@@ -1547,7 +1547,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Cell[] cols = new Cell[letters.length];
         for (int i = 0; i < cols.length; i++)
         {
-            cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+            cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
                     ByteBuffer.wrap(new byte[1366]), 1);
         }
 
@@ -1595,7 +1595,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Cell[] cols = new Cell[letters.length];
         for (int i = 0; i < cols.length; i++)
         {
-            cols[i] = new Cell(cellname("col" + letters[i].toUpperCase()),
+            cols[i] = new BufferCell(cellname("col" + letters[i].toUpperCase()),
                     // use 1366 so that three cols make an index segment
                     ByteBuffer.wrap(new byte[1366]), 1);
         }
@@ -1856,7 +1856,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         Cell[] cols = new Cell[12];
         for (int i = 0; i < cols.length; i++)
         {
-            cols[i] = new Cell(cellname("col" + letters[i]), ByteBuffer.wrap(new byte[valueSize]), 1);
+            cols[i] = new BufferCell(cellname("col" + letters[i]), ByteBuffer.wrap(new byte[valueSize]), 1);
         }
 
         for (int i = 0; i < 12; i++)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/CounterCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCacheTest.java b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
index a015a43..cb2d97a 100644
--- a/test/unit/org/apache/cassandra/db/CounterCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCacheTest.java
@@ -75,8 +75,8 @@ public class CounterCacheTest extends SchemaLoader
         CacheService.instance.invalidateCounterCache();
 
         ColumnFamily cells = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-        cells.addColumn(new CounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
-        cells.addColumn(new CounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
+        cells.addColumn(new BufferCounterUpdateCell(cellname(1), 1L, FBUtilities.timestampMicros()));
+        cells.addColumn(new BufferCounterUpdateCell(cellname(2), 2L, FBUtilities.timestampMicros()));
         new CounterMutation(new Mutation(KS, bytes(1), cells), ConsistencyLevel.ONE).apply();
         new CounterMutation(new Mutation(KS, bytes(2), cells), ConsistencyLevel.ONE).apply();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/CounterCellTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CounterCellTest.java b/test/unit/org/apache/cassandra/db/CounterCellTest.java
index 86b856c..efc365d 100644
--- a/test/unit/org/apache/cassandra/db/CounterCellTest.java
+++ b/test/unit/org/apache/cassandra/db/CounterCellTest.java
@@ -64,7 +64,7 @@ public class CounterCellTest extends SchemaLoader
     public void testCreate()
     {
         long delta = 3L;
-        CounterCell cell = new CounterCell(Util.cellname("x"),
+        CounterCell cell = new BufferCounterCell(Util.cellname("x"),
                                            CounterContext.instance().createLocal(delta),
                                            1L,
                                            Long.MIN_VALUE);
@@ -87,33 +87,33 @@ public class CounterCellTest extends SchemaLoader
         ByteBuffer context;
 
         // tombstone + tombstone
-        left  = new DeletedCell(cellname("x"), 1, 1L);
-        right = new DeletedCell(cellname("x"), 2, 2L);
+        left  = new BufferDeletedCell(cellname("x"), 1, 1L);
+        right = new BufferDeletedCell(cellname("x"), 2, 2L);
 
         assert left.reconcile(right).getMarkedForDeleteAt() == right.getMarkedForDeleteAt();
         assert right.reconcile(left).getMarkedForDeleteAt() == right.getMarkedForDeleteAt();
 
         // tombstone > live
-        left  = new DeletedCell(cellname("x"), 1, 2L);
-        right = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
+        left  = new BufferDeletedCell(cellname("x"), 1, 2L);
+        right = BufferCounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
 
         assert left.reconcile(right) == left;
 
         // tombstone < live last delete
-        left  = new DeletedCell(cellname("x"), 1, 1L);
-        right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+        left  = new BufferDeletedCell(cellname("x"), 1, 1L);
+        right = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
 
         assert left.reconcile(right) == right;
 
         // tombstone == live last delete
-        left  = new DeletedCell(cellname("x"), 1, 2L);
-        right = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+        left  = new BufferDeletedCell(cellname("x"), 1, 2L);
+        right = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
 
         assert left.reconcile(right) == right;
 
         // tombstone > live last delete
-        left  = new DeletedCell(cellname("x"), 1, 4L);
-        right = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
+        left  = new BufferDeletedCell(cellname("x"), 1, 4L);
+        right = BufferCounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
 
         reconciled = left.reconcile(right);
         assert reconciled.name() == right.name();
@@ -122,26 +122,26 @@ public class CounterCellTest extends SchemaLoader
         assert ((CounterCell)reconciled).timestampOfLastDelete() == left.getMarkedForDeleteAt();
 
         // live < tombstone
-        left  = CounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
-        right = new DeletedCell(cellname("x"), 1, 2L);
+        left  = BufferCounterCell.createLocal(cellname("x"), 0L, 1L, Long.MIN_VALUE);
+        right = new BufferDeletedCell(cellname("x"), 1, 2L);
 
         assert left.reconcile(right) == right;
 
         // live last delete > tombstone
-        left  = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
-        right = new DeletedCell(cellname("x"), 1, 1L);
+        left  = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+        right = new BufferDeletedCell(cellname("x"), 1, 1L);
 
         assert left.reconcile(right) == left;
 
         // live last delete == tombstone
-        left  = CounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
-        right = new DeletedCell(cellname("x"), 1, 2L);
+        left  = BufferCounterCell.createLocal(cellname("x"), 0L, 4L, 2L);
+        right = new BufferDeletedCell(cellname("x"), 1, 2L);
 
         assert left.reconcile(right) == left;
 
         // live last delete < tombstone
-        left  = CounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
-        right = new DeletedCell(cellname("x"), 1, 4L);
+        left  = BufferCounterCell.createLocal(cellname("x"), 0L, 9L, 1L);
+        right = new BufferDeletedCell(cellname("x"), 1, 4L);
 
         reconciled = left.reconcile(right);
         assert reconciled.name() == left.name();
@@ -150,20 +150,20 @@ public class CounterCellTest extends SchemaLoader
         assert ((CounterCell)reconciled).timestampOfLastDelete() == right.getMarkedForDeleteAt();
 
         // live < live last delete
-        left  = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
-        right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
+        left  = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
+        right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
 
         assert left.reconcile(right) == right;
 
         // live last delete > live
-        left  = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 6L, 5L);
-        right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
+        left  = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 6L, 5L);
+        right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, 3L);
 
         assert left.reconcile(right) == left;
 
         // live + live
-        left = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, Long.MIN_VALUE);
-        right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
+        left = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 1L, 1L), 4L, Long.MIN_VALUE);
+        right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(1), 2L, 3L), 1L, Long.MIN_VALUE);
 
         reconciled = left.reconcile(right);
         assert reconciled.name().equals(left.name());
@@ -171,7 +171,7 @@ public class CounterCellTest extends SchemaLoader
         assert reconciled.timestamp() == 4L;
 
         left = reconciled;
-        right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 1L, 5L), 2L, Long.MIN_VALUE);
+        right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 1L, 5L), 2L, Long.MIN_VALUE);
 
         reconciled = left.reconcile(right);
         assert reconciled.name().equals(left.name());
@@ -179,7 +179,7 @@ public class CounterCellTest extends SchemaLoader
         assert reconciled.timestamp() == 4L;
 
         left = reconciled;
-        right = new CounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 2L, 2L), 6L, Long.MIN_VALUE);
+        right = new BufferCounterCell(cellname("x"), cc.createRemote(CounterId.fromInt(2), 2L, 2L), 6L, Long.MIN_VALUE);
 
         reconciled = left.reconcile(right);
         assert reconciled.name().equals(left.name());
@@ -211,15 +211,15 @@ public class CounterCellTest extends SchemaLoader
         CounterCell rightCell;
 
         // timestamp
-        leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, Long.MIN_VALUE);
-        rightCell = CounterCell.createLocal(cellname("x"), 0, 2L, Long.MIN_VALUE);
+        leftCell = BufferCounterCell.createLocal(cellname("x"), 0, 1L, Long.MIN_VALUE);
+        rightCell = BufferCounterCell.createLocal(cellname("x"), 0, 2L, Long.MIN_VALUE);
 
         assert rightCell == leftCell.diff(rightCell);
         assert null      == rightCell.diff(leftCell);
 
         // timestampOfLastDelete
-        leftCell = CounterCell.createLocal(cellname("x"), 0, 1L, 1L);
-        rightCell = CounterCell.createLocal(cellname("x"), 0, 1L, 2L);
+        leftCell = BufferCounterCell.createLocal(cellname("x"), 0, 1L, 1L);
+        rightCell = BufferCounterCell.createLocal(cellname("x"), 0, 1L, 2L);
 
         assert rightCell == leftCell.diff(rightCell);
         assert null      == rightCell.diff(leftCell);
@@ -231,8 +231,8 @@ public class CounterCellTest extends SchemaLoader
         left.writeRemote(CounterId.fromInt(9), 1L, 0L);
         right = ContextState.wrap(ByteBufferUtil.clone(left.context));
 
-        leftCell  = new CounterCell(cellname("x"), left.context,  1L);
-        rightCell = new CounterCell(cellname("x"), right.context, 1L);
+        leftCell  = new BufferCounterCell(cellname("x"), left.context,  1L);
+        rightCell = new BufferCounterCell(cellname("x"), right.context, 1L);
         assert leftCell.diff(rightCell) == null;
 
         // greater than: left has superset of nodes (counts equal)
@@ -247,8 +247,8 @@ public class CounterCellTest extends SchemaLoader
         right.writeRemote(CounterId.fromInt(6), 2L, 0L);
         right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        leftCell  = new CounterCell(cellname("x"), left.context,  1L);
-        rightCell = new CounterCell(cellname("x"), right.context, 1L);
+        leftCell  = new BufferCounterCell(cellname("x"), left.context,  1L);
+        rightCell = new BufferCounterCell(cellname("x"), right.context, 1L);
         assert leftCell.diff(rightCell) == null;
 
         // less than: right has subset of nodes (counts equal)
@@ -265,8 +265,8 @@ public class CounterCellTest extends SchemaLoader
         right.writeRemote(CounterId.fromInt(6), 1L, 0L);
         right.writeRemote(CounterId.fromInt(9), 1L, 0L);
 
-        leftCell  = new CounterCell(cellname("x"), left.context,  1L);
-        rightCell = new CounterCell(cellname("x"), right.context, 1L);
+        leftCell  = new BufferCounterCell(cellname("x"), left.context,  1L);
+        rightCell = new BufferCounterCell(cellname("x"), right.context, 1L);
         assert rightCell == leftCell.diff(rightCell);
         assert leftCell  == rightCell.diff(leftCell);
     }
@@ -281,7 +281,7 @@ public class CounterCellTest extends SchemaLoader
         state.writeLocal(CounterId.fromInt(4), 4L, 4L);
 
         CellNameType type = new SimpleDenseCellNameType(UTF8Type.instance);
-        CounterCell original = new CounterCell(cellname("x"), state.context, 1L);
+        CounterCell original = new BufferCounterCell(cellname("x"), state.context, 1L);
         byte[] serialized;
         try (DataOutputBuffer bufOut = new DataOutputBuffer())
         {
@@ -315,8 +315,8 @@ public class CounterCellTest extends SchemaLoader
         state.writeRemote(CounterId.fromInt(3), 4L, 4L);
         state.writeLocal(CounterId.fromInt(4), 4L, 4L);
 
-        CounterCell original = new CounterCell(cellname("x"), state.context, 1L);
-        CounterCell cleared = new CounterCell(cellname("x"), cc.clearAllLocal(state.context), 1L);
+        CounterCell original = new BufferCounterCell(cellname("x"), state.context, 1L);
+        CounterCell cleared = new BufferCounterCell(cellname("x"), cc.clearAllLocal(state.context), 1L);
 
         original.updateDigest(digest1);
         cleared.updateDigest(digest2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index ec99fd1..e6745a1 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -120,10 +120,10 @@ public class KeyCacheTest extends SchemaLoader
         Mutation rm;
 
         // inserts
-        rm = new Mutation(KEYSPACE1, key1.key);
+        rm = new Mutation(KEYSPACE1, key1.getKey());
         rm.add(COLUMN_FAMILY1, Util.cellname("1"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
         rm.apply();
-        rm = new Mutation(KEYSPACE1, key2.key);
+        rm = new Mutation(KEYSPACE1, key2.getKey());
         rm.add(COLUMN_FAMILY1, Util.cellname("2"), ByteBufferUtil.EMPTY_BYTE_BUFFER, 0);
         rm.apply();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8541cca7/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
index 57f077f..1869872 100644
--- a/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCollisionTest.java
@@ -73,10 +73,10 @@ public class KeyCollisionTest extends SchemaLoader
 
         List<Row> rows = cfs.getRangeSlice(new Bounds<RowPosition>(dk("k2"), dk("key2")), null, new IdentityQueryFilter(), 10000);
         assert rows.size() == 4 : "Expecting 4 keys, got " + rows.size();
-        assert rows.get(0).key.key.equals(ByteBufferUtil.bytes("k2"));
-        assert rows.get(1).key.key.equals(ByteBufferUtil.bytes("k3"));
-        assert rows.get(2).key.key.equals(ByteBufferUtil.bytes("key1"));
-        assert rows.get(3).key.key.equals(ByteBufferUtil.bytes("key2"));
+        assert rows.get(0).key.getKey().equals(ByteBufferUtil.bytes("k2"));
+        assert rows.get(1).key.getKey().equals(ByteBufferUtil.bytes("k3"));
+        assert rows.get(2).key.getKey().equals(ByteBufferUtil.bytes("key1"));
+        assert rows.get(3).key.getKey().equals(ByteBufferUtil.bytes("key2"));
     }
 
     private void insert(String... keys)
@@ -102,7 +102,7 @@ public class KeyCollisionTest extends SchemaLoader
 
         public DecoratedKey decorateKey(ByteBuffer key)
         {
-            return new DecoratedKey(getToken(key), key);
+            return new BufferDecoratedKey(getToken(key), key);
         }
 
         public Token midpoint(Token ltoken, Token rtoken)