Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/01/31 00:09:30 UTC

[5/6] remove Table.switchlock and introduce o.a.c.utils.memory package patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-5549

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 94af5c0..540f1ce 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.regex.Pattern;
 import javax.management.*;
@@ -32,6 +31,14 @@ import javax.management.*;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.*;
+import com.google.common.util.concurrent.*;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Striped;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -46,8 +53,10 @@ import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.db.filter.ExtendedFilter;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.filter.*;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.db.index.SecondaryIndex;
@@ -74,7 +83,19 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 {
     private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);
 
-    public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor("MemtablePostFlusher");
+    private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+                                                                                          StageManager.KEEPALIVE,
+                                                                                          TimeUnit.SECONDS,
+                                                                                          new LinkedBlockingQueue<Runnable>(),
+                                                                                          new NamedThreadFactory("MemtableFlushWriter"),
+                                                                                          "internal");
+    // post-flush executor is single-threaded to guarantee that any flush Future on a CF will never return until prior flushes have completed
+    public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
+                                                                                             StageManager.KEEPALIVE,
+                                                                                             TimeUnit.SECONDS,
+                                                                                             new LinkedBlockingQueue<Runnable>(),
+                                                                                             new NamedThreadFactory("MemtablePostFlush"),
+                                                                                             "internal");
 
     public final Keyspace keyspace;
     public final String name;
@@ -83,7 +104,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     private final String mbeanName;
     private volatile boolean valid = true;
 
-    /* Memtables and SSTables on disk for this column family */
+    /**
+     * Memtables and SSTables on disk for this column family.
+     *
+     * We synchronize on the DataTracker to ensure isolation when we want to make sure
+     * that the memtable we're acting on doesn't change out from under us.  I.e., flush
+     * synchronizes on it to make sure it can submit on both executors atomically,
+     * so anyone else who wants to make sure flush doesn't interfere should as well.
+     */
     private final DataTracker data;
 
     /* This is used to generate the next index for a SSTable */
@@ -98,11 +126,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public final Directories directories;
 
-    /** ratio of in-memory memtable size, to serialized size */
-    volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data
-    /** ops count last time we computed liveRatio */
-    private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
-
     public final ColumnFamilyMetrics metric;
     public volatile long sampleLatencyNanos;
 
@@ -128,8 +151,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
         // If the CF comparator has changed, we need to change the memtable,
         // because the old one still aliases the previous comparator.
-        if (getMemtableThreadSafe().initialComparator != metadata.comparator)
-            switchMemtable(true, true);
+        if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator)
+            switchMemtable();
     }
 
     private void maybeReloadCompactionStrategy()
@@ -158,14 +181,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             {
                 protected void runMayThrow() throws Exception
                 {
-                    if (getMemtableThreadSafe().isExpired())
+                    synchronized (data)
                     {
-                        // if memtable is already expired but didn't flush because it's empty,
-                        // then schedule another flush.
-                        if (isClean())
-                            scheduleFlush();
-                        else
-                            forceFlush(); // scheduleFlush() will be called by the constructor of the new memtable.
+                        Memtable current = data.getView().getCurrentMemtable();
+                        // if we're not expired, we've been hit by a scheduled flush for an already flushed memtable, so ignore
+                        if (current.isExpired())
+                        {
+                            if (current.isClean())
+                            {
+                                // if we're still clean, instead of swapping just reschedule a flush for later
+                                scheduleFlush();
+                            }
+                            else
+                            {
+                                // we'll be rescheduled by the constructor of the Memtable.
+                                forceFlush();
+                            }
+                        }
                     }
                 }
             };
@@ -737,133 +769,283 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     }
 
     /**
-     * Switch and flush the current memtable, if it was dirty. The forceSwitch
-     * flag allow to force switching the memtable even if it is clean (though
-     * in that case we don't flush, as there is no point).
+     * Switches the memtable iff the live memtable is the one provided
+     *
+     * @param memtable
      */
-    public Future<?> switchMemtable(final boolean writeCommitLog, boolean forceSwitch)
-    {
-        /*
-         * If we can get the writelock, that means no new updates can come in and
-         * all ongoing updates to memtables have completed. We can get the tail
-         * of the log and use it as the starting position for log replay on recovery.
-         *
-         * This is why we Keyspace.switchLock needs to be global instead of per-Keyspace:
-         * we need to schedule discardCompletedSegments calls in the same order as their
-         * contexts (commitlog position) were read, even though the flush executor
-         * is multithreaded.
-         */
-        Keyspace.switchLock.writeLock().lock();
-        try
+    public Future<?> switchMemtableIfCurrent(Memtable memtable)
+    {
+        synchronized (data)
         {
-            final Future<ReplayPosition> ctx = writeCommitLog ? CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE);
+            if (data.getView().getCurrentMemtable() == memtable)
+                return switchMemtable();
+        }
+        return Futures.immediateFuture(null);
+    }
 
-            // submit the memtable for any indexed sub-cfses, and our own.
-            final List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>();
-            // don't assume that this.memtable is dirty; forceFlush can bring us here during index build even if it is not
+    /*
+     * switchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
+     * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
+     * This method does not block except for synchronizing on DataTracker, but the Future it returns will
+     * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL
+     * marked clean up to the position owned by the Memtable.
+     */
+    public ListenableFuture<?> switchMemtable()
+    {
+        logger.info("Enqueuing flush of {}", name);
+        synchronized (data)
+        {
+            Flush flush = new Flush(false);
+            flushExecutor.execute(flush);
+            ListenableFutureTask<?> task = ListenableFutureTask.create(flush.postFlush, null);
+            postFlushExecutor.submit(task);
+            return task;
+        }
+    }
+
+    public ListenableFuture<?> forceFlush()
+    {
+        return forceFlush(null);
+    }
+
+    /**
+     * Flush if there is unflushed data that was written to the CommitLog before {@code flushIfDirtyBefore}
+     * (inclusive).  If {@code flushIfDirtyBefore} is null, flush if there is any unflushed data.
+     *
+     * @return a Future such that when the future completes, all data inserted before forceFlush was called
+     * will have been flushed.
+     */
+    public ListenableFuture<?> forceFlush(ReplayPosition flushIfDirtyBefore)
+    {
+        // we synchronize on the data tracker to ensure we don't race against other calls to switchMemtable(),
+        // unnecessarily queueing memtables that are about to be made clean
+        synchronized (data)
+        {
+            // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
+            // we want to flush the 2ary index ones too.
+            boolean clean = true;
             for (ColumnFamilyStore cfs : concatWithIndexes())
+                clean &= cfs.data.getView().getCurrentMemtable().isCleanAfter(flushIfDirtyBefore);
+
+            if (clean)
             {
-                if (forceSwitch || !cfs.getMemtableThreadSafe().isClean())
-                    icc.add(cfs);
+                // We could have a memtable for this column family that is being
+                // flushed. Make sure the future returned waits for that so callers can
+                // assume that any data inserted prior to the call is fully flushed
+                // when the future returns (see #5241).
+                ListenableFutureTask<?> task = ListenableFutureTask.create(new Runnable()
+                {
+                    public void run()
+                    {
+                        logger.debug("forceFlush requested but everything is clean in {}", name);
+                    }
+                }, null);
+                postFlushExecutor.execute(task);
+                return task;
             }
 
-            final CountDownLatch latch = new CountDownLatch(icc.size());
-            for (ColumnFamilyStore cfs : icc)
+            return switchMemtable();
+        }
+    }
+
+    public void forceBlockingFlush()
+    {
+        FBUtilities.waitOnFuture(forceFlush());
+    }
+
+    /**
+     * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush
+     * etc., which expect to be able to wait until the flush (and all prior flushes) requested have completed.
+     */
+    private final class PostFlush implements Runnable
+    {
+        final boolean flushSecondaryIndexes;
+        final OpOrder.Barrier writeBarrier;
+        final CountDownLatch latch = new CountDownLatch(1);
+        volatile ReplayPosition lastReplayPosition;
+
+        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier)
+        {
+            this.writeBarrier = writeBarrier;
+            this.flushSecondaryIndexes = flushSecondaryIndexes;
+        }
+
+        public void run()
+        {
+            writeBarrier.await();
+
+            /**
+             * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the
+             * flushed memtables and CL position, which is as good as we can guarantee.
+             * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly
+             * with CL as we do with memtables/CFS-backed SecondaryIndexes.
+             */
+
+            if (flushSecondaryIndexes)
             {
-                Memtable memtable = cfs.data.switchMemtable();
-                // With forceSwitch it's possible to get a clean memtable here.
-                // In that case, since we've switched it already, just remove
-                // it from the memtable pending flush right away.
-                if (memtable.isClean())
+                for (SecondaryIndex index : indexManager.getIndexesNotBackedByCfs())
                 {
-                    cfs.replaceFlushed(memtable, null);
-                    latch.countDown();
-                }
-                else
-                {
-                    logger.info("Enqueuing flush of {}", memtable);
-                    memtable.flushAndSignal(latch, ctx);
+                    // flush any non-cfs backed indexes
+                    logger.info("Flushing SecondaryIndex {}", index);
+                    index.forceBlockingFlush();
                 }
             }
 
-            if (metric.memtableSwitchCount.count() == Long.MAX_VALUE)
-                metric.memtableSwitchCount.clear();
-            metric.memtableSwitchCount.inc();
+            try
+            {
+                // we wait on the latch for the lastReplayPosition to be set, and so that waiters
+                // on this task can rely on all prior flushes being complete
+                latch.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new IllegalStateException();
+            }
 
-            // when all the memtables have been written, including for indexes, mark the flush in the commitlog header.
-            // a second executor makes sure the onMemtableFlushes get called in the right order,
-            // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
-            return postFlushExecutor.submit(new WrappedRunnable()
+            // must check lastReplayPosition != null because Flush may find that all memtables are clean
+            // and so not set a lastReplayPosition
+            if (lastReplayPosition != null)
             {
-                public void runMayThrow() throws InterruptedException, ExecutionException
-                {
-                    latch.await();
+                CommitLog.instance.discardCompletedSegments(metadata.cfId, lastReplayPosition);
+            }
 
-                    if (!icc.isEmpty())
-                    {
-                        //only valid when memtables exist
+            metric.pendingFlushes.dec();
+        }
+    }
 
-                        for (SecondaryIndex index : indexManager.getIndexesNotBackedByCfs())
-                        {
-                            // flush any non-cfs backed indexes
-                            logger.info("Flushing SecondaryIndex {}", index);
-                            index.forceBlockingFlush();
-                        }
-                    }
+    /**
+     * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the DataTracker monitor.
+     * In the constructor the current memtable(s) are swapped, and a barrier on outstanding writes is issued;
+     * when run by the flushWriter the barrier is waited on to ensure all outstanding writes have completed
+     * before all memtables are immediately written, and the CL is either immediately marked clean or, if
+     * there are custom secondary indexes, the post-flush cleanup is left to update those indexes and mark
+     * the CL clean.
+     */
+    private final class Flush implements Runnable
+    {
+        final OpOrder.Barrier writeBarrier;
+        final List<Memtable> memtables;
+        final PostFlush postFlush;
+        final boolean truncate;
+
+        private Flush(boolean truncate)
+        {
+            // if true, we won't flush, we'll just wait for any outstanding writes, switch the memtable, and discard
+            this.truncate = truncate;
+
+            metric.pendingFlushes.inc();
+            /**
+             * To ensure correctness of switch without blocking writes, run() needs to wait for all write operations
+             * started prior to the switch to complete. We do this by creating a Barrier on the writeOrdering
+             * that all write operations register themselves with, and assigning this barrier to the memtables,
+             * after which we *.issue()* the barrier. This barrier is used to direct write operations started prior
+             * to the barrier.issue() into the memtable we have switched out, and any started after to its replacement.
+             * In doing so it also tells the write operations to update the lastReplayPosition of the memtable, so
+             * that we know the CL position we are dirty to, which can be marked clean when we complete.
+             */
+            writeBarrier = keyspace.writeOrder.newBarrier();
+            memtables = new ArrayList<>();
+
+            // submit flushes for the memtable for any indexed sub-cfses, and our own
+            final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+            for (ColumnFamilyStore cfs : concatWithIndexes())
+            {
+                // switch all memtables, regardless of their dirty status, setting the barrier
+                // so that we can reach a coordinated decision about cleanliness once they
+                // can no longer be modified
+                Memtable mt = cfs.data.switchMemtable(truncate);
+                mt.setDiscarding(writeBarrier, minReplayPosition);
+                memtables.add(mt);
+            }
 
-                    if (writeCommitLog)
-                    {
-                        // if we're not writing to the commit log, we are replaying the log, so marking
-                        // the log header with "you can discard anything written before the context" is not valid
-                        CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx.get());
-                    }
-                }
-            });
+            writeBarrier.issue();
+            postFlush = new PostFlush(!truncate, writeBarrier);
         }
-        finally
+
+        public void run()
         {
-            Keyspace.switchLock.writeLock().unlock();
-        }
-    }
+            // mark writes older than the barrier as blocking progress, permitting them to exceed our memory limit
+            // if they are stuck waiting on it, then wait for them all to complete
+            writeBarrier.markBlocking();
+            writeBarrier.await();
 
-    private boolean isClean()
-    {
-        // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
-        // we want flushLargestMemtables to flush the 2ary index ones too.
-        for (ColumnFamilyStore cfs : concatWithIndexes())
-            if (!cfs.getMemtableThreadSafe().isClean())
-                return false;
+            // mark all memtables as flushing, removing them from the live memtable list, and
+            // remove any memtables that are already clean from the set we need to flush
+            Iterator<Memtable> iter = memtables.iterator();
+            while (iter.hasNext())
+            {
+                Memtable memtable = iter.next();
+                memtable.cfs.data.markFlushing(memtable);
+                if (memtable.isClean() || truncate)
+                {
+                    memtable.cfs.replaceFlushed(memtable, null);
+                    memtable.setDiscarded();
+                    iter.remove();
+                }
+            }
 
-        return true;
+            if (memtables.isEmpty())
+            {
+                postFlush.latch.countDown();
+                return;
+            }
+
+            metric.memtableSwitchCount.inc();
+
+            for (final Memtable memtable : memtables)
+            {
+                // flush the memtable
+                MoreExecutors.sameThreadExecutor().execute(memtable.flushRunnable());
+                memtable.setDiscarded();
+            }
+
+            // signal the post-flush we've done our work
+            postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition();
+            postFlush.latch.countDown();
+        }
     }
 
     /**
-     * @return a future, with a guarantee that any data inserted prior to the forceFlush() call is fully flushed
-     *         by the time future.get() returns. Never returns null.
+     * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately
+     * queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
      */
-    public Future<?> forceFlush()
+    public static class FlushLargestColumnFamily implements Runnable
     {
-        if (isClean())
+        public void run()
         {
-            // We could have a memtable for this column family that is being
-            // flushed. Make sure the future returned wait for that so callers can
-            // assume that any data inserted prior to the call are fully flushed
-            // when the future returns (see #5241).
-            return postFlushExecutor.submit(new Runnable()
+            float largestRatio = 0f;
+            Memtable largest = null;
+            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
             {
-                public void run()
+                // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios
+                // to ensure we have some ordering guarantee for performing the switchMemtableIfCurrent(), i.e. we will only
+                // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them
+                Memtable current = cfs.getDataTracker().getView().getCurrentMemtable();
+
+                // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF,
+                // both on- and off-heap, and select the largest of the two ratios to weight this CF
+                float onHeap = 0f;
+                onHeap += current.getAllocator().ownershipRatio();
+
+                for (SecondaryIndex index : cfs.indexManager.getIndexes())
                 {
-                    logger.debug("forceFlush requested but everything is clean in {}", name);
+                    if (index.getOnHeapAllocator() != null)
+                        onHeap += index.getOnHeapAllocator().ownershipRatio();
                 }
-            });
-        }
 
-        return switchMemtable(true, false);
-    }
+                if (onHeap > largestRatio)
+                {
+                    largest = current;
+                    largestRatio = onHeap;
+                }
+            }
 
-    public void forceBlockingFlush()
-    {
-        FBUtilities.waitOnFuture(forceFlush());
+            if (largest != null)
+            {
+                largest.cfs.switchMemtableIfCurrent(largest);
+                logger.info("Reclaiming {} of {} retained memtable bytes", largest.getAllocator().reclaiming(), Memtable.memoryPool.used());
+            }
+        }
     }
 
     public void maybeUpdateRowCache(DecoratedKey key)
@@ -882,28 +1064,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * @param key - key for update/insert
      * @param columnFamily - columnFamily changes
      */
-    public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
+    public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
         long start = System.nanoTime();
 
-        Memtable mt = getMemtableThreadSafe();
-        mt.put(key, columnFamily, indexer);
+        Memtable mt = data.getMemtableFor(opGroup);
+        mt.put(key, columnFamily, indexer, opGroup, replayPosition);
         maybeUpdateRowCache(key);
         metric.writeLatency.addNano(System.nanoTime() - start);
-
-        // recompute liveRatio, if we have doubled the number of ops since last calculated
-        while (true)
-        {
-            long last = liveRatioComputedAt.get();
-            long operations = metric.writeLatency.latency.count();
-            if (operations < 2 * last)
-                break;
-            if (liveRatioComputedAt.compareAndSet(last, operations))
-            {
-                logger.debug("computing liveRatio of {} at {} ops", this, operations);
-                mt.updateLiveRatio();
-            }
-        }
     }
 
     /**
@@ -1149,35 +1317,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public long getMemtableDataSize()
     {
-        return metric.memtableDataSize.value();
-    }
-
-    public long getTotalMemtableLiveSize()
-    {
-        return getMemtableDataSize() + indexManager.getTotalLiveSize();
-    }
-
-    /**
-     * @return the live size of all the memtables (the current active one and pending flush).
-     */
-    public long getAllMemtablesLiveSize()
-    {
-        long size = 0;
-        for (Memtable mt : getDataTracker().getAllMemtables())
-            size += mt.getLiveSize();
-        return size;
-    }
-
-    /**
-     * @return the size of all the memtables, including the pending flush ones and 2i memtables, if any.
-     */
-    public long getTotalAllMemtablesLiveSize()
-    {
-        long size = getAllMemtablesLiveSize();
-        if (indexManager.hasIndexes())
-            for (ColumnFamilyStore index : indexManager.getIndexesBackedByCfs())
-                size += index.getAllMemtablesLiveSize();
-        return size;
+        return metric.memtableHeapSize.value();
     }
 
     public int getMemtableSwitchCount()
@@ -1185,11 +1325,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return (int) metric.memtableSwitchCount.count();
     }
 
-    Memtable getMemtableThreadSafe()
-    {
-        return data.getMemtable();
-    }
-
     /**
      * Package protected for access from the CompactionManager.
      */
@@ -1245,7 +1380,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public int getPendingTasks()
     {
-        return metric.pendingTasks.value();
+        return (int) metric.pendingFlushes.count();
     }
 
     public long getWriteCount()
@@ -1472,7 +1607,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // retry w/ new view
         }
 
-        return new ViewFragment(sstables, Iterables.concat(Collections.singleton(view.memtable), view.memtablesPendingFlush));
+        return new ViewFragment(sstables, view.getAllMemtables());
     }
 
     /**
@@ -2049,22 +2184,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // that was part of the flush we forced; otherwise on a tie, it won't get deleted.
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS);
         }
-
-        // nuke the memtable data w/o writing to disk first
-        Keyspace.switchLock.writeLock().lock();
-        try
+        else
         {
-            for (ColumnFamilyStore cfs : concatWithIndexes())
+            // just nuke the memtable data w/o writing to disk first
+            synchronized (data)
             {
-                Memtable mt = cfs.getMemtableThreadSafe();
-                if (!mt.isClean())
-                    mt.cfs.data.renewMemtable();
+                final Flush flush = new Flush(true);
+                flushExecutor.execute(flush);
+                postFlushExecutor.submit(flush.postFlush);
             }
         }
-        finally
-        {
-            Keyspace.switchLock.writeLock().unlock();
-        }
 
         Runnable truncateRunnable = new Runnable()
         {
@@ -2337,12 +2466,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public Iterable<ColumnFamilyStore> concatWithIndexes()
     {
-        return Iterables.concat(indexManager.getIndexesBackedByCfs(), Collections.singleton(this));
-    }
-
-    public Set<Memtable> getMemtablesPendingFlush()
-    {
-        return data.getMemtablesPendingFlush();
+        // we return the main CFS first, which we rely on for simplicity in switchMemtable(), for getting the
+        // latest replay position
+        return Iterables.concat(Collections.singleton(this), indexManager.getIndexesBackedByCfs());
     }
 
     public List<String> getBuiltIndexes()
@@ -2381,17 +2507,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public long oldestUnflushedMemtable()
     {
-        DataTracker.View view = data.getView();
-        long oldest = view.memtable.creationTime();
-        for (Memtable memtable : view.memtablesPendingFlush)
-            oldest = Math.min(oldest, memtable.creationTime());
-        return oldest;
+        return data.getView().getOldestMemtable().creationTime();
     }
 
     public boolean isEmpty()
     {
         DataTracker.View view = data.getView();
-        return view.sstables.isEmpty() && view.memtable.getOperations() == 0 && view.memtablesPendingFlush.isEmpty();
+        return view.sstables.isEmpty() && view.getCurrentMemtable().getOperations() == 0 && view.getCurrentMemtable() == view.getOldestMemtable();
     }
 
     private boolean isRowCacheEnabled()
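
The barrier protocol described in the Flush constructor comment above is the heart of this patch: writes register with the keyspace's OpOrder, and a memtable switch issues a barrier that directs writes started before it to the outgoing memtable and later ones to its replacement. Below is a minimal, self-contained sketch of that idea. ToyOpOrder is an invented stand-in, not the real o.a.c.utils.concurrent.OpOrder API, and the spin-wait in await() stands in for proper park/signal handling.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicReference;

    // Illustrative stand-in for OpOrder: writers start() against the current group;
    // issuing a barrier installs a new group, and await() blocks until every write
    // that started against the old group has close()d.
    final class ToyOpOrder
    {
        static final class Group
        {
            final AtomicInteger inFlight = new AtomicInteger();
        }

        private final AtomicReference<Group> current = new AtomicReference<>(new Group());

        Group start()
        {
            while (true)
            {
                Group g = current.get();
                g.inFlight.incrementAndGet();
                if (current.get() == g)
                    return g;                    // this group still accepts us
                g.inFlight.decrementAndGet();    // a barrier was issued mid-start; retry on the new group
            }
        }

        static void close(Group g)
        {
            g.inFlight.decrementAndGet();        // the write has completed
        }

        // in the patch this happens while holding the DataTracker monitor,
        // so issuing is effectively single-threaded
        Group issueBarrier()
        {
            Group old = current.get();
            current.set(new Group());            // writes from here on land in the new group
            return old;
        }

        static void await(Group old) throws InterruptedException
        {
            while (old.inFlight.get() > 0)       // the real OpOrder parks/signals rather than spinning
                Thread.sleep(1);
        }

        public static void main(String[] args) throws InterruptedException
        {
            ToyOpOrder order = new ToyOpOrder();
            Group write = order.start();         // a write in flight against the old memtable
            Group old = order.issueBarrier();    // switchMemtable(): new writes go to the replacement
            ToyOpOrder.close(write);             // the straggling write completes
            ToyOpOrder.await(old);               // flush may now safely read the old memtable
            System.out.println("all pre-switch writes drained");
        }
    }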

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
index fc1a7b1..f248ccf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java
@@ -35,7 +35,7 @@ public interface ColumnFamilyStoreMBean
      * Returns the total amount of data stored in the memtable, including
      * column related overhead.
      *
-     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableDataSize
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#memtableHeapSize
      * @return The size in bytes.
      * @deprecated
      */
@@ -146,7 +146,7 @@ public interface ColumnFamilyStoreMBean
     public double getRecentWriteLatencyMicros();
 
     /**
-     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#pendingTasks
+     * @see org.apache.cassandra.metrics.ColumnFamilyMetrics#pendingFlushes
      * @return the estimated number of tasks pending for this column family
      */
     @Deprecated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/CounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterCell.java b/src/java/org/apache/cassandra/db/CounterCell.java
index 426d876..e097914 100644
--- a/src/java/org/apache/cassandra/db/CounterCell.java
+++ b/src/java/org/apache/cassandra/db/CounterCell.java
@@ -20,15 +20,15 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
-import org.apache.cassandra.utils.*;
 
 /**
  * A column that represents a partitioned counter.
@@ -142,7 +142,7 @@ public class CounterCell extends Cell
     }
 
     @Override
-    public Cell reconcile(Cell cell, Allocator allocator)
+    public Cell reconcile(Cell cell, AbstractAllocator allocator)
     {
         // live + tombstone: track last tombstone
         if (cell.isMarkedForDelete(Long.MIN_VALUE)) // cannot be an expired cell, so the current time is irrelevant
@@ -190,7 +190,7 @@ public class CounterCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
     {
         return new CounterCell(name.copy(allocator), allocator.clone(value), timestamp, timestampOfLastDelete);
     }
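
The localCopy() changes above all route through the same allocator contract: clone() copies a cell's bytes into memory the allocator owns, so the copy no longer aliases shared buffers. A hedged sketch of that contract, using invented Sketch* names rather than the real AbstractAllocator/HeapAllocator classes:

    import java.nio.ByteBuffer;

    // Invented names; mirrors only the clone() behaviour that localCopy() depends on.
    abstract class SketchAllocator
    {
        abstract ByteBuffer allocate(int size);

        // copy the source's remaining bytes into freshly allocated memory,
        // leaving the source buffer's position untouched
        ByteBuffer clone(ByteBuffer src)
        {
            ByteBuffer copy = allocate(src.remaining());
            copy.put(src.duplicate());
            copy.flip();
            return copy;
        }
    }

    final class SketchHeapAllocator extends SketchAllocator
    {
        static final SketchHeapAllocator instance = new SketchHeapAllocator();

        ByteBuffer allocate(int size)
        {
            return ByteBuffer.allocate(size); // on-heap; an off-heap variant would use allocateDirect
        }
    }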

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/CounterMutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutation.java b/src/java/org/apache/cassandra/db/CounterMutation.java
index 6884c80..5d96c70 100644
--- a/src/java/org/apache/cassandra/db/CounterMutation.java
+++ b/src/java/org/apache/cassandra/db/CounterMutation.java
@@ -33,6 +33,9 @@ import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.*;
@@ -134,7 +137,7 @@ public class CounterMutation implements IMutation
     // Replaces all the CounterUpdateCell-s with updated regular CounterCell-s
     private ColumnFamily processModifications(ColumnFamily changesCF)
     {
-        Allocator allocator = HeapAllocator.instance;
+        AbstractAllocator allocator = HeapAllocator.instance;
         ColumnFamilyStore cfs = Keyspace.open(getKeyspaceName()).getColumnFamilyStore(changesCF.id());
 
         ColumnFamily resultCF = changesCF.cloneMeShallow();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/CounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterUpdateCell.java b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
index f7a0ef1..b7ca2cb 100644
--- a/src/java/org/apache/cassandra/db/CounterUpdateCell.java
+++ b/src/java/org/apache/cassandra/db/CounterUpdateCell.java
@@ -20,9 +20,9 @@ package org.apache.cassandra.db;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.db.composites.CellNameType;
 
 /**
  * A counter update that hasn't yet been applied by the leader replica.
@@ -56,7 +56,7 @@ public class CounterUpdateCell extends Cell
     }
 
     @Override
-    public Cell reconcile(Cell cell, Allocator allocator)
+    public Cell reconcile(Cell cell, AbstractAllocator allocator)
     {
         // The only time this could happen is if a batchAdd ships two
         // increments for the same cell. Hence we simply sum the delta.
@@ -78,7 +78,7 @@ public class CounterUpdateCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
     {
         throw new UnsupportedOperationException();
     }
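
The reconcile() comment above (a batchAdd shipping two increments for the same cell) reduces to a simple merge rule: sum the pending deltas and keep the later timestamp. A tiny illustrative sketch of that rule; the names and the exact tie-breaking are assumptions, since the merged body is not shown in this hunk:

    // Illustrative only: the merge rule the reconcile() comment describes.
    final class CounterUpdateSketch
    {
        static long[] reconcile(long deltaA, long tsA, long deltaB, long tsB)
        {
            // sum the two pending increments; keep the later timestamp (assumed)
            return new long[]{ deltaA + deltaB, Math.max(tsA, tsB) };
        }

        public static void main(String[] args)
        {
            long[] merged = reconcile(2, 10, 3, 12);
            System.out.println(merged[0] + " @ " + merged[1]); // prints: 5 @ 12
        }
    }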

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 8fad5dd..f2aae50 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,6 +24,9 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,23 +54,22 @@ public class DataTracker
         this.init();
     }
 
-    public Memtable getMemtable()
-    {
-        return view.get().memtable;
-    }
-
-    public Set<Memtable> getMemtablesPendingFlush()
+    // get the Memtable that the ordered writeOp should be directed to
+    public Memtable getMemtableFor(OpOrder.Group opGroup)
     {
-        return view.get().memtablesPendingFlush;
-    }
+        // since any new memtables appended to the list after we fetch it will be for operations started
+        // after us, we can safely assume that we will always find the memtable that 'accepts' us;
+        // if the barrier for any memtable is set whilst we are reading the list, it must accept us.
 
-    /**
-     * @return the active memtable and all the memtables that are pending flush.
-     */
-    public Iterable<Memtable> getAllMemtables()
-    {
-        View snapshot = view.get();
-        return Iterables.concat(snapshot.memtablesPendingFlush, Collections.singleton(snapshot.memtable));
+        // there may be multiple memtables in the list that would 'accept' us; however, we only ever choose
+        // the oldest such memtable, as accepts() only prevents us falling behind (i.e. ensures we don't
+        // assign operations to a memtable that was retired/queued before we started)
+        for (Memtable memtable : view.get().liveMemtables)
+        {
+            if (memtable.accepts(opGroup))
+                return memtable;
+        }
+        throw new AssertionError(view.get().liveMemtables.toString());
     }
 
     public Set<SSTableReader> getSSTables()
@@ -98,46 +100,41 @@ public class DataTracker
     }
 
     /**
-     * Switch the current memtable.
-     * This atomically adds the current memtable to the memtables pending
-     * flush and replace it with a fresh memtable.
+     * Switch the current memtable. This atomically appends a new memtable to the end of the list of active memtables,
+     * returning the previously last memtable. It leaves the previous Memtable in the list of live memtables until
+     * discarding(memtable) is called. These two methods must be synchronized/paired, i.e. m = switchMemtable
+     * must be followed by discarding(m); they cannot be interleaved.
      *
-     * @return the previous current memtable (the one added to the pending
-     * flush)
+     * @return the previously active memtable
      */
-    public Memtable switchMemtable()
+    public Memtable switchMemtable(boolean truncating)
     {
-        // atomically change the current memtable
         Memtable newMemtable = new Memtable(cfstore);
         Memtable toFlushMemtable;
         View currentView, newView;
         do
         {
             currentView = view.get();
-            toFlushMemtable = currentView.memtable;
+            toFlushMemtable = currentView.getCurrentMemtable();
             newView = currentView.switchMemtable(newMemtable);
         }
         while (!view.compareAndSet(currentView, newView));
 
+        if (truncating)
+            notifyRenewed(newMemtable);
+
         return toFlushMemtable;
     }
 
-    /**
-     * Renew the current memtable without putting the old one for a flush.
-     * Used when we flush but a memtable is clean (in which case we must
-     * change it because it was frozen).
-     */
-    public void renewMemtable()
+    public void markFlushing(Memtable memtable)
     {
-        Memtable newMemtable = new Memtable(cfstore);
         View currentView, newView;
         do
         {
             currentView = view.get();
-            newView = currentView.renewMemtable(newMemtable);
+            newView = currentView.markFlushing(memtable);
         }
         while (!view.compareAndSet(currentView, newView));
-        notifyRenewed(currentView.memtable);
     }
 
     public void replaceFlushed(Memtable memtable, SSTableReader sstable)
@@ -320,11 +317,12 @@ public class DataTracker
     /** (Re)initializes the tracker, purging all references. */
     void init()
     {
-        view.set(new View(new Memtable(cfstore),
-                          Collections.<Memtable>emptySet(),
-                          Collections.<SSTableReader>emptySet(),
-                          Collections.<SSTableReader>emptySet(),
-                          SSTableIntervalTree.empty()));
+        view.set(new View(
+                ImmutableList.of(new Memtable(cfstore)),
+                ImmutableList.<Memtable>of(),
+                Collections.<SSTableReader>emptySet(),
+                Collections.<SSTableReader>emptySet(),
+                SSTableIntervalTree.empty()));
     }
 
     /**
@@ -533,66 +531,128 @@ public class DataTracker
      * flush, the sstables for a column family, and the sstables that are active
      * in compaction (a subset of the sstables).
      */
-    static class View
-    {
-        public final Memtable memtable;
-        public final Set<Memtable> memtablesPendingFlush;
+    public static class View
+    {
+        /**
+         * ordinarily a list of size 1, but when preparing to flush will contain both the memtable we will flush
+         * and the new replacement memtable, until all outstanding write operations on the old table complete.
+         * The last item in the list is always the "current" memtable.
+         */
+        private final List<Memtable> liveMemtables;
+        /**
+         * contains all memtables that are no longer referenced for writing and are queued for / in the process of being
+         * flushed. In chronologically ascending order.
+         */
+        private final List<Memtable> flushingMemtables;
         public final Set<SSTableReader> compacting;
         public final Set<SSTableReader> sstables;
         public final SSTableIntervalTree intervalTree;
 
-        View(Memtable memtable, Set<Memtable> pendingFlush, Set<SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
+        View(List<Memtable> liveMemtables, List<Memtable> flushingMemtables, Set<SSTableReader> sstables, Set<SSTableReader> compacting, SSTableIntervalTree intervalTree)
         {
-            this.memtable = memtable;
-            this.memtablesPendingFlush = pendingFlush;
+            this.liveMemtables = liveMemtables;
+            this.flushingMemtables = flushingMemtables;
             this.sstables = sstables;
             this.compacting = compacting;
             this.intervalTree = intervalTree;
         }
 
+        public Memtable getOldestMemtable()
+        {
+            if (!flushingMemtables.isEmpty())
+                return flushingMemtables.get(0);
+            return liveMemtables.get(0);
+        }
+
+        public Memtable getCurrentMemtable()
+        {
+            return liveMemtables.get(liveMemtables.size() - 1);
+        }
+
+        public Iterable<Memtable> getMemtablesPendingFlush()
+        {
+            if (liveMemtables.size() == 1)
+                return flushingMemtables;
+            return Iterables.concat(liveMemtables.subList(0, 1), flushingMemtables);
+        }
+
+        /**
+         * @return the active memtable and all the memtables that are pending flush.
+         */
+        public Iterable<Memtable> getAllMemtables()
+        {
+            return Iterables.concat(flushingMemtables, liveMemtables);
+        }
+
         public Sets.SetView<SSTableReader> nonCompactingSStables()
         {
             return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
         }
 
-        public View switchMemtable(Memtable newMemtable)
+        View switchMemtable(Memtable newMemtable)
         {
-            Set<Memtable> newPending = ImmutableSet.<Memtable>builder().addAll(memtablesPendingFlush).add(memtable).build();
-            return new View(newMemtable, newPending, sstables, compacting, intervalTree);
+            List<Memtable> newLiveMemtables = ImmutableList.<Memtable>builder().addAll(liveMemtables).add(newMemtable).build();
+            return new View(newLiveMemtables, flushingMemtables, sstables, compacting, intervalTree);
         }
 
-        public View renewMemtable(Memtable newMemtable)
+        View markFlushing(Memtable toFlushMemtable)
         {
-            return new View(newMemtable, memtablesPendingFlush, sstables, compacting, intervalTree);
+            List<Memtable> live = liveMemtables, flushing = flushingMemtables;
+
+            // since we can have multiple flushes queued, we may occasionally race and start a flush out of order,
+            // so must locate it in the list to remove, rather than just removing from the beginning
+            int i = live.indexOf(toFlushMemtable);
+            assert i < live.size() - 1;
+            List<Memtable> newLive = ImmutableList.<Memtable>builder()
+                                                  .addAll(live.subList(0, i))
+                                                  .addAll(live.subList(i + 1, live.size()))
+                                                  .build();
+
+            // similarly, if we markFlushing out of order once, we may afterwards need to insert a memtable into the
+            // flushing list in a position other than the end, though this will be rare
+            i = flushing.size();
+            while (i > 0 && flushing.get(i - 1).creationTime() > toFlushMemtable.creationTime())
+                i--;
+            List<Memtable> newFlushing = ImmutableList.<Memtable>builder()
+                                                      .addAll(flushing.subList(0, i))
+                                                      .add(toFlushMemtable)
+                                                      .addAll(flushing.subList(i, flushing.size()))
+                                                      .build();
+
+            return new View(newLive, newFlushing, sstables, compacting, intervalTree);
         }
 
-        public View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
+        View replaceFlushed(Memtable flushedMemtable, SSTableReader newSSTable)
         {
-            Set<Memtable> newPending = ImmutableSet.copyOf(Sets.difference(memtablesPendingFlush, Collections.singleton(flushedMemtable)));
+            int index = flushingMemtables.indexOf(flushedMemtable);
+            List<Memtable> newQueuedMemtables = ImmutableList.<Memtable>builder()
+                                                             .addAll(flushingMemtables.subList(0, index))
+                                                             .addAll(flushingMemtables.subList(index + 1, flushingMemtables.size()))
+                                                             .build();
             Set<SSTableReader> newSSTables = newSSTable == null
-                                           ? sstables
-                                           : newSSTables(newSSTable);
+                                             ? sstables
+                                             : newSSTables(newSSTable);
             SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
-            return new View(memtable, newPending, newSSTables, compacting, intervalTree);
+            return new View(liveMemtables, newQueuedMemtables, newSSTables, compacting, intervalTree);
         }
 
-        public View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
+        View replace(Collection<SSTableReader> oldSSTables, Iterable<SSTableReader> replacements)
         {
             Set<SSTableReader> newSSTables = newSSTables(oldSSTables, replacements);
             SSTableIntervalTree intervalTree = buildIntervalTree(newSSTables);
-            return new View(memtable, memtablesPendingFlush, newSSTables, compacting, intervalTree);
+            return new View(liveMemtables, flushingMemtables, newSSTables, compacting, intervalTree);
         }
 
-        public View markCompacting(Collection<SSTableReader> tomark)
+        View markCompacting(Collection<SSTableReader> tomark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.<SSTableReader>builder().addAll(compacting).addAll(tomark).build();
-            return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree);
+            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
         }
 
-        public View unmarkCompacting(Iterable<SSTableReader> tounmark)
+        View unmarkCompacting(Iterable<SSTableReader> tounmark)
         {
             Set<SSTableReader> compactingNew = ImmutableSet.copyOf(Sets.difference(compacting, ImmutableSet.copyOf(tounmark)));
-            return new View(memtable, memtablesPendingFlush, sstables, compactingNew, intervalTree);
+            return new View(liveMemtables, flushingMemtables, sstables, compactingNew, intervalTree);
         }
 
         private Set<SSTableReader> newSSTables(SSTableReader newSSTable)
@@ -621,7 +681,7 @@ public class DataTracker
         @Override
         public String toString()
         {
-            return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", memtablesPendingFlush.size(), sstables, compacting);
+            return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting);
         }
     }
 }
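
Every mutator in DataTracker follows the same copy-on-write discipline visible in switchMemtable() and markFlushing() above: View is immutable, each change builds a replacement view, and a compareAndSet loop installs it, so readers never take a lock. A reduced, self-contained sketch of that pattern (String stands in for Memtable; all names are invented):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    // Invented reduction of the DataTracker/View pattern: immutable snapshots
    // swapped in with CAS, so concurrent readers always see a consistent view.
    final class TrackerSketch
    {
        static final class ViewSketch
        {
            final List<String> liveMemtables; // last element is the "current" memtable

            ViewSketch(List<String> live)
            {
                this.liveMemtables = Collections.unmodifiableList(live);
            }

            ViewSketch withAppended(String memtable)
            {
                List<String> copy = new ArrayList<>(liveMemtables);
                copy.add(memtable);
                return new ViewSketch(copy); // never mutate an installed view
            }

            String current()
            {
                return liveMemtables.get(liveMemtables.size() - 1);
            }
        }

        private final AtomicReference<ViewSketch> view =
                new AtomicReference<>(new ViewSketch(new ArrayList<>(Collections.singletonList("memtable-1"))));

        // mirrors switchMemtable(): retry the copy-on-write update until the CAS wins
        String switchMemtable(String replacement)
        {
            ViewSketch cur;
            String previous;
            do
            {
                cur = view.get();
                previous = cur.current();
            }
            while (!view.compareAndSet(cur, cur.withAppended(replacement)));
            return previous;
        }
    }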

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletedCell.java b/src/java/org/apache/cassandra/db/DeletedCell.java
index 13d1358..64a6df3 100644
--- a/src/java/org/apache/cassandra/db/DeletedCell.java
+++ b/src/java/org/apache/cassandra/db/DeletedCell.java
@@ -25,8 +25,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 
 public class DeletedCell extends Cell
 {
@@ -89,7 +90,7 @@ public class DeletedCell extends Cell
     }
 
     @Override
-    public Cell reconcile(Cell cell, Allocator allocator)
+    public Cell reconcile(Cell cell, AbstractAllocator allocator)
     {
         if (cell instanceof DeletedCell)
             return super.reconcile(cell, allocator);
@@ -97,7 +98,7 @@ public class DeletedCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
     {
         return new DeletedCell(name.copy(allocator), allocator.clone(value), timestamp);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DeletionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java
index 5c62132..fc452a1 100644
--- a/src/java/org/apache/cassandra/db/DeletionInfo.java
+++ b/src/java/org/apache/cassandra/db/DeletionInfo.java
@@ -25,16 +25,20 @@ import java.util.*;
 import com.google.common.base.Objects;
 import com.google.common.collect.Iterators;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.db.composites.CType;
 import org.apache.cassandra.db.composites.Composite;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.utils.ObjectSizes;
 
 /**
  * A combination of a top-level (or row) tombstone and range tombstones describing the deletions
  * within a {@link ColumnFamily} (or row).
  */
-public class DeletionInfo
+public class DeletionInfo implements IMeasurableMemory
 {
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionInfo(0, 0));
+
     /**
      * This represents a deletion of the entire row.  We can't represent this within the RangeTombstoneList, so it's
      * kept separately.  This also slightly optimizes the common case of a full row deletion.
@@ -317,6 +321,12 @@ public class DeletionInfo
         return Objects.hashCode(topLevel, ranges);
     }
 
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + topLevel.unsharedHeapSize() + (ranges == null ? 0 : ranges.unsharedHeapSize());
+    }
+
     public static class Serializer implements IVersionedSerializer<DeletionInfo>
     {
         private final RangeTombstoneList.Serializer rtlSerializer;
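
The IMeasurableMemory additions here and in DeletionTime below share one accounting pattern: measure an empty instance's shallow footprint once (EMPTY_SIZE), then add the unshared sizes of whatever children the instance actually references, with null children counting as zero. A minimal sketch, using an assumed constant in place of a real ObjectSizes.measure() call:

    // Names and the 32-byte figure are assumptions for illustration; the real code
    // derives the shallow size by measuring an empty instance via ObjectSizes.measure().
    final class HeapSizeSketch
    {
        interface Measurable
        {
            long unsharedHeapSize();
        }

        static final class Node implements Measurable
        {
            private static final long SHALLOW_SIZE = 32; // stand-in for ObjectSizes.measure(new Node(null))
            private final Measurable child;              // may be null, like DeletionInfo.ranges

            Node(Measurable child)
            {
                this.child = child;
            }

            public long unsharedHeapSize()
            {
                // shallow footprint plus everything this instance exclusively references
                return SHALLOW_SIZE + (child == null ? 0 : child.unsharedHeapSize());
            }
        }

        public static void main(String[] args)
        {
            System.out.println(new Node(new Node(null)).unsharedHeapSize()); // 64
        }
    }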

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/DeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java
index c60b423..39db398 100644
--- a/src/java/org/apache/cassandra/db/DeletionTime.java
+++ b/src/java/org/apache/cassandra/db/DeletionTime.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.ObjectSizes;
@@ -31,8 +32,10 @@ import org.apache.cassandra.utils.ObjectSizes;
 /**
  * A top-level (row) tombstone.
  */
-public class DeletionTime implements Comparable<DeletionTime>
+public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory
 {
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new DeletionTime(0, 0));
+
     /**
      * A special DeletionTime that signifies that there is no top-level (row) tombstone.
      */
@@ -105,10 +108,9 @@ public class DeletionTime implements Comparable<DeletionTime>
         return atom.maxTimestamp() <= markedForDeleteAt;
     }
 
-    public long memorySize()
+    public long unsharedHeapSize()
     {
-        long fields = TypeSizes.NATIVE.sizeof(markedForDeleteAt) + TypeSizes.NATIVE.sizeof(localDeletionTime);
-        return ObjectSizes.getFieldSize(fields);
+        return EMPTY_SIZE;
     }
 
     public static class Serializer implements ISerializer<DeletionTime>
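
Because markedForDeleteAt and localDeletionTime are primitives, they are already included in jamm's shallow measurement, so the old field-by-field arithmetic collapses to the constant. A quick sanity check of that simplification (hypothetical test code; exact byte counts vary by JVM):

    // DeletionTime has only primitive fields, so its deep size equals its shallow size
    assert ObjectSizes.measureDeep(new DeletionTime(0, 0)) == ObjectSizes.measure(new DeletionTime(0, 0));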

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/EmptyColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/EmptyColumns.java b/src/java/org/apache/cassandra/db/EmptyColumns.java
index 5498353..5021f39 100644
--- a/src/java/org/apache/cassandra/db/EmptyColumns.java
+++ b/src/java/org/apache/cassandra/db/EmptyColumns.java
@@ -28,7 +28,7 @@ import java.util.Iterator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
@@ -63,12 +63,12 @@ public class EmptyColumns extends AbstractThreadUnsafeSortedColumns
         return factory;
     }
 
-    public void addColumn(Cell cell, Allocator allocator)
+    public void addColumn(Cell cell, AbstractAllocator allocator)
     {
         throw new UnsupportedOperationException();
     }
 
-    public void addAll(ColumnFamily cm, Allocator allocator, Function<Cell, Cell> transformation)
+    public void addAll(ColumnFamily cm, AbstractAllocator allocator, Function<Cell, Cell> transformation)
     {
         throw new UnsupportedOperationException();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/ExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ExpiringCell.java b/src/java/org/apache/cassandra/db/ExpiringCell.java
index a2f68da..6742630 100644
--- a/src/java/org/apache/cassandra/db/ExpiringCell.java
+++ b/src/java/org/apache/cassandra/db/ExpiringCell.java
@@ -26,7 +26,8 @@ import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.Allocator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
 
 /**
  * Alternative to Cell that has an expiration time.
@@ -132,7 +133,7 @@ public class ExpiringCell extends Cell
     }
 
     @Override
-    public Cell localCopy(ColumnFamilyStore cfs, Allocator allocator)
+    public Cell localCopy(ColumnFamilyStore cfs, AbstractAllocator allocator)
     {
         return new ExpiringCell(name.copy(allocator), allocator.clone(value), timestamp, timeToLive, localExpirationTime);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index c380e45..fb0f273 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -23,10 +23,12 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,14 +56,6 @@ public class Keyspace
 
     private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
 
-    /**
-     * accesses to CFS.memtable should acquire this for thread safety.
-     * CFS.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation.
-     * <p/>
-     * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.)
-     */
-    public static final ReentrantReadWriteLock switchLock = new ReentrantReadWriteLock();
-
     // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
     // proper directories here as well as in CassandraDaemon.
     static
@@ -71,6 +65,7 @@ public class Keyspace
     }
 
     public final KSMetaData metadata;
+    public final OpOrder writeOrder = new OpOrder();
 
     /* ColumnFamilyStore per column family */
     private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>();
@@ -343,15 +338,21 @@ public class Keyspace
      */
     public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        // write the mutation to the commitlog and memtables
-        Tracing.trace("Acquiring switchLock read lock");
-        switchLock.readLock().lock();
+        final OpOrder.Group opGroup = writeOrder.start();
         try
         {
+            // write the mutation to the commitlog and memtables
+            final ReplayPosition replayPosition;
             if (writeCommitLog)
             {
                 Tracing.trace("Appending to commitlog");
-                CommitLog.instance.add(mutation);
+                replayPosition = CommitLog.instance.add(mutation);
+            }
+            else
+            {
+                // we don't need the ReplayPosition, but grab one anyway so that it stays stack allocated.
+                // (the JVM will not stack allocate if the object may be null.)
+                replayPosition = CommitLog.instance.getContext();
             }
 
             DecoratedKey key = StorageService.getPartitioner().decorateKey(mutation.key());
@@ -365,12 +366,13 @@ public class Keyspace
                 }
 
                 Tracing.trace("Adding to {} memtable", cf.metadata().cfName);
-                cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, cf) : SecondaryIndexManager.nullUpdater);
+                SecondaryIndexManager.Updater updater = updateIndexes ? cfs.indexManager.updaterFor(key, opGroup) : SecondaryIndexManager.nullUpdater;
+                cfs.apply(key, cf, updater, opGroup, replayPosition);
             }
         }
         finally
         {
-            switchLock.readLock().unlock();
+            opGroup.finishOne();
         }
     }
 
@@ -389,11 +391,11 @@ public class Keyspace
         if (logger.isDebugEnabled())
             logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
 
-        Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
-
-        switchLock.readLock().lock();
+        final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
         try
         {
+            Collection<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
+
             Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs, key.key, DEFAULT_PAGE_SIZE);
             while (pager.hasNext())
             {
@@ -404,12 +406,12 @@ public class Keyspace
                     if (cfs.indexManager.indexes(cell.name(), indexes))
                         cf2.addColumn(cell);
                 }
-                cfs.indexManager.indexRow(key.key, cf2);
+                cfs.indexManager.indexRow(key.key, cf2, opGroup);
             }
         }
         finally
         {
-            switchLock.readLock().unlock();
+            opGroup.finishOne();
         }
     }
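
In outline, the protocol that replaces switchLock: writers bracket their work in an OpOrder.Group, and the flusher issues a Barrier and waits for in-flight groups instead of taking a write lock. A sketch of both sides (variable names are illustrative; await() appears in the Memtable comments below, while the newBarrier()/issue() lifecycle is assumed from the OpOrder API this patch introduces):

    // write path, as in apply() above
    OpOrder.Group op = keyspace.writeOrder.start();
    try
    {
        // append to the commitlog, then to whichever memtable accepts 'op'
    }
    finally
    {
        op.finishOne();
    }

    // memtable switch (sketch)
    OpOrder.Barrier barrier = keyspace.writeOrder.newBarrier();
    memtable.setDiscarding(barrier, minLastReplayPosition); // route new writes to the successor
    barrier.issue();  // groups started from here on are "after" the barrier
    barrier.await();  // wait for every group started before it to call finishOne()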
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 3761826..49a3f92 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -21,75 +21,50 @@ import java.io.File;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.base.Function;
 import com.google.common.base.Throwables;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
+
+import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.dht.LongToken;
+import org.apache.cassandra.io.util.DiskAwareRunnable;
+
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+import org.apache.cassandra.utils.memory.ContextAllocator;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.Pool;
+import org.apache.cassandra.utils.memory.PoolAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.composites.CellNameType;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DiskAwareRunnable;
-import org.apache.cassandra.utils.Allocator;
-import org.github.jamm.MemoryMeter;
 
 public class Memtable
 {
     private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
 
-    /*
-     * switchMemtable puts Memtable.getSortedContents on the writer executor.  When the write is complete,
-     * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads.
-     *
-     * There are two other things that switchMemtable does.
-     * First, it puts the Memtable into memtablesPendingFlush, where it stays until the flush is complete
-     * and it's been added as an SSTableReader to ssTables_.  Second, it adds an entry to commitLogUpdater
-     * that waits for the flush to complete, then calls onMemtableFlush.  This allows multiple flushes
-     * to happen simultaneously on multicore systems, while still calling onMF in the correct order,
-     * which is necessary for replay in case of a restart since CommitLog assumes that when onMF is
-     * called, all data up to the given context has been persisted to SSTables.
-     */
-    private static final ExecutorService flushWriter
-            = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
-                                               StageManager.KEEPALIVE,
-                                               TimeUnit.SECONDS,
-                                               new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getFlushQueueSize()),
-                                               new NamedThreadFactory("FlushWriter"),
-                                               "internal");
-
-    // size in memory can never be less than serialized size
-    private static final double MIN_SANE_LIVE_RATIO = 1.0;
-    // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than 64 something is probably broken.
-    private static final double MAX_SANE_LIVE_RATIO = 64.0;
-
-    // We need to take steps to avoid retaining inactive membtables in memory, because counting is slow (can be
-    // minutes, for a large memtable and a busy server).  A strictly FIFO Memtable queue could keep memtables
-    // alive waiting for metering after they're flushed and would otherwise be GC'd.  Instead, the approach we take
-    // is to enqueue the CFS instead of the memtable, and to meter whatever the active memtable is when the executor
-    // starts to work on it.  We use a Set to make sure we don't enqueue redundant tasks for the same CFS.
-    private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
-    private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
-                                                                                          Integer.MAX_VALUE,
-                                                                                          TimeUnit.MILLISECONDS,
-                                                                                          new LinkedBlockingQueue<Runnable>(),
-                                                                                          new NamedThreadFactory("MemoryMeter"),
-                                                                                          "internal");
-    private final MemoryMeter meter;
-
-    volatile static ColumnFamilyStore activelyMeasuring;
-
-    private final AtomicLong currentSize = new AtomicLong(0);
+    static final Pool memoryPool = DatabaseDescriptor.getMemtableAllocatorPool();
+    private static final int ROW_OVERHEAD_HEAP_SIZE;
+
+    private final PoolAllocator allocator;
+    private final AtomicLong liveDataSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
 
+    // the write barrier for directing writes to this memtable during a switch
+    private volatile OpOrder.Barrier writeBarrier;
+    // the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable
+    private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>();
+    // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly
+    private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+
     // We index the memtable by RowPosition only for the purpose of being able
     // to select key range using Token.KeyBound. However put() ensures that we
     // actually only store DecoratedKey.
@@ -98,16 +73,6 @@ public class Memtable
     private final long creationTime = System.currentTimeMillis();
     private final long creationNano = System.nanoTime();
 
-    private final Allocator allocator = DatabaseDescriptor.getMemtableAllocator();
-    // We really only need one column by allocator but one by memtable is not a big waste and avoids needing allocators to know about CFS
-    private final Function<Cell, Cell> localCopyFunction = new Function<Cell, Cell>()
-    {
-        public Cell apply(Cell c)
-        {
-            return c.localCopy(cfs, allocator);
-        }
-    };
-
     // Record the comparator of the CFS at the creation of the memtable. This
     // is only used when a user update the CF comparator, to know if the
     // memtable was created with the new or old comparator.
@@ -116,31 +81,19 @@ public class Memtable
     public Memtable(ColumnFamilyStore cfs)
     {
         this.cfs = cfs;
+        this.allocator = memoryPool.newAllocator(cfs.keyspace.writeOrder);
         this.initialComparator = cfs.metadata.comparator;
         this.cfs.scheduleFlush();
-
-        Callable<Set<Object>> provider = new Callable<Set<Object>>()
-        {
-            public Set<Object> call() throws Exception
-            {
-                // avoid counting this once for each row
-                Set<Object> set = Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());
-                set.add(Memtable.this.cfs.metadata);
-                return set;
-            }
-        };
-        meter = new MemoryMeter().omitSharedBufferOverhead().withTrackerProvider(provider);
     }
 
-    public long getLiveSize()
+    public AbstractAllocator getAllocator()
     {
-        long estimatedSize = (long) (currentSize.get() * cfs.liveRatio);
-
-        // liveRatio is just an estimate; we can get a lower bound directly from the allocator
-        if (estimatedSize < allocator.getMinimumSize())
-            return allocator.getMinimumSize();
+        return allocator;
+    }
 
-        return estimatedSize;
+    public long getLiveDataSize()
+    {
+        return liveDataSize.get();
     }
 
     public long getOperations()
@@ -148,54 +101,109 @@ public class Memtable
         return currentOperations.get();
     }
 
-    /**
-     * Should only be called by ColumnFamilyStore.apply.  NOT a public API.
-     * (CFS handles locking to avoid submitting an op
-     *  to a flushing memtable.  Any other way is unsafe.)
-    */
-    void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer)
+    void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition)
     {
-        resolve(key, columnFamily, indexer);
+        assert this.writeBarrier == null;
+        this.lastReplayPosition.set(minLastReplayPosition);
+        this.writeBarrier = writeBarrier;
+        allocator.setDiscarding();
     }
 
-    public void updateLiveRatio() throws RuntimeException
+    void setDiscarded()
     {
-        if (!MemoryMeter.isInitialized())
-        {
-            // hack for openjdk.  we log a warning about this in the startup script too.
-            logger.error("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}.  "
-                         + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; "
-                         + " upgrade to the Sun JRE instead", cfs.liveRatio);
-            return;
-        }
+        allocator.setDiscarded();
+    }
 
-        if (!meteringInProgress.add(cfs))
-        {
-            logger.debug("Metering already pending or active for {}; skipping liveRatio update", cfs);
-            return;
-        }
+    public boolean accepts(OpOrder.Group opGroup)
+    {
+        OpOrder.Barrier barrier = this.writeBarrier;
+        return barrier == null || barrier.isAfter(opGroup);
+    }
+
+    public boolean isLive()
+    {
+        return allocator.isLive();
+    }
+
+    public boolean isClean()
+    {
+        return rows.isEmpty();
+    }
 
-        meterExecutor.submit(new MeteringRunnable(cfs));
+    public boolean isCleanAfter(ReplayPosition position)
+    {
+        return isClean() || (position != null && minReplayPosition.compareTo(position) >= 0);
     }
 
-    private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer)
+    /**
+     * @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms.
+     */
+    public boolean isExpired()
+    {
+        int period = cfs.metadata.getMemtableFlushPeriod();
+        return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
+    }
+
+    /**
+     * Should only be called by ColumnFamilyStore.apply via Keyspace.apply, which supplies the appropriate
+     * OpOrdering.
+     *
+     * replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null
+     */
+    void put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
+        if (replayPosition != null && writeBarrier != null)
+        {
+            // if the writeBarrier is set, we want to maintain lastReplayPosition; this is an optimisation to avoid
+            // CASing it for every write, while still ensuring it is correct when writeBarrier.await() completes.
+            // we clone the replay position so that the object passed in does not "escape", permitting stack allocation
+            replayPosition = replayPosition.clone();
+            while (true)
+            {
+                ReplayPosition last = lastReplayPosition.get();
+                if (last.compareTo(replayPosition) >= 0)
+                    break;
+                if (lastReplayPosition.compareAndSet(last, replayPosition))
+                    break;
+            }
+        }
+
         AtomicBTreeColumns previous = rows.get(key);
 
         if (previous == null)
         {
             AtomicBTreeColumns empty = cf.cloneMeShallow(AtomicBTreeColumns.factory, false);
+            final DecoratedKey cloneKey = new DecoratedKey(key.token, allocator.clone(key.key, opGroup));
             // We'll add the columns later. This avoids wasted work if we get beaten to the putIfAbsent
-            previous = rows.putIfAbsent(new DecoratedKey(key.token, allocator.clone(key.key)), empty);
+            previous = rows.putIfAbsent(cloneKey, empty);
             if (previous == null)
+            {
                 previous = empty;
+                // allocate the row overhead after the fact; this saves over-allocating and then having to free, but
+                // means we can overshoot our declared limit.
+                int overhead = (int) (cfs.partitioner.getHeapSizeOf(key.token) + ROW_OVERHEAD_HEAP_SIZE);
+                allocator.allocate(overhead, opGroup);
+            }
+            else
+            {
+                allocator.free(cloneKey.key);
+            }
         }
 
-        long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer);
-        currentSize.addAndGet(sizeDelta);
+        ContextAllocator contextAllocator = allocator.wrap(opGroup, cfs);
+        AtomicBTreeColumns.Delta delta = previous.addAllWithSizeDelta(cf, contextAllocator, contextAllocator, indexer, new AtomicBTreeColumns.Delta());
+        liveDataSize.addAndGet(delta.dataSize());
         currentOperations.addAndGet((cf.getColumnCount() == 0)
                                     ? cf.isMarkedForDelete() ? 1 : 0
                                     : cf.getColumnCount());
+
+        // allocate or free the delta in column overhead after the fact
+        for (Cell cell : delta.reclaimed())
+        {
+            cell.name.free(allocator);
+            allocator.free(cell.value);
+        }
+        allocator.allocate((int) delta.excessHeapSize(), opGroup);
     }
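
Two idioms in put() are worth noting. A memtable accepts a write only if its barrier is unset or the write's group precedes it (accepts() above), which is how writes are routed during a switch without a lock; and lastReplayPosition is advanced with the standard lock-free "monotonic maximum" loop. The latter in isolation, with AtomicReference<Long> standing in for ReplayPosition purely for illustration:

    import java.util.concurrent.atomic.AtomicReference;

    final class MonotonicMax
    {
        static void advanceTo(AtomicReference<Long> last, Long candidate)
        {
            while (true)
            {
                Long current = last.get();
                if (current.compareTo(candidate) >= 0)
                    return; // a later value is already recorded; nothing to do
                if (last.compareAndSet(current, candidate))
                    return; // we installed the new maximum
            }
        }
    }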
 
     // for debugging
@@ -211,15 +219,15 @@ public class Memtable
         return builder.toString();
     }
 
-    public void flushAndSignal(final CountDownLatch latch, final Future<ReplayPosition> context)
+    public FlushRunnable flushRunnable()
     {
-        flushWriter.execute(new FlushRunnable(latch, context));
+        return new FlushRunnable(lastReplayPosition.get());
     }
 
     public String toString()
     {
-        return String.format("Memtable-%s@%s(%s/%s serialized/live bytes, %s ops)",
-                             cfs.name, hashCode(), currentSize, getLiveSize(), currentOperations);
+        return String.format("Memtable-%s@%s(%s serialized bytes, %s ops, %.0f%% of heap limit)",
+                             cfs.name, hashCode(), liveDataSize, currentOperations, 100 * allocator.ownershipRatio());
     }
 
     /**
@@ -254,26 +262,12 @@ public class Memtable
             public void remove()
             {
                 iter.remove();
-                currentSize.addAndGet(-currentEntry.getValue().dataSize());
+                liveDataSize.addAndGet(-currentEntry.getValue().dataSize());
                 currentEntry = null;
             }
         };
     }
 
-    public boolean isClean()
-    {
-        return rows.isEmpty();
-    }
-
-    /**
-     * @return true if this memtable is expired. Expiration time is determined by CF's memtable_flush_period_in_ms.
-     */
-    public boolean isExpired()
-    {
-        int period = cfs.metadata.getMemtableFlushPeriod();
-        return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period));
-    }
-
     public ColumnFamily getColumnFamily(DecoratedKey key)
     {
         return rows.get(key);
@@ -284,15 +278,18 @@ public class Memtable
         return creationTime;
     }
 
+    public ReplayPosition getLastReplayPosition()
+    {
+        return lastReplayPosition.get();
+    }
+
     class FlushRunnable extends DiskAwareRunnable
     {
-        private final CountDownLatch latch;
-        private final Future<ReplayPosition> context;
+        private final ReplayPosition context;
         private final long estimatedSize;
 
-        FlushRunnable(CountDownLatch latch, Future<ReplayPosition> context)
+        FlushRunnable(ReplayPosition context)
         {
-            this.latch = latch;
             this.context = context;
 
             long keySize = 0;
@@ -304,7 +301,7 @@ public class Memtable
             }
             estimatedSize = (long) ((keySize // index entries
                                     + keySize // keys in data file
-                                    + currentSize.get()) // data
+                                    + liveDataSize.get()) // data
                                     * 1.2); // bloom filter and row index overhead
         }
 
@@ -319,7 +316,6 @@ public class Memtable
 
             SSTableReader sstable = writeSortedContents(context, sstableDirectory);
             cfs.replaceFlushed(Memtable.this, sstable);
-            latch.countDown();
         }
 
         protected Directories getDirectories()
@@ -327,7 +323,7 @@ public class Memtable
             return cfs.directories;
         }
 
-        private SSTableReader writeSortedContents(Future<ReplayPosition> context, File sstableDirectory)
+        private SSTableReader writeSortedContents(ReplayPosition context, File sstableDirectory)
         throws ExecutionException, InterruptedException
         {
             logger.info("Writing {}", Memtable.this.toString());
@@ -361,15 +357,16 @@ public class Memtable
                 {
                     ssTable = writer.closeAndOpenReader();
                     logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
-                                              ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
+                                              ssTable.getFilename(), new File(ssTable.getFilename()).length(), context));
                 }
                 else
                 {
                     writer.abort();
                     ssTable = null;
                     logger.info("Completed flushing; nothing needed to be retained.  Commitlog position was {}",
-                                context.get());
+                                context);
                 }
+
                 return ssTable;
             }
             catch (Throwable e)
@@ -381,7 +378,7 @@ public class Memtable
 
         public SSTableWriter createFlushWriter(String filename) throws ExecutionException, InterruptedException
         {
-            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context.get());
+            MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
             return new SSTableWriter(filename,
                                      rows.size(),
                                      cfs.metadata,
@@ -390,62 +387,19 @@ public class Memtable
         }
     }
 
-    private static class MeteringRunnable implements Runnable
+    static
     {
-        // we might need to wait in the meter queue for a while.  measure whichever memtable is active at that point,
-        // rather than keeping the original memtable referenced (and thus un-freeable) until this runs.
-        private final ColumnFamilyStore cfs;
-
-        public MeteringRunnable(ColumnFamilyStore cfs)
-        {
-            this.cfs = cfs;
-        }
-
-        public void run()
-        {
-            try
-            {
-                activelyMeasuring = cfs;
-                Memtable memtable = cfs.getMemtableThreadSafe();
-
-                long start = System.nanoTime();
-                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits.
-                // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time.
-                long deepSize = memtable.meter.measure(memtable.rows);
-                int objects = 0;
-                for (Map.Entry<RowPosition, AtomicBTreeColumns> entry : memtable.rows.entrySet())
-                {
-                    deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
-                    objects += entry.getValue().getColumnCount();
-                }
-                double newRatio = (double) deepSize / memtable.currentSize.get();
-
-                if (newRatio < MIN_SANE_LIVE_RATIO)
-                {
-                    logger.debug("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio);
-                    newRatio = MIN_SANE_LIVE_RATIO;
-                }
-                if (newRatio > MAX_SANE_LIVE_RATIO)
-                {
-                    logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio);
-                    newRatio = MAX_SANE_LIVE_RATIO;
-                }
-
-                // we want to be very conservative about our estimate, since the penalty for guessing low is OOM
-                // death.  thus, higher estimates are believed immediately; lower ones are averaged w/ the old
-                if (newRatio > cfs.liveRatio)
-                    cfs.liveRatio = newRatio;
-                else
-                    cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
-                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took {}ms for {} cells",
-                            cfs, cfs.liveRatio, newRatio, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), objects);
-            }
-            finally
-            {
-                activelyMeasuring = null;
-                meteringInProgress.remove(cfs);
-            }
-        }
+        // calculate row overhead
+        int rowOverhead;
+        ConcurrentNavigableMap<RowPosition, Object> rows = new ConcurrentSkipListMap<>();
+        final int count = 100000;
+        final Object val = new Object();
+        for (int i = 0 ; i < count ; i++)
+            rows.put(new DecoratedKey(new LongToken((long) i), ByteBufferUtil.EMPTY_BYTE_BUFFER), val);
+        double avgSize = ObjectSizes.measureDeep(rows) / (double) count;
+        rowOverhead = (int) ((avgSize - Math.floor(avgSize)) < 0.05 ? Math.floor(avgSize) : Math.ceil(avgSize));
+        rowOverhead -= ObjectSizes.measureDeep(new LongToken((long) 0));
+        rowOverhead += AtomicBTreeColumns.HEAP_SIZE;
+        ROW_OVERHEAD_HEAP_SIZE = rowOverhead;
     }
 }
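
The static block above estimates the skiplist's per-row bookkeeping empirically: populate a ConcurrentSkipListMap with many identical entries, deep-measure the whole map, and average; the patch then subtracts the sampled token's own footprint and adds AtomicBTreeColumns' fixed size so the constant covers only per-row overhead. A condensed, standalone version of the sampling step (results vary by JVM, and jamm must be attached as an agent for measureDeep to be meaningful):

    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import org.apache.cassandra.utils.ObjectSizes;

    public class RowOverheadSample
    {
        public static void main(String[] args)
        {
            ConcurrentNavigableMap<Long, Object> map = new ConcurrentSkipListMap<>();
            final int count = 100000;
            final Object val = new Object(); // shared value, so its cost amortises to ~0 per entry
            for (long i = 0; i < count; i++)
                map.put(i, val);
            double avg = ObjectSizes.measureDeep(map) / (double) count;
            // round up unless the fractional part is negligible, mirroring the block above
            int perEntry = (int) ((avg - Math.floor(avg)) < 0.05 ? Math.floor(avg) : Math.ceil(avg));
            System.out.println("per-entry overhead: " + perEntry + " bytes");
        }
    }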