You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/01/31 00:09:27 UTC

[2/6] remove Table.switchlock and introduce o.a.c.utils.memory package patch by Benedict Elliott Smith; reviewed by jbellis for CASSANDRA-5549

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java b/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java
deleted file mode 100644
index a193c31..0000000
--- a/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.cassandra.utils.btree;
-
-import com.google.common.base.Function;
-
-/**
- * An interface defining a function to be applied to both the object we are replacing in a BTree and
- * the object that is intended to replace it, returning the object to actually replace it.
- *
- * If this is a new insertion, that is there is no object to replace, the one argument variant of
- * the function will be called.
- *
- * @param <V>
- */
-public interface ReplaceFunction<V> extends Function<V, V>
-{
-    V apply(V replaced, V update);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
new file mode 100644
index 0000000..cd30492
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
@@ -0,0 +1,30 @@
+package org.apache.cassandra.utils.btree;
+
+import com.google.common.base.Function;
+
+/**
+ * An interface defining a function to be applied to both the object we are replacing in a BTree and
+ * the object that is intended to replace it, returning the object to actually replace it.
+ *
+ * As this interface extends Guava's Function, implementations also provide a one argument apply(V).
+ * NOTE(review): presumably the one argument variant is used for new insertions, where there is no
+ * object to replace - confirm against the BTree update code.
+ *
+ * @param <V> the type of the values being replaced and inserted
+ */
+public interface UpdateFunction<V> extends Function<V, V>
+{
+    /**
+     * @param replacing the value in the original tree we have matched
+     * @param update the value in the updating collection that matched
+     * @return the value to insert into the new tree
+     */
+    V apply(V replacing, V update);
+
+    /**
+     * @return true if we should fail the update
+     */
+    boolean abortEarly();
+
+    /**
+     * @param heapSize extra heap space allocated (over previous tree)
+     */
+    void allocated(long heapSize);
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
new file mode 100644
index 0000000..44330a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
@@ -0,0 +1,411 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * <p>A class for providing synchronization between producers and consumers that do not
+ * communicate directly with each other, but where the consumers need to process their
+ * work in contiguous batches. In particular this is useful for both CommitLog and Memtable
+ * where the producers (writing threads) are modifying a structure that the consumer
+ * (flush executor) only batch syncs, but needs to know what 'position' the work is at
+ * for co-ordination with other processes.
+ *
+ * <p>The typical usage is something like:
+ * <pre>
+     public final class ExampleShared
+     {
+        final OpOrder order = new OpOrder();
+        volatile SharedState state;
+
+        static class SharedState
+        {
+            volatile Barrier barrier;
+
+            // ...
+        }
+
+        public void consume()
+        {
+            SharedState state = this.state;
+            state.setReplacement(new SharedState());
+            state.doSomethingToPrepareForBarrier();
+
+            state.barrier = order.newBarrier();
+            // issue() MUST be called after newBarrier() else barrier.isAfter()
+            // will always return true, and barrier.await() will fail
+            state.barrier.issue();
+
+            // wait for all producer work started prior to the barrier to complete
+            state.barrier.await();
+
+            // change the shared state to its replacement, as the current state will no longer be used by producers
+            this.state = state.getReplacement();
+
+            state.doSomethingWithExclusiveAccess();
+        }
+
+        public void produce()
+        {
+            Group opGroup = order.start();
+            try
+            {
+                SharedState s = state;
+                while (s.barrier != null && !s.barrier.isAfter(opGroup))
+                    s = s.getReplacement();
+                s.doProduceWork();
+            }
+            finally
+            {
+                opGroup.finishOne();
+            }
+        }
+    }
+ * </pre>
+ */
+public class OpOrder
+{
+    /**
+     * Constant that, when a Group's running count is equal to it, indicates the Group is complete
+     */
+    private static final int FINISHED = -1;
+
+    /**
+     * A linked list starting with the most recent Group object, i.e. the one we should start new operations from,
+     * with (prev) links to any incomplete Group instances, and (next) links to any potential future Group instances.
+     * Once all operations started against a Group instance and its ancestors have been finished the next instance
+     * will unlink this one
+     */
+    private volatile Group current = new Group();
+
+    /**
+     * Begins a new operation against this OpOrder by registering it with the current Group.
+     * Once the operation is completed Group.finishOne() MUST be called EXACTLY once for this operation.
+     *
+     * @return the Group instance the new operation was registered against
+     */
+    public Group start()
+    {
+        Group candidate;
+        do
+        {
+            // re-read the head on every attempt: registration fails once a Group has been expired,
+            // in which case a newer Group will have been (or is about to be) published
+            candidate = this.current;
+        }
+        while (!candidate.register());
+        return candidate;
+    }
+
+    /**
+     * Creates a new barrier. Until barrier.issue() is called on it the barrier is only a placeholder;
+     * once issued, all new operations start against a new Group that will not be accepted
+     * by barrier.isAfter(), and barrier.await() will return only once all operations started prior to the issue
+     * have completed.
+     *
+     * @return a new Barrier belonging to this OpOrder, on which issue() has not yet been called
+     */
+    public Barrier newBarrier()
+    {
+        final Barrier unissued = new Barrier();
+        return unissued;
+    }
+
+    /**
+     * @return the Group that new operations are currently being started against
+     */
+    public Group getCurrent()
+    {
+        final Group latest = this.current;
+        return latest;
+    }
+
+    /**
+     * Represents a group of identically ordered operations, i.e. all operations started in the interval between
+     * two barrier issuances. For each register() call this is returned, finishOne() must be called exactly once.
+     * It should be treated like taking a lock().
+     */
+    public static final class Group implements Comparable<Group>
+    {
+        /**
+         * In general this class goes through the following stages:
+         * 1) LIVE:      many calls to register() and finishOne()
+         * 2) FINISHING: a call to expire() (after a barrier issue), means calls to register() will now fail,
+         *               and we are now 'in the past' (new operations will be started against a new Group)
+         * 3) FINISHED:  once the last finishOne() is called, this Group is done. We call unlink().
+         * 4) ZOMBIE:    all our operations are finished, but some operations against an earlier Group are still
+         *               running, or tidying up, so unlink() fails to remove us
+         * 5) COMPLETE:  all operations started on or before us are FINISHED (and COMPLETE), so we are unlinked
+         * <p/>
+         * Another, parallel, state is ISBLOCKING:
+         * <p/>
+         * isBlocking => a barrier that is waiting on us (either directly, or via a future Group) is blocking general
+         * progress. This state is entered by calling Barrier.markBlocking(). If the running operations are blocked
+         * on a Signal that is also registered with the isBlockingSignal (probably through isBlockingSignal(Signal))
+         * then they will be notified that they are blocking forward progress, and may take action to avoid that.
+         */
+
+        private volatile Group prev, next;
+        private final long id; // monotonically increasing id for compareTo()
+        private volatile int running = 0; // number of operations currently running.  < 0 means we're expired, and the count of tasks still running is -(running + 1)
+        private volatile boolean isBlocking; // indicates running operations are blocking future barriers
+        private final WaitQueue isBlockingSignal = new WaitQueue(); // signal to wait on to indicate isBlocking is true
+        private final WaitQueue waiting = new WaitQueue(); // signal to wait on for completion
+
+        static final AtomicIntegerFieldUpdater<Group> runningUpdater = AtomicIntegerFieldUpdater.newUpdater(Group.class, "running");
+
+        // builds the very first Group of an OpOrder: id zero, and no predecessor to link back to
+        private Group()
+        {
+            id = 0L;
+        }
+
+        // builds the successor of the given Group: takes the following id and links back to the predecessor
+        private Group(Group prev)
+        {
+            id = 1 + prev.id;
+            this.prev = prev;
+        }
+
+        // prevents any further operations starting against this Group instance
+        // if there are no running operations, calls unlink; otherwise, we let the last op to finishOne call it.
+        // this means issue() won't have to block for ops to finish.
+        // throws IllegalStateException if this Group has already been expired (running is already negative)
+        private void expire()
+        {
+            while (true)
+            {
+                int current = running;
+                if (current < 0)
+                    throw new IllegalStateException();
+                // flip the count negative: n running operations are stored as -(n + 1), so zero becomes FINISHED
+                if (runningUpdater.compareAndSet(this, current, -1 - current))
+                {
+                    // if we're already finished (no running ops), unlink ourselves
+                    if (current == 0)
+                        unlink();
+                    return;
+                }
+            }
+        }
+
+        // tries to count one more operation against this Group; returns false, leaving the count
+        // untouched, once the Group has been expired (i.e. its running count has gone negative)
+        private boolean register()
+        {
+            int observed;
+            do
+            {
+                observed = running;
+                if (observed < 0)
+                    return false;
+                // if the CAS loses a race with another update of the count we re-read and retry
+            }
+            while (!runningUpdater.compareAndSet(this, observed, observed + 1));
+            return true;
+        }
+
+        /**
+         * To be called exactly once for each register() call this object is returned for, indicating the operation
+         * is complete. If this Group has already been expired and this was its last running operation,
+         * unlink() is called from here.
+         */
+        public void finishOne()
+        {
+            while (true)
+            {
+                int current = running;
+                if (current < 0)
+                {
+                    // already expired: the count is stored negated, as -(n + 1), so adding one removes an operation
+                    if (runningUpdater.compareAndSet(this, current, current + 1))
+                    {
+                        if (current + 1 == FINISHED)
+                        {
+                            // if we're now finished, unlink ourselves
+                            unlink();
+                        }
+                        return;
+                    }
+                }
+                // still live: simply decrement; expire() itself calls unlink() if it finds the count at zero
+                else if (runningUpdater.compareAndSet(this, current, current - 1))
+                {
+                    return;
+                }
+            }
+        }
+
+        /**
+         * called once we know all operations started against this Group have completed,
+         * however we do not know if operations against its ancestors have completed, or
+         * if its descendants have completed ahead of it, so we attempt to create the longest
+         * chain from the oldest still linked Group. If we can't reach the oldest through
+         * an unbroken chain of completed Groups, we abort, and leave the still completing
+         * ancestor to tidy up.
+         */
+        private void unlink()
+        {
+            // walk back in time to find the start of the list
+            Group start = this;
+            while (true)
+            {
+                Group prev = start.prev;
+                if (prev == null)
+                    break;
+                // if we haven't finished this Group yet abort and let it clean up when it's done
+                if (prev.running != FINISHED)
+                    return;
+                start = prev;
+            }
+
+            // now walk forwards in time, in case we finished up late
+            // (end becomes the first Group after us that is not FINISHED; it is not itself processed below)
+            Group end = this.next;
+            while (end.running == FINISHED)
+                end = end.next;
+
+            // now walk from first to last, unlinking the prev pointer and waking up any blocking threads
+            // (a cleared next.prev is exactly what Barrier.allPriorOpsAreFinished() tests for)
+            while (start != end)
+            {
+                Group next = start.next;
+                next.prev = null;
+                start.waiting.signalAll();
+                start = next;
+            }
+        }
+
+        /**
+         * @return true if this Group has been marked (by Barrier.markBlocking()) as being waited on by a
+         * barrier that is, or may be, blocking general progress, so we should try more aggressively to progress
+         */
+        public boolean isBlocking()
+        {
+            final boolean marked = isBlocking;
+            return marked;
+        }
+
+        /**
+         * registers the calling thread to be signalled when a barrier waiting on us is, or may be,
+         * blocking general progress, so we should try more aggressively to progress
+         *
+         * @return a new Signal registered with this Group's isBlockingSignal queue
+         */
+        public WaitQueue.Signal isBlockingSignal()
+        {
+            final WaitQueue.Signal registered = isBlockingSignal.register();
+            return registered;
+        }
+
+        /**
+         * combines the provided signal with a freshly registered isBlockingSignal(), so that the result is
+         * also signalled if the operation gets marked blocking
+         *
+         * @param signal the signal to wrap
+         * @return the result of WaitQueue.any() over the provided signal and the blocking signal
+         */
+        public WaitQueue.Signal isBlockingSignal(WaitQueue.Signal signal)
+        {
+            final WaitQueue.Signal blocking = isBlockingSignal();
+            return WaitQueue.any(signal, blocking);
+        }
+
+        /**
+         * Orders Groups by id.
+         *
+         * @param that the Group to compare against
+         * @return 1, -1 or 0 according to the sign of (this.id - that.id)
+         */
+        public int compareTo(Group that)
+        {
+            // the sign of the (possibly wrapping) difference is used deliberately, as opposed to Long.compareTo():
+            // we care about ordering, not which is the smaller value, so this permits wrapping in the unlikely
+            // event we exhaust the long space
+            return Long.signum(this.id - that.id);
+        }
+    }
+
+    /**
+     * This class represents a synchronisation point providing ordering guarantees on operations started
+     * against the enclosing OpOrder.  When issue() is called upon it (may only happen once per Barrier), the
+     * Barrier atomically partitions new operations from those already running (by expiring the current Group),
+     * and activates its isAfter() method
+     * which indicates if an operation was started before or after this partition. It offers methods to
+     * determine, or block until, all prior operations have finished, and a means to indicate to those operations
+     * that they are blocking forward progress. See {@link OpOrder} for idiomatic usage.
+     */
+    public final class Barrier
+    {
+        // this Barrier was issued after all Group operations started against orderOnOrBefore
+        private volatile Group orderOnOrBefore;
+
+        /**
+         * @param group the Group an operation was started against
+         * @return true if the group is the one this barrier was issued against, or an earlier one,
+         * i.e. it was started prior to the issuing of the barrier.
+         *
+         * (Until issue is called, always returns true, but if you rely on this behavior you are probably
+         * Doing It Wrong.)
+         */
+        public boolean isAfter(Group group)
+        {
+            final Group syncPoint = orderOnOrBefore;
+            // we subtract to permit wrapping round the full range of Long - so we only need to ensure
+            // there are never Long.MAX_VALUE * 2 total Group objects in existence at any one time, which will
+            // take care of itself
+            return syncPoint == null || syncPoint.id - group.id >= 0;
+        }
+
+        /**
+         * Issues (seals) the barrier, meaning no new operations may be issued against it, and expires the current
+         * Group.  Must be called before await() for isAfter() to be properly synchronised.
+         *
+         * @throws IllegalStateException if issue() has already been called on this Barrier
+         */
+        public void issue()
+        {
+            if (orderOnOrBefore != null)
+                throw new IllegalStateException("Can only call issue() once on each Barrier");
+
+            final Group current;
+            // the head Group is swapped under the OpOrder's monitor, so barriers issued concurrently
+            // each append their new Group to the chain one at a time
+            synchronized (OpOrder.this)
+            {
+                current = OpOrder.this.current;
+                orderOnOrBefore = current;
+                // the successor is linked in (current.next) and published before current is expired below
+                OpOrder.this.current = current.next = new Group(current);
+            }
+            // from here on register() against the old Group fails, so start() retries against the new head
+            current.expire();
+        }
+
+        /**
+         * Flags the Group this barrier was issued against, and every Group still reachable from it through
+         * prev links, as blocking, and signals all threads registered on each Group's isBlockingSignal,
+         * potentially prompting them to more aggressively make progress
+         */
+        public void markBlocking()
+        {
+            for (Group group = orderOnOrBefore; group != null; group = group.prev)
+            {
+                // set the flag before signalling, so that woken threads observe isBlocking() == true
+                group.isBlocking = true;
+                group.isBlockingSignal.signalAll();
+            }
+        }
+
+        /**
+         * Register to be signalled once allPriorOpsAreFinished() may return true
+         *
+         * @return a Signal registered on the waiting queue of the Group this barrier was issued against
+         * @throws IllegalStateException if issue() has not yet been called on this barrier
+         */
+        public WaitQueue.Signal register()
+        {
+            // fail in the same way as allPriorOpsAreFinished() does, rather than with a bare NullPointerException
+            Group current = orderOnOrBefore;
+            if (current == null)
+                throw new IllegalStateException("This barrier needs to have issue() called on it before it can be waited on");
+            return current.waiting.register();
+        }
+
+        /**
+         * Checks whether the Group following the one this barrier was issued against has had its prev link
+         * cleared, which unlink() does once that Group and all of its ancestors have finished.
+         *
+         * @return true if all operations started prior to barrier.issue() have completed
+         * @throws IllegalStateException if issue() has not yet been called on this barrier
+         */
+        public boolean allPriorOpsAreFinished()
+        {
+            final Group syncPoint = orderOnOrBefore;
+            if (syncPoint == null)
+                throw new IllegalStateException("This barrier needs to have issue() called on it before prior operations can complete");
+            return syncPoint.next.prev == null;
+        }
+
+        /**
+         * blocks, uninterruptibly, until all operations started prior to issuing the barrier have completed
+         */
+        public void await()
+        {
+            for (;;)
+            {
+                if (allPriorOpsAreFinished())
+                    break;
+                WaitQueue.Signal pending = register();
+                // check again now that we are registered: the last operation may have finished (and signalled)
+                // in between our first check and the registration
+                if (allPriorOpsAreFinished())
+                {
+                    pending.cancel();
+                    return;
+                }
+                pending.awaitUninterruptibly();
+            }
+            assert orderOnOrBefore.running == FINISHED;
+        }
+
+        /**
+         * @return the Group we are waiting on (null until issue() has been called) - any Group with
+         * .compareTo(getSyncPoint()) <= 0 must complete before await() returns
+         */
+        public Group getSyncPoint()
+        {
+            final Group syncPoint = orderOnOrBefore;
+            return syncPoint;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
new file mode 100644
index 0000000..9bef8c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
@@ -0,0 +1,508 @@
+package org.apache.cassandra.utils.concurrent;
+
+import com.yammer.metrics.core.TimerContext;
+import org.slf4j.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * <p>A relatively easy to use utility for general purpose thread signalling.</p>
+ * <p>Usage on a thread awaiting a state change using a WaitQueue q is:</p>
+ * <pre>
+ * {@code
+ *      while (!conditionMet())
+ *      {
+ *          Signal s = q.register();
+ *          if (!conditionMet())    // or, perhaps more correctly, !conditionChanged()
+ *              s.await();
+ *          else
+ *              s.cancel();
+ *      }
+ * }
+ * </pre>
+ * A signalling thread, AFTER changing the state, then calls q.signal() to wake up one, or q.signalAll()
+ * to wake up all, waiting threads.
+ * <p>To understand intuitively how this class works, the idea is simply that a thread, once it considers itself
+ * incapable of making progress, registers to be awoken once that changes. Since this could have changed between
+ * checking and registering (in which case the thread that made this change would have been unable to signal it),
+ * it checks the condition again, sleeping only if it hasn't changed/still is not met.</p>
+ * <p>This thread synchronisation scheme has some advantages over Condition objects and Object.wait/notify in that no monitor
+ * acquisition is necessary and, in fact, besides the actual waiting on a signal, all operations are non-blocking.
+ * As a result consumers can never block producers, nor each other, or vice versa, from making progress.
+ * Threads that are signalled are also put into a RUNNABLE state almost simultaneously, so they can all immediately make
+ * progress without having to serially acquire the monitor/lock, reducing scheduler delay incurred.</p>
+ *
+ * <p>A few notes on utilisation:</p>
+ * <p>1. A thread will only exit await() when it has been signalled, but this does not guarantee the condition has not
+ * been altered since it was signalled, and depending on your design it is likely the outer condition will need to be
+ * checked in a loop, though this is not always the case.</p>
+ * <p>2. Each signal is single use, so must be re-registered after each await(). This is true even if it times out.</p>
+ * <p>3. If you choose not to wait on the signal (because the condition has been met before you waited on it)
+ * you must cancel() the signal if the signalling thread uses signal() to awake waiters; otherwise signals will be
+ * lost. If signalAll() is used but infrequent, and register() is frequent, cancel() should still be used to prevent the
+ * queue growing unboundedly. Similarly, if you provide a TimerContext, cancel should be used to ensure it is not erroneously
+ * counted towards wait time.</p>
+ * <p>4. Care must be taken when selecting conditionMet() to ensure we are waiting on the condition that actually
+ * indicates progress is possible. In some complex cases it may be tempting to wait on a condition that is only indicative
+ * of local progress, not progress on the task we are aiming to complete, and a race may leave us waiting for a condition
+ * to be met that we no longer need.
+ * <p>5. This scheme is not fair</p>
+ * <p>6. Only the thread that calls register() may call await()</p>
+ */
+public final class WaitQueue
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(WaitQueue.class);
+
+    private static final int CANCELLED = -1;
+    private static final int SIGNALLED = 1;
+    private static final int NOT_SET = 0;
+
+    private static final AtomicIntegerFieldUpdater signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, "state");
+
+    // the waiting signals
+    private final ConcurrentLinkedDeque<RegisteredSignal> queue = new ConcurrentLinkedDeque<>();
+
+    /**
+     * Creates a new signal owned by the calling thread and appends it to the queue of waiters.
+     * The calling thread MUST be the thread that uses the signal
+     * @return the newly registered signal
+     */
+    public Signal register()
+    {
+        final RegisteredSignal registered = new RegisteredSignal();
+        queue.add(registered);
+        return registered;
+    }
+
+    /**
+     * Creates a new timed signal owned by the calling thread and appends it to the queue of waiters.
+     * The calling thread MUST be the thread that uses the signal.
+     * If the Signal is waited on, context.stop() will be called when the wait times out, the Signal is signalled,
+     * or the waiting thread is interrupted.
+     * @param context the timer context to associate with the signal; must not be null
+     * @return the newly registered signal
+     */
+    public Signal register(TimerContext context)
+    {
+        assert context != null;
+        final RegisteredSignal timed = new TimedSignal(context);
+        queue.add(timed);
+        return timed;
+    }
+
+    /**
+     * Signal one waiting thread
+     *
+     * @return true if a waiting signal was successfully signalled, false if the queue held none that could be
+     */
+    public boolean signal()
+    {
+        if (!hasWaiters())
+            return false;
+        RegisteredSignal head;
+        while ((head = queue.poll()) != null)
+        {
+            // a signal that has already been cancelled (or signalled) cannot be woken, so move on to the next
+            if (head.signal())
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Signal all waiting threads that were registered at the time of the call. Signals registered
+     * while this method is running are left in the queue for a later signal()/signalAll().
+     */
+    public void signalAll()
+    {
+        if (!hasWaiters())
+            return;
+        // whether we are tracing is decided once, here; the loop below tests woke != null rather than asking
+        // the logger again, so a concurrent change of log level cannot lead us to dereference a null list
+        List<Thread> woke = null;
+        if (logger.isTraceEnabled())
+            woke = new ArrayList<>();
+        long start = System.nanoTime();
+        // we wake up only a snapshot of the queue, to avoid a race where the condition is not met and the woken thread
+        // immediately waits on the queue again
+        // peekLast() is used as, unlike getLast(), it returns null instead of throwing NoSuchElementException
+        // if the queue has been emptied by another thread since the hasWaiters() check above
+        RegisteredSignal last = queue.peekLast();
+        if (last == null)
+            return;
+        Iterator<RegisteredSignal> iter = queue.iterator();
+        while (iter.hasNext())
+        {
+            RegisteredSignal signal = iter.next();
+            if (woke != null)
+            {
+                // read the thread before signalling, as signal() clears the field
+                Thread thread = signal.thread;
+                if (signal.signal())
+                    woke.add(thread);
+            }
+            else
+                signal.signal();
+
+            iter.remove();
+
+            if (signal == last)
+                break;
+        }
+        long end = System.nanoTime();
+        if (woke != null)
+            logger.trace("Woke up {} in {}ms from {}", woke, (end - start) * 0.000001d, Thread.currentThread().getStackTrace()[2]);
+    }
+
+    // scans the whole queue, removing every signal that has been cancelled
+    private void cleanUpCancelled()
+    {
+        for (Iterator<RegisteredSignal> cursor = queue.iterator(); cursor.hasNext(); )
+        {
+            if (cursor.next().isCancelled())
+                cursor.remove();
+        }
+    }
+
+    /**
+     * @return true if the queue holds any signals (including any cancelled ones not yet cleaned up)
+     */
+    public boolean hasWaiters()
+    {
+        final boolean empty = queue.isEmpty();
+        return !empty;
+    }
+
+    /**
+     * Counts the signals currently in the queue that have not been cancelled
+     * @return how many threads are waiting
+     */
+    public int getWaiting()
+    {
+        if (queue.isEmpty())
+            return 0;
+        int waiting = 0;
+        for (Signal candidate : queue)
+        {
+            // cancelled signals may linger in the queue until cleaned up; they are not waiters
+            if (candidate.isCancelled())
+                continue;
+            waiting++;
+        }
+        return waiting;
+    }
+
+    /**
+     * A Signal is a single use mechanism by which a thread waits to be notified that some condition
+     * it may be interested in has changed state (and hence that it should check whether it has).
+     * The notification is potentially transient, i.e. the state can change again in the meantime; it
+     * says only that the state should be checked, not what the state will be found to be.
+     *
+     * Signal implementations should never wake up spuriously, they are always woken up by a
+     * signal() or signalAll().
+     *
+     * This abstract definition of Signal is not tied to a WaitQueue. RegisteredSignal is the main
+     * building block of Signals, but defining Signal abstractly allows Signals to be composed in
+     * useful ways. A Signal is 'owned' by the thread that registered itself with the WaitQueue(s)
+     * to obtain the underlying RegisteredSignal(s); only the owning thread should use a Signal.
+     */
+    public interface Signal
+    {
+
+        /**
+         * @return true if signalled; once true, the owning thread must discard this Signal.
+         */
+        boolean isSignalled();
+
+        /**
+         * @return true if cancelled; once cancelled, the owning thread must discard this Signal.
+         */
+        boolean isCancelled();
+
+        /**
+         * @return isSignalled() || isCancelled(). Once true, the state is fixed and the owning thread
+         * should discard this Signal.
+         */
+        boolean isSet();
+
+        /**
+         * atomically: if !isSet() cancels the Signal, otherwise returns true if isSignalled()
+         *
+         * @return true if isSignalled()
+         */
+        boolean checkAndClear();
+
+        /**
+         * To be called only by the owning thread. Indicates the signal can be retired, and if it had
+         * already been signalled passes the signal on to another waiting thread
+         */
+        void cancel();
+
+        /**
+         * Wait until signalled, without throwing InterruptedException. On exit isSignalled() must be true.
+         * If the thread is interrupted in the meantime, the interrupted flag will be set.
+         */
+        void awaitUninterruptibly();
+
+        /**
+         * Wait until signalled, or throw an InterruptedException if interrupted before that happens.
+         * On normal exit isSignalled() must be true; if InterruptedException is thrown isCancelled()
+         * will be true instead.
+         * @throws InterruptedException if the thread is interrupted before being signalled
+         */
+        void await() throws InterruptedException;
+
+        /**
+         * Wait until signalled, or the provided time is reached, or the thread is interrupted. If signalled,
+         * isSignalled() will be true on exit, and the method will return true; if timed out, the method will
+         * return false and isCancelled() will be true; if interrupted an InterruptedException will be thrown
+         * and isCancelled() will be true.
+         * @param until System.currentTimeMillis() to wait until
+         * @return true if signalled, false if timed out
+         * @throws InterruptedException if the thread is interrupted before being signalled or timing out
+         */
+        boolean awaitUntil(long until) throws InterruptedException;
+    }
+
+    /**
+     * An abstract signal implementation, providing the blocking await methods in terms of
+     * isSignalled(), checkAndClear() and cancel()
+     */
+    public static abstract class AbstractSignal implements Signal
+    {
+        /**
+         * Parks until signalled, never throwing; if an interrupt arrives while waiting, the thread's
+         * interrupted flag is set again before returning.
+         */
+        public void awaitUninterruptibly()
+        {
+            boolean interrupted = false;
+            while (!isSignalled())
+            {
+                // clear (and remember) any pending interrupt, as otherwise park() would return immediately
+                if (Thread.interrupted())
+                    interrupted = true;
+                LockSupport.park();
+            }
+            if (interrupted)
+                Thread.currentThread().interrupt();
+            checkAndClear();
+        }
+
+        /**
+         * Parks until signalled.
+         *
+         * @throws InterruptedException if the thread is interrupted first, in which case the signal is cancelled
+         */
+        public void await() throws InterruptedException
+        {
+            while (!isSignalled())
+            {
+                checkInterrupted();
+                LockSupport.park();
+            }
+            checkAndClear();
+        }
+
+        /**
+         * Parks until signalled or until the deadline passes, whichever happens first.
+         *
+         * @param until the deadline, as a System.currentTimeMillis() value
+         * @return the result of checkAndClear(), i.e. true if signalled, false if we timed out
+         * @throws InterruptedException if the thread is interrupted first, in which case the signal is cancelled
+         */
+        public boolean awaitUntil(long until) throws InterruptedException
+        {
+            // keep waiting only while the deadline is still in the future
+            while (until > System.currentTimeMillis() && !isSignalled())
+            {
+                checkInterrupted();
+                LockSupport.parkUntil(until);
+            }
+            return checkAndClear();
+        }
+
+        // if the current thread has been interrupted, clears the flag, cancels this signal and throws
+        private void checkInterrupted() throws InterruptedException
+        {
+            if (Thread.interrupted())
+            {
+                cancel();
+                throw new InterruptedException();
+            }
+        }
+    }
+
+    /**
+     * A signal registered with this WaitQueue. Its state starts as NOT_SET and moves, via
+     * signalledUpdater, to either SIGNALLED or CANCELLED.
+     */
+    private class RegisteredSignal extends AbstractSignal
+    {
+        // the thread that created this signal, which is the one unparked by signal(); nulled once the signal is set
+        private volatile Thread thread = Thread.currentThread();
+        // one of NOT_SET, SIGNALLED or CANCELLED
+        volatile int state;
+
+        public boolean isSignalled()
+        {
+            return state == SIGNALLED;
+        }
+
+        public boolean isCancelled()
+        {
+            return state == CANCELLED;
+        }
+
+        public boolean isSet()
+        {
+            return state != NOT_SET;
+        }
+
+        // attempts the NOT_SET -> SIGNALLED transition; only if that succeeds is the owning thread unparked.
+        // returns false if the signal had already been signalled or cancelled
+        private boolean signal()
+        {
+            if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED))
+            {
+                LockSupport.unpark(thread);
+                thread = null;
+                return true;
+            }
+            return false;
+        }
+
+        // attempts the NOT_SET -> CANCELLED transition; returns false if that succeeded (i.e. we had not
+        // been signalled), and true otherwise
+        public boolean checkAndClear()
+        {
+            if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED))
+            {
+                thread = null;
+                cleanUpCancelled();
+                return false;
+            }
+            // must now be signalled assuming correct API usage
+            return true;
+        }
+
+        /**
+         * Should only be called by the registered thread. Indicates the signal can be retired,
+         * and if signalled propagates the signal to another waiting thread
+         */
+        public void cancel()
+        {
+            if (isCancelled())
+                return;
+            if (!signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED))
+            {
+                // must already be signalled - switch to cancelled, and then...
+                state = CANCELLED;
+                // ...propagate the signal to another waiter, so that it is not lost
+                WaitQueue.this.signal();
+            }
+            thread = null;
+            cleanUpCancelled();
+        }
+    }
+
+    /**
+     * A RegisteredSignal that carries a TimerContext and stops it as soon as the signal is
+     * invalidated, whether by cancellation or by a wait completing; if the timer was started at
+     * registration it therefore measures the time between registering and invalidating the signal.
+     */
+    private final class TimedSignal extends RegisteredSignal
+    {
+        private final TimerContext context;
+
+        private TimedSignal(TimerContext timer)
+        {
+            context = timer;
+        }
+
+        @Override
+        public boolean checkAndClear() // stop the timer, then defer to the plain registered-signal logic
+        {
+            context.stop();
+            return super.checkAndClear();
+        }
+
+        @Override
+        public void cancel()
+        {
+            // an already-cancelled signal has nothing left to stop or retire
+            if (isCancelled())
+                return;
+            context.stop();
+            super.cancel();
+        }
+    }
+
+    /**
+     * Base class for signals that are composed of several delegate signals
+     */
+    private abstract static class MultiSignal extends AbstractSignal
+    {
+        final Signal[] signals;
+        protected MultiSignal(Signal[] delegates)
+        {
+            signals = delegates;
+        }
+
+        public boolean isCancelled() // cancelled only when every delegate is cancelled
+        {
+            for (int i = 0; i < signals.length; i++)
+                if (!signals[i].isCancelled())
+                    return false;
+            return true;
+        }
+
+        public boolean checkAndClear() // clears every delegate, then reports the combined signalled state
+        {
+            for (int i = 0; i < signals.length; i++)
+                signals[i].checkAndClear();
+            return isSignalled();
+        }
+
+        public void cancel() // cancels every delegate in array order
+        {
+            for (int i = 0; i < signals.length; i++)
+                signals[i].cancel();
+        }
+    }
+
+    /**
+     * A composite Signal that counts as signalled (and as set) as soon as any one delegate does
+     */
+    private static class AnySignal extends MultiSignal
+    {
+        protected AnySignal(Signal ... delegates)
+        {
+            super(delegates);
+        }
+
+        public boolean isSignalled() // true at the first signalled delegate
+        {
+            for (int i = 0; i < signals.length; i++)
+                if (signals[i].isSignalled())
+                    return true;
+            return false;
+        }
+
+        public boolean isSet() // true at the first delegate that is set
+        {
+            for (int i = 0; i < signals.length; i++)
+                if (signals[i].isSet())
+                    return true;
+            return false;
+        }
+    }
+
+    /**
+     * A composite Signal that counts as signalled (and as set) only once every delegate does
+     */
+    private static class AllSignal extends MultiSignal
+    {
+        protected AllSignal(Signal ... delegates)
+        {
+            super(delegates);
+        }
+
+        public boolean isSignalled() // false at the first delegate that is not signalled
+        {
+            for (int i = 0; i < signals.length; i++)
+                if (!signals[i].isSignalled())
+                    return false;
+            return true;
+        }
+
+        public boolean isSet() // false at the first delegate that is not set
+        {
+            for (int i = 0; i < signals.length; i++)
+                if (!signals[i].isSet())
+                    return false;
+            return true;
+        }
+    }
+
+    /**
+     * @param signals the signals to combine; the array is wrapped in an AnySignal as given
+     * @return a signal that returns only when any of the provided signals would have returned
+     */
+    public static Signal any(Signal ... signals)
+    {
+        return new AnySignal(signals);
+    }
+
+    /**
+     * @param signals the signals to combine; the array is wrapped in an AllSignal as given
+     * @return a signal that returns only when all provided signals would have returned
+     */
+    public static Signal all(Signal ... signals)
+    {
+        return new AllSignal(signals);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
new file mode 100644
index 0000000..8ebdf30
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import java.nio.ByteBuffer;
+
+public abstract class AbstractAllocator
+{
+    /**
+     * Copy the remaining bytes of the given buffer into a buffer obtained from allocate(int).
+     */
+    public ByteBuffer clone(ByteBuffer buffer)
+    {
+        assert buffer != null;
+        final int length = buffer.remaining();
+        if (length == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        ByteBuffer copy = allocate(length);
+        copy.mark(); // remember the start position so reset() can rewind after the put
+        copy.put(buffer.duplicate()); // reading from a duplicate leaves the caller's position untouched
+        copy.reset();
+        return copy;
+    }
+
+    public abstract ByteBuffer allocate(int size);
+
+    //
+    // accounting hooks: only meaningful for pooled subclasses, so the defaults here all report zero
+    //
+
+    public long owns()
+    {
+        return 0L;
+    }
+
+    public float ownershipRatio()
+    {
+        return 0f;
+    }
+
+    public long reclaiming()
+    {
+        return 0L;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java b/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java
new file mode 100644
index 0000000..c58340e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java
@@ -0,0 +1,59 @@
+package org.apache.cassandra.utils.memory;
+
+import com.google.common.base.*;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Wraps calls to a PoolAllocator with the provided OpOrder.Group. Also doubles as a Function that clones Cells
+ * using itself
+ */
+public final class ContextAllocator extends AbstractAllocator implements Function<Cell, Cell>
+{
+    private final OpOrder.Group opGroup; // supplied to every clone/allocate call made on the wrapped allocator
+    private final PoolAllocator allocator; // the allocator all calls are delegated to
+    private final ColumnFamilyStore cfs; // handed to Cell.localCopy() in apply()
+
+    public ContextAllocator(OpOrder.Group opGroup, PoolAllocator allocator, ColumnFamilyStore cfs)
+    {
+        this.opGroup = opGroup;
+        this.allocator = allocator;
+        this.cfs = cfs;
+    }
+
+    @Override
+    public ByteBuffer clone(ByteBuffer buffer)
+    {
+        return allocator.clone(buffer, opGroup);
+    }
+
+    public ByteBuffer allocate(int size)
+    {
+        return allocator.allocate(size, opGroup);
+    }
+
+    public Cell apply(Cell column) // Function implementation: copies the cell using this allocator
+    {
+        return column.localCopy(cfs, this);
+    }
+
+    public long owns() // the accounting getters below simply report the wrapped allocator's figures
+    {
+        return allocator.owns();
+    }
+
+    @Override
+    public float ownershipRatio()
+    {
+        return allocator.ownershipRatio();
+    }
+
+    @Override
+    public long reclaiming()
+    {
+        return allocator.reclaiming();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
new file mode 100644
index 0000000..86cea64
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+public final class HeapAllocator extends AbstractAllocator
+{
+    public static final HeapAllocator instance = new HeapAllocator();
+
+    /**
+     * Private: always use HeapAllocator.instance, since there is no per-Allocator state.
+     * NOTE(review): being private, this is not reachable by plain reflection - confirm Memtable no longer needs it.
+     */
+    private HeapAllocator() {}
+
+    public ByteBuffer allocate(int size) // plain heap buffer; no accounting is performed
+    {
+        return ByteBuffer.allocate(size);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
new file mode 100644
index 0000000..bc293cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class HeapPool extends Pool // a Pool whose allocators are HeapPoolAllocators
+{
+    public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
+    {
+        super(maxOnHeapMemory, cleanupThreshold, cleaner); // passed to Pool as (limit, cleanThreshold, cleaner)
+    }
+
+    public HeapPoolAllocator newAllocator(OpOrder writes) // 'writes' is not used by this implementation
+    {
+        return new HeapPoolAllocator(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java
new file mode 100644
index 0000000..cf798d0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import java.nio.ByteBuffer;
+
+public final class HeapPoolAllocator extends PoolAllocator<HeapPool> // allocates plain heap buffers, accounted in a HeapPool
+{
+    HeapPoolAllocator(HeapPool pool)
+    {
+        super(pool);
+    }
+
+    public ByteBuffer allocate(int size) // NOTE(review): forwards a null opGroup, which markAllocated() may dereference
+    {
+        return allocate(size, null);
+    }
+
+    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+    {
+        markAllocated(size, opGroup);
+        // the accounting is done by markAllocated(); the buffer itself is an ordinary heap allocation
+        return ByteBuffer.allocate(size);
+    }
+
+    public void free(ByteBuffer name) // hands the buffer's remaining() byte count back to the pool accounting
+    {
+        release(name.remaining());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
new file mode 100644
index 0000000..4396caf
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SlabAllocator is a bump-the-pointer allocator that allocates
+ * large (1MB, see REGION_SIZE) regions and then doles them out to threads that request
+ * slices into the array.
+ * <p/>
+ * The purpose of this class is to combat heap fragmentation in long lived
+ * objects: by ensuring that all allocations with similar lifetimes
+ * refer only to large regions of contiguous memory, we ensure that large blocks
+ * get freed up at the same time.
+ * <p/>
+ * Otherwise, variable length byte arrays allocated end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ */
+public class HeapSlabAllocator extends PoolAllocator
+{
+    private static final Logger logger = LoggerFactory.getLogger(HeapSlabAllocator.class);
+
+    private final static int REGION_SIZE = 1024 * 1024;
+    private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+
+    // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
+    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+
+    private final AtomicReference<Region> currentRegion = new AtomicReference<Region>();
+    private final AtomicInteger regionCount = new AtomicInteger(0);
+    private AtomicLong unslabbed = new AtomicLong(0);
+
+    HeapSlabAllocator(Pool pool)
+    {
+        super(pool);
+    }
+
+    public ByteBuffer allocate(int size)
+    {
+        return allocate(size, null);
+    }
+
+    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+    {
+        assert size >= 0;
+        if (size == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        markAllocated(size, opGroup);
+        // satisfy large allocations directly from JVM since they don't cause fragmentation
+        // as badly, and fill up our regions quickly
+        if (size > MAX_CLONED_SIZE)
+        {
+            unslabbed.addAndGet(size);
+            return ByteBuffer.allocate(size);
+        }
+
+        while (true)
+        {
+            Region region = getRegion();
+
+            // Try to allocate from this region
+            ByteBuffer cloned = region.allocate(size);
+            if (cloned != null)
+                return cloned;
+
+            // not enough space!
+            currentRegion.compareAndSet(region, null);
+        }
+    }
+
+    public void free(ByteBuffer name)
+    {
+        // have to assume we cannot free the memory here, and just reclaim it all when we flush
+    }
+
+    /**
+     * Get the current region, or, if there is no current region, allocate a new one
+     */
+    private Region getRegion()
+    {
+        while (true)
+        {
+            // Try to get the region
+            Region region = currentRegion.get();
+            if (region != null)
+                return region;
+
+            // No current region, so we want to allocate one. We race
+            // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
+            region = RACE_ALLOCATED.poll();
+            if (region == null)
+                region = new Region(REGION_SIZE);
+            if (currentRegion.compareAndSet(null, region))
+            {
+                regionCount.incrementAndGet();
+                logger.trace("{} regions now allocated in {}", regionCount, this);
+                return region;
+            }
+
+            // someone else won race - that's fine, we'll try to grab theirs
+            // in the next iteration of the loop.
+            RACE_ALLOCATED.add(region);
+        }
+    }
+
+    /**
+     * A region of memory out of which allocations are sliced.
+     *
+     * This serves two purposes:
+     *  - to provide a step between initialization and allocation, so that racing to CAS a
+     *    new region in is harmless
+     *  - encapsulates the allocation offset
+     */
+    private static class Region
+    {
+        /**
+         * Actual underlying data
+         */
+        private ByteBuffer data;
+
+        /**
+         * Offset for the next allocation, or the sentinel value -1
+         * which implies that the region is still uninitialized.
+         */
+        private AtomicInteger nextFreeOffset = new AtomicInteger(0);
+
+        /**
+         * Total number of allocations satisfied from this buffer
+         */
+        private AtomicInteger allocCount = new AtomicInteger();
+
+        /**
+         * Create an uninitialized region. Note that memory is not allocated yet, so
+         * this is cheap.
+         *
+         * @param size in bytes
+         */
+        private Region(int size)
+        {
+            data = ByteBuffer.allocate(size);
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the region.
+         *
+         * @return the successful allocation, or null to indicate not-enough-space
+         */
+        public ByteBuffer allocate(int size)
+        {
+            while (true)
+            {
+                int oldOffset = nextFreeOffset.get();
+
+                if (oldOffset + size > data.capacity()) // capacity == remaining
+                    return null;
+
+                // Try to atomically claim this region
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
+                {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size);
+                }
+                // we raced and lost alloc, try again
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Region@" + System.identityHashCode(this) +
+                   " allocs=" + allocCount.get() + "waste=" +
+                   (data.capacity() - nextFreeOffset.get());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
new file mode 100644
index 0000000..bd1ab98
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+public class HeapSlabPool extends Pool // a Pool whose allocators are HeapSlabAllocators
+{
+    public HeapSlabPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
+    {
+        super(maxOnHeapMemory, cleanupThreshold, cleaner); // passed to Pool as (limit, cleanThreshold, cleaner)
+    }
+
+    public HeapSlabAllocator newAllocator(OpOrder writes) // 'writes' is not used by this implementation
+    {
+        return new HeapSlabAllocator(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/Pool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/Pool.java b/src/java/org/apache/cassandra/utils/memory/Pool.java
new file mode 100644
index 0000000..4e59de8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/Pool.java
@@ -0,0 +1,140 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+
+/**
+ * Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through
+ * child AbstractAllocator objects. AbstractAllocator and MemoryTracker correspond approximately to PoolAllocator and Pool,
+ * respectively, with the MemoryTracker bookkeeping the total shared use of resources, and the AbstractAllocator the amount
+ * checked out and in use by a specific PoolAllocator.
+ *
+ * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
+ * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
+ * but only needs to allocate if there are none already available. This distinction is not always meaningful.
+ */
+public abstract class Pool
+{
+    // total memory/resource permitted to allocate
+    public final long limit;
+
+    // ratio of used to spare (both excluding 'reclaiming') at which to trigger a clean
+    public final float cleanThreshold;
+
+    // total bytes allocated and reclaiming
+    private AtomicLong allocated = new AtomicLong();
+    private AtomicLong reclaiming = new AtomicLong();
+
+    final WaitQueue hasRoom = new WaitQueue(); // signalled (signalAll) by release(); allocators register on it when tryAllocate fails
+
+    // a cache of the calculation determining at what allocation threshold we should next clean, and the cleaner we trigger
+    private volatile long nextClean;
+    private final PoolCleanerThread<?> cleanerThread; // null when no cleaner Runnable was supplied
+
+    public Pool(long limit, float cleanThreshold, Runnable cleaner) // starts the cleaner thread immediately if a cleaner is given
+    {
+        this.limit = limit;
+        this.cleanThreshold = cleanThreshold;
+        updateNextClean();
+        cleanerThread = cleaner == null ? null : new PoolCleanerThread<>(this, cleaner);
+        if (cleanerThread != null)
+            cleanerThread.start();
+    }
+
+    /** Methods for tracking and triggering a clean **/
+
+    boolean needsCleaning()
+    {
+        return used() >= nextClean && updateNextClean() && cleanerThread != null; // updateNextClean() rewrites nextClean as a side effect
+    }
+
+    void maybeClean() // wakes the cleaner thread when needsCleaning() says so
+    {
+        if (needsCleaning())
+            cleanerThread.trigger();
+    }
+
+    private boolean updateNextClean() // recomputes nextClean and returns whether used() has already reached it
+    {
+        long reclaiming = this.reclaiming.get();
+        return used() >= (nextClean = reclaiming
+                + (long) (this.limit * cleanThreshold));
+    }
+
+    /** Methods to allocate space **/
+
+    boolean tryAllocate(int size) // CAS loop: reserves size bytes unless that would exceed limit; returns whether it succeeded
+    {
+        while (true)
+        {
+            long cur;
+            if ((cur = allocated.get()) + size > limit)
+                return false;
+            if (allocated.compareAndSet(cur, cur + size))
+            {
+                maybeClean();
+                return true;
+            }
+        }
+    }
+
+    /**
+     * apply the size adjustment to allocated, bypassing any limits or constraints. Waiters are not
+     * signalled here; release() does that itself after calling this with a negative size
+     */
+    void adjustAllocated(long size)
+    {
+        if (size == 0)
+            return;
+        while (true)
+        {
+            long cur = allocated.get();
+            if (allocated.compareAndSet(cur, cur + size))
+            {
+                if (size > 0) // only growth can push us over the clean threshold
+                {
+                    maybeClean();
+                }
+                return;
+            }
+        }
+    }
+
+    void release(long size) // deallocates size bytes, then wakes every thread waiting on hasRoom
+    {
+        adjustAllocated(-size);
+        hasRoom.signalAll();
+    }
+
+    // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans
+    void adjustReclaiming(long reclaiming)
+    {
+        if (reclaiming == 0)
+            return;
+        this.reclaiming.addAndGet(reclaiming);
+        if (reclaiming < 0 && updateNextClean() && cleanerThread != null) // a shrinking 'reclaiming' lowers nextClean, so re-check
+            cleanerThread.trigger();
+    }
+
+    public long allocated()
+    {
+        return allocated.get();
+    }
+
+    public long used() // currently returns the same value as allocated()
+    {
+        return allocated.get();
+    }
+
+    public long reclaiming()
+    {
+        return reclaiming.get();
+    }
+
+    public abstract PoolAllocator newAllocator(OpOrder writes);
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
new file mode 100644
index 0000000..b30c484
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator
+{
+    public final P pool; // the pool whose accounting this allocator updates
+    volatile LifeCycle state = LifeCycle.LIVE;
+
+    static enum LifeCycle
+    {
+        LIVE, DISCARDING, DISCARDED;
+        LifeCycle transition(LifeCycle target) // only a step to the immediately following state is accepted (by assertion)
+        {
+            assert target.ordinal() == ordinal() + 1;
+            return target;
+        }
+    }
+
+    // the amount of memory/resource owned by this object
+    private AtomicLong owns = new AtomicLong();
+    // the amount of memory we are reporting to collect; this may be inaccurate, but is close
+    // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount
+    private AtomicLong reclaiming = new AtomicLong();
+
+    PoolAllocator(P pool)
+    {
+        this.pool = pool;
+    }
+
+    /**
+     * Mark this allocator as reclaiming; this will mark the memory it owns as reclaiming, so remove it from
+     * any calculation deciding if further cleaning/reclamation is necessary.
+     */
+    public void setDiscarding()
+    {
+        state = state.transition(LifeCycle.DISCARDING);
+        // mark the memory owned by this allocator as reclaiming
+        long prev = reclaiming.get();
+        long cur = owns.get();
+        reclaiming.set(cur);
+        pool.adjustReclaiming(cur - prev); // report only the change since the previous value
+    }
+
+    /**
+     * Indicate the memory and resources owned by this allocator are no longer referenced,
+     * and can be reclaimed/reused.
+     */
+    public void setDiscarded()
+    {
+        state = state.transition(LifeCycle.DISCARDED);
+        // release any memory owned by this allocator; automatically signals waiters
+        pool.release(owns.getAndSet(0));
+        pool.adjustReclaiming(-reclaiming.get());
+    }
+
+    public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
+
+    /** Mark the BB as unused, permitting it to be reclaimed */
+    public abstract void free(ByteBuffer name);
+
+    // mark ourselves as owning memory from the tracker.  meant to be called by subclass
+    // allocate method that actually allocates and returns a ByteBuffer
+    protected void markAllocated(int size, OpOrder.Group opGroup)
+    {
+        while (true)
+        {
+            if (pool.tryAllocate(size))
+            {
+                acquired(size);
+                return;
+            }
+            WaitQueue.Signal signal = opGroup.isBlockingSignal(pool.hasRoom.register()); // NOTE(review): NPE if opGroup is null - confirm callers
+            boolean allocated = pool.tryAllocate(size); // re-check after registering, before deciding to wait
+            if (allocated || opGroup.isBlocking())
+            {
+                signal.cancel();
+                if (allocated) // if we allocated, take ownership
+                    acquired(size);
+                else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking
+                    allocated(size);
+                return;
+            }
+            else
+                signal.awaitUninterruptibly();
+        }
+    }
+
+    // retroactively mark (by-passes any constraints) an amount allocated in the tracker, and owned by us.
+    private void allocated(int size)
+    {
+        pool.adjustAllocated(size);
+        owns.addAndGet(size);
+    }
+
+    // retroactively mark (by-passes any constraints) an amount owned by us
+    private void acquired(int size)
+    {
+        owns.addAndGet(size);
+    }
+
+    // release an amount of memory from our ownership, and deallocate it in the tracker
+    void release(int size)
+    {
+        pool.release(size);
+        owns.addAndGet(-size);
+    }
+
+    public boolean isLive()
+    {
+        return state == LifeCycle.LIVE;
+    }
+
+    /**
+     * Copy the remaining bytes of the given buffer into a buffer obtained from allocate(int, OpOrder.Group).
+     */
+    public ByteBuffer clone(ByteBuffer buffer, OpOrder.Group opGroup)
+    {
+        assert buffer != null;
+        if (buffer.remaining() == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        ByteBuffer cloned = allocate(buffer.remaining(), opGroup);
+
+        cloned.mark(); // remember the start position so reset() can rewind after the put
+        cloned.put(buffer.duplicate());
+        cloned.reset();
+        return cloned;
+    }
+
+    public ContextAllocator wrap(OpOrder.Group opGroup, ColumnFamilyStore cfs) // binds this allocator to an opGroup and cfs
+    {
+        return new ContextAllocator(opGroup, this, cfs);
+    }
+
+    @Override
+    public long owns()
+    {
+        return owns.get();
+    }
+
+    @Override
+    public float ownershipRatio() // fraction of the pool's limit currently owned by this allocator
+    {
+        return owns.get() / (float) pool.limit;
+    }
+
+    @Override
+    public long reclaiming()
+    {
+        return reclaiming.get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
new file mode 100644
index 0000000..bfae475
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
@@ -0,0 +1,55 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+/**
+ * A thread that reclaims memory from a Pool on demand.  The actual reclaiming work is delegated to the
+ * cleaner Runnable, e.g., FlushLargestColumnFamily
+ */
+class PoolCleanerThread<P extends Pool> extends Thread
+{
+    /** The pool we're cleaning */
+    final P pool;
+
+    /** should ensure that at least some memory has been marked reclaiming after completion */
+    final Runnable cleaner;
+
+    /** signalled whenever needsCleaning() may return true */
+    final WaitQueue wait = new WaitQueue();
+
+    PoolCleanerThread(P pool, Runnable cleaner)
+    {
+        super(pool.getClass().getSimpleName() + "Cleaner");
+        this.pool = pool;
+        this.cleaner = cleaner;
+    }
+
+    boolean needsCleaning()
+    {
+        return pool.needsCleaning();
+    }
+
+    // should ONLY be called when we really think it already needs cleaning
+    void trigger()
+    {
+        wait.signal();
+    }
+
+    @Override
+    public void run()
+    {
+        while (true)
+        {
+            while (!needsCleaning())
+            {
+                final WaitQueue.Signal signal = wait.register();
+                if (!needsCleaning())
+                    signal.awaitUninterruptibly();
+                else
+                    signal.cancel();
+            }
+
+            cleaner.run();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
new file mode 100644
index 0000000..f13c1b2
--- /dev/null
+++ b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -0,0 +1,223 @@
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.*;
+import org.slf4j.*;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// TODO: we don't currently test SAFE functionality at all!
+// TODO: should also test markBlocking and SyncOrdered
+public class LongOpOrderTest
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
+
+    static final int CONSUMERS = 4;
+    static final int PRODUCERS = 32;
+
+    static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
+    static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
+
+    static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
+    {
+        @Override
+        public void uncaughtException(Thread t, Throwable e)
+        {
+            System.err.println(t.getName() + ": " + e.getMessage());
+            e.printStackTrace();
+        }
+    };
+
+    final OpOrder order = new OpOrder();
+    final AtomicInteger errors = new AtomicInteger();
+
+    class TestOrdering implements Runnable
+    {
+
+        final int[] waitNanos = new int[1 << 16];
+        volatile State state = new State();
+        final ScheduledExecutorService sched;
+
+        TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
+        {
+            this.sched = sched;
+            final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            for (int i = 0 ; i < waitNanos.length ; i++)
+                waitNanos[i] = rnd.nextInt(5000);
+            for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
+                exec.execute(new Producer());
+            exec.execute(this);
+        }
+
+        @Override
+        public void run()
+        {
+            final long until = System.currentTimeMillis() + RUNTIME;
+            long lastReport = System.currentTimeMillis();
+            long count = 0;
+            long opCount = 0;
+            while (true)
+            {
+                long now = System.currentTimeMillis();
+                if (now > until)
+                    break;
+                if (now > lastReport + REPORT_INTERVAL)
+                {
+                    lastReport = now;
+                    logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
+                            Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
+                }
+                try
+                {
+                    Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
+                } catch (InterruptedException e)
+                {
+                    e.printStackTrace();
+                }
+
+                final State s = state;
+                s.barrier = order.newBarrier();
+                s.replacement = new State();
+                s.barrier.issue();
+                s.barrier.await();
+                s.check();
+                opCount += s.totalCount();
+                state = s.replacement;
+                sched.schedule(new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        s.check();
+                    }
+                }, 1, TimeUnit.SECONDS);
+                count++;
+            }
+        }
+
+        class State
+        {
+
+            volatile OpOrder.Barrier barrier;
+            volatile State replacement;
+            final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+            int checkCount = -1;
+
+            boolean accept(OpOrder.Group opGroup)
+            {
+                if (barrier != null && !barrier.isAfter(opGroup))
+                    return false;
+                AtomicInteger c;
+                if (null == (c = count.get(opGroup)))
+                {
+                    count.putIfAbsent(opGroup, new AtomicInteger());
+                    c = count.get(opGroup);
+                }
+                c.incrementAndGet();
+                return true;
+            }
+
+            int totalCount()
+            {
+                int c = 0;
+                for (AtomicInteger v : count.values())
+                    c += v.intValue();
+                return c;
+            }
+
+            void check()
+            {
+                boolean delete;
+                if (checkCount >= 0)
+                {
+                    if (checkCount != totalCount())
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
+                    }
+                    delete = true;
+                }
+                else
+                {
+                    checkCount = totalCount();
+                    delete = false;
+                }
+                for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
+                {
+                    if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Received an operation that was created after the barrier was issued.");
+                    }
+                    if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
+                    {
+                        errors.incrementAndGet();
+                        logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
+                    }
+                    if (delete)
+                        TestOrdering.this.count.remove(e.getKey());
+                }
+            }
+
+        }
+
+        final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+
+        class Producer implements Runnable
+        {
+            public void run()
+            {
+                while (true)
+                {
+                    AtomicInteger c;
+                    OpOrder.Group opGroup = order.start();
+                    try
+                    {
+                        if (null == (c = count.get(opGroup)))
+                        {
+                            count.putIfAbsent(opGroup, new AtomicInteger());
+                            c = count.get(opGroup);
+                        }
+                        c.incrementAndGet();
+                        State s = state;
+                        while (!s.accept(opGroup))
+                            s = s.replacement;
+                    }
+                    finally
+                    {
+                        opGroup.finishOne();
+                    }
+                }
+            }
+        }
+
+    }
+
+    @Test
+    public void testOrdering() throws InterruptedException
+    {
+        errors.set(0);
+        Thread.setDefaultUncaughtExceptionHandler(handler);
+        final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
+        final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
+        for (int i = 0 ; i < CONSUMERS ; i++)
+            new TestOrdering(exec, checker);
+        exec.shutdown();
+        exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
+        assertTrue(exec.isShutdown());
+        assertTrue(errors.get() == 0);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
new file mode 100644
index 0000000..e7bfe30
--- /dev/null
+++ b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
@@ -0,0 +1,72 @@
+package org.apache.cassandra.db;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class LongFlushMemtableTest extends SchemaLoader
+{
+    @Test
+    public void testFlushMemtables() throws IOException, ConfigurationException
+    {
+        Keyspace keyspace = Keyspace.open("Keyspace1");
+        for (int i = 0; i < 100; i++)
+        {
+            CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance);
+            MigrationManager.announceNewColumnFamily(metadata);
+        }
+
+        for (int j = 0; j < 200; j++)
+        {
+            for (int i = 0; i < 100; i++)
+            {
+                Mutation rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key" + j));
+                ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "_CF" + i);
+                // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
+                ByteBuffer value = ByteBuffer.allocate(100000);
+                cf.addColumn(new Cell(Util.cellname("c"), value));
+                rm.add(cf);
+                rm.applyUnsafe();
+            }
+        }
+
+        int flushes = 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            if (cfs.name.startsWith("_CF"))
+                flushes += cfs.getMemtableSwitchCount();
+        }
+        assert flushes > 0;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/MeteredFlusherTest.java b/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
deleted file mode 100644
index 4bab277..0000000
--- a/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.cassandra.db;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class MeteredFlusherTest extends SchemaLoader
-{
-    @Test
-    public void testManyMemtables() throws IOException, ConfigurationException
-    {
-        Keyspace keyspace = Keyspace.open("Keyspace1");
-        for (int i = 0; i < 100; i++)
-        {
-            CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance);
-            MigrationManager.announceNewColumnFamily(metadata);
-        }
-
-        for (int j = 0; j < 200; j++)
-        {
-            for (int i = 0; i < 100; i++)
-            {
-                Mutation rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key" + j));
-                ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "_CF" + i);
-                // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
-                ByteBuffer value = ByteBuffer.allocate(100000);
-                cf.addColumn(new Cell(Util.cellname("c"), value));
-                rm.add(cf);
-                rm.applyUnsafe();
-            }
-        }
-
-        int flushes = 0;
-        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            if (cfs.name.startsWith("_CF"))
-                flushes += cfs.getMemtableSwitchCount();
-        }
-        assert flushes > 0;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
index 9f31743..b8e60de 100644
--- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java
+++ b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
@@ -50,7 +50,7 @@ public class LongBTreeTest
         TreeSet<Integer> canon = new TreeSet<>();
         for (int i = 0 ; i < 10000000 ; i++)
             canon.add(i);
-        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true);
+        Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
         btree = BTree.update(btree, ICMP, canon, true);
         canon.add(Integer.MIN_VALUE);
         canon.add(Integer.MAX_VALUE);
@@ -171,7 +171,7 @@ public class LongBTreeTest
                     canon.putAll(buffer);
                     ctxt.stop();
                     ctxt = BTREE_TIMER.time();
-                    btree = BTree.update(btree, ICMP, buffer.keySet(), true, null, null);
+                    btree = BTree.update(btree, ICMP, buffer.keySet(), true, null);
                     ctxt.stop();
 
                     if (quickEquality)
@@ -200,7 +200,7 @@ public class LongBTreeTest
             String id = String.format("[0..%d)", canon.size());
             System.out.println("Testing " + id);
             Futures.allAsList(testAllSlices(id, cur, canon)).get();
-            cur = BTree.update(cur, ICMP, Arrays.asList(i), true, null, null);
+            cur = BTree.update(cur, ICMP, Arrays.asList(i), true, null);
             canon.add(i);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index aa6d3dd..849c30c 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -142,7 +142,7 @@ public class CacheProviderTest extends SchemaLoader
             this.string = input;
         }
 
-        public long memorySize()
+        public long unsharedHeapSize()
         {
             return string.length();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java b/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
deleted file mode 100644
index da34711..0000000
--- a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package org.apache.cassandra.cache;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.junit.Assert;
-
-import org.apache.cassandra.db.ColumnIndex;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.github.jamm.MemoryMeter;
-import org.junit.Test;
-
-public class ObjectSizeTest
-{
-    public static final MemoryMeter meter = new MemoryMeter().omitSharedBufferOverhead();
-
-    @Test
-    public void testArraySizes()
-    {
-        long size = ObjectSizes.getArraySize(0, 1);
-        long size2 = meter.measureDeep(new byte[0]);
-        Assert.assertEquals(size, size2);
-    }
-
-    @Test
-    public void testBiggerArraySizes()
-    {
-        long size = ObjectSizes.getArraySize(0, 1);
-        long size2 = meter.measureDeep(new byte[0]);
-        Assert.assertEquals(size, size2);
-
-        size = ObjectSizes.getArraySize(8, 1);
-        size2 = meter.measureDeep(new byte[8]);
-        Assert.assertEquals(size, size2);
-    }
-
-    @Test
-    public void testKeyCacheKey()
-    {
-        KeyCacheKey key = new KeyCacheKey(null, null, ByteBuffer.wrap(new byte[0]));
-        long size = key.memorySize();
-        long size2 = meter.measureDeep(key);
-        Assert.assertEquals(size, size2);
-    }
-
-    @Test
-    public void testKeyCacheValue()
-    {
-        RowIndexEntry entry = new RowIndexEntry(123);
-        long size = entry.memorySize();
-        long size2 = meter.measureDeep(entry);
-        Assert.assertEquals(size, size2);
-    }
-
-    @Test
-    public void testKeyCacheValueWithDelInfo()
-    {
-        RowIndexEntry entry = RowIndexEntry.create(123, new DeletionTime(123, 123), ColumnIndex.nothing());
-        long size = entry.memorySize();
-        long size2 = meter.measureDeep(entry);
-        Assert.assertEquals(size, size2);
-    }
-
-    @Test
-    public void testRowCacheKey()
-    {
-        UUID id = UUID.randomUUID();
-        RowCacheKey key = new RowCacheKey(id, ByteBuffer.wrap(new byte[11]));
-        long size = key.memorySize();
-        long size2 = meter.measureDeep(key) - meter.measureDeep(id);
-        Assert.assertEquals(size, size2);
-    }
-
-    @Test
-    public void testRowCacheSentinel()
-    {
-        RowCacheSentinel sentinel = new RowCacheSentinel(123);
-        long size = sentinel.memorySize();
-        long size2 = meter.measureDeep(sentinel);
-        Assert.assertEquals(size, size2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index c2888bc..15d40dc 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -1,6 +1,6 @@
 package org.apache.cassandra.concurrent;
 
-import org.apache.cassandra.utils.WaitQueue;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
 import org.junit.*;
 
 import java.util.concurrent.atomic.AtomicBoolean;