You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/01/31 00:09:27 UTC
[2/6] remove Table.switchlock and introduce o.a.c.utils.memory
package patch by Benedict Elliott Smith;
reviewed by jbellis for CASSANDRA-5549
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java b/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java
deleted file mode 100644
index a193c31..0000000
--- a/src/java/org/apache/cassandra/utils/btree/ReplaceFunction.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.cassandra.utils.btree;
-
-import com.google.common.base.Function;
-
-/**
- * An interface defining a function to be applied to both the object we are replacing in a BTree and
- * the object that is intended to replace it, returning the object to actually replace it.
- *
- * If this is a new insertion, that is, if there is no object to replace, the one-argument variant of
- * the function (inherited from Function) will be called.
- *
- * @param <V> the type of the values held in the BTree
- */
-public interface ReplaceFunction<V> extends Function<V, V>
-{
- /**
-  * @param replaced the object currently in the BTree that is being replaced
-  * @param update the object intended to replace it
-  * @return the object to actually place in the BTree
-  */
- V apply(V replaced, V update);
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
new file mode 100644
index 0000000..cd30492
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java
@@ -0,0 +1,30 @@
+package org.apache.cassandra.utils.btree;
+
+import com.google.common.base.Function;
+
+/**
+ * An interface defining a function to be applied to both the object we are replacing in a BTree and
+ * the object that is intended to replace it, returning the object to actually replace it.
+ *
+ * <p>The single-argument {@code apply(V)} inherited from Guava's {@link Function} is also part of this
+ * contract. NOTE(review): presumably, as the ReplaceFunction this interface supersedes documented, it is
+ * invoked for values that have no match in the original tree - confirm against the BTree update code.
+ *
+ * @param <V> the type of the values held in the tree
+ */
+public interface UpdateFunction<V> extends Function<V, V>
+{
+    /**
+     * Resolve a value from the updating collection that matched a value already in the original tree.
+     *
+     * @param replacing the value in the original tree we have matched
+     * @param update the value in the updating collection that matched
+     * @return the value to insert into the new tree
+     */
+    V apply(V replacing, V update);
+
+    /**
+     * @return true if we should fail the update
+     */
+    boolean abortEarly();
+
+    /**
+     * @param heapSize extra heap space allocated (over previous tree)
+     */
+    void allocated(long heapSize);
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
new file mode 100644
index 0000000..44330a0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
@@ -0,0 +1,411 @@
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
+/**
+ * <p>A class for providing synchronization between producers and consumers that do not
+ * communicate directly with each other, but where the consumers need to process their
+ * work in contiguous batches. In particular this is useful for both CommitLog and Memtable
+ * where the producers (writing threads) are modifying a structure that the consumer
+ * (flush executor) only batch syncs, but needs to know what 'position' the work is at
+ * for co-ordination with other processes.
+ *
+ * <p>The typical usage is something like:
+ * <pre>
+ * public final class ExampleShared
+ * {
+ *     final OpOrder order = new OpOrder();
+ *     volatile SharedState state;
+ *
+ *     static class SharedState
+ *     {
+ *         volatile Barrier barrier;
+ *
+ *         // ...
+ *     }
+ *
+ *     public void consume()
+ *     {
+ *         SharedState state = this.state;
+ *         state.setReplacement(new SharedState());
+ *         state.doSomethingToPrepareForBarrier();
+ *
+ *         state.barrier = order.newBarrier();
+ *         // issue() MUST be called after newBarrier() else barrier.isAfter()
+ *         // will always return true, and barrier.await() will fail
+ *         state.barrier.issue();
+ *
+ *         // wait for all producer work started prior to the barrier to complete
+ *         state.barrier.await();
+ *
+ *         // change the shared state to its replacement, as the current state will no longer be used by producers
+ *         this.state = state.getReplacement();
+ *
+ *         state.doSomethingWithExclusiveAccess();
+ *     }
+ *
+ *     public void produce()
+ *     {
+ *         Group opGroup = order.start();
+ *         try
+ *         {
+ *             SharedState s = state;
+ *             while (s.barrier != null && !s.barrier.isAfter(opGroup))
+ *                 s = s.getReplacement();
+ *             s.doProduceWork();
+ *         }
+ *         finally
+ *         {
+ *             opGroup.finishOne();
+ *         }
+ *     }
+ * }
+ * </pre>
+ */
+public class OpOrder
+{
+    /**
+     * Value of Group.running indicating the Group has been expired and none of its operations are still running
+     */
+    private static final int FINISHED = -1;
+
+    /**
+     * A linked list starting with the most recent Group, i.e. the one we should start new operations from,
+     * with (prev) links to any incomplete Group instances, and (next) links to any potential future Group instances.
+     * Once all operations started against a Group instance and its ancestors have been finished the next instance
+     * will unlink this one
+     */
+    private volatile Group current = new Group();
+
+    /**
+     * Start an operation against this OpOrder.
+     * Once the operation is completed Group.finishOne() MUST be called EXACTLY once for this operation.
+     *
+     * @return the Group the operation was registered against; finishOne() must be called on it
+     */
+    public Group start()
+    {
+        while (true)
+        {
+            // register() only fails once the group has been expired by Barrier.issue(), which replaces
+            // this.current before expiring the old group - so re-reading current and retrying makes progress
+            Group current = this.current;
+            if (current.register())
+                return current;
+        }
+    }
+
+    /**
+     * Creates a new barrier. The barrier is only a placeholder until barrier.issue() is called on it,
+     * after which all new operations will start against a new Group that will not be accepted
+     * by barrier.isAfter(), and barrier.await() will return only once all operations started prior to the issue
+     * have completed.
+     *
+     * @return a new, not yet issued, Barrier against this OpOrder
+     */
+    public Barrier newBarrier()
+    {
+        return new Barrier();
+    }
+
+    /**
+     * @return the Group new operations are currently being started against
+     */
+    public Group getCurrent()
+    {
+        return current;
+    }
+
+    /**
+     * Represents a group of identically ordered operations, i.e. all operations started in the interval between
+     * two barrier issuances. For each register() call this is returned, finishOne() must be called exactly once.
+     * It should be treated like taking a lock().
+     */
+    public static final class Group implements Comparable<Group>
+    {
+        /*
+         * In general this class goes through the following stages:
+         * 1) LIVE: many calls to register() and finishOne()
+         * 2) FINISHING: a call to expire() (after a barrier issue), means calls to register() will now fail,
+         *    and we are now 'in the past' (new operations will be started against a new Group)
+         * 3) FINISHED: once the last finishOne() is called (or expire() is called with nothing running),
+         *    this Group is done. We call unlink().
+         * 4) ZOMBIE: all our operations are finished, but some operations against an earlier Group are still
+         *    running, or tidying up, so unlink() fails to remove us
+         * 5) COMPLETE: all operations started on or before us are FINISHED (and COMPLETE), so we are unlinked
+         *
+         * Another, parallel, state is ISBLOCKING:
+         *
+         * isBlocking => a barrier that is waiting on us (either directly, or via a future Group) is blocking general
+         * progress. This state is entered by calling Barrier.markBlocking(). If the running operations are blocked
+         * on a Signal that is also registered with the isBlockingSignal (e.g. through isBlockingSignal(Signal))
+         * then they will be notified that they are blocking forward progress, and may take action to avoid that.
+         */
+
+        private volatile Group prev, next;
+        private final long id; // monotonically increasing id for compareTo()
+        private volatile int running = 0; // number of operations currently running. < 0 means we're expired, and the count of tasks still running is -(running + 1)
+        private volatile boolean isBlocking; // indicates running operations are blocking future barriers
+        private final WaitQueue isBlockingSignal = new WaitQueue(); // signal to wait on to indicate isBlocking is true
+        private final WaitQueue waiting = new WaitQueue(); // signal to wait on for completion
+
+        static final AtomicIntegerFieldUpdater<Group> runningUpdater = AtomicIntegerFieldUpdater.newUpdater(Group.class, "running");
+
+        // constructs first instance only
+        private Group()
+        {
+            this.id = 0;
+        }
+
+        private Group(Group prev)
+        {
+            this.id = prev.id + 1;
+            this.prev = prev;
+        }
+
+        // prevents any further operations starting against this Group instance
+        // if there are no running operations, calls unlink; otherwise, we let the last op to finishOne call it.
+        // this means issue() won't have to block for ops to finish.
+        private void expire()
+        {
+            while (true)
+            {
+                int current = running;
+                if (current < 0)
+                    throw new IllegalStateException();
+                // encode "expired with N still running" as -(N + 1), so that N == 0 maps to FINISHED (-1)
+                if (runningUpdater.compareAndSet(this, current, -1 - current))
+                {
+                    // if we're already finished (no running ops), unlink ourselves
+                    if (current == 0)
+                        unlink();
+                    return;
+                }
+            }
+        }
+
+        // attempts to start an operation against this Group instance, and returns true if successful.
+        private boolean register()
+        {
+            while (true)
+            {
+                int current = running;
+                if (current < 0)
+                    return false;
+                if (runningUpdater.compareAndSet(this, current, current + 1))
+                    return true;
+            }
+        }
+
+        /**
+         * To be called exactly once for each register() call this object is returned for, indicating the operation
+         * is complete
+         */
+        public void finishOne()
+        {
+            while (true)
+            {
+                int current = running;
+                if (current < 0)
+                {
+                    // expired: the encoded count -(N + 1) moves towards FINISHED as operations complete
+                    if (runningUpdater.compareAndSet(this, current, current + 1))
+                    {
+                        if (current + 1 == FINISHED)
+                        {
+                            // if we're now finished, unlink ourselves
+                            unlink();
+                        }
+                        return;
+                    }
+                }
+                else if (runningUpdater.compareAndSet(this, current, current - 1))
+                {
+                    return;
+                }
+            }
+        }
+
+        /**
+         * called once we know all operations started against this Group have completed,
+         * however we do not know if operations against its ancestors have completed, or
+         * if its descendants have completed ahead of it, so we attempt to create the longest
+         * chain from the oldest still linked Group. If we can't reach the oldest through
+         * an unbroken chain of completed Groups, we abort, and leave the still completing
+         * ancestor to tidy up.
+         */
+        private void unlink()
+        {
+            // walk back in time to find the start of the list
+            Group start = this;
+            while (true)
+            {
+                Group prev = start.prev;
+                if (prev == null)
+                    break;
+                // if we haven't finished this Group yet abort and let it clean up when it's done
+                if (prev.running != FINISHED)
+                    return;
+                start = prev;
+            }
+
+            // now walk forwards in time, in case we finished up late
+            Group end = this.next;
+            while (end.running == FINISHED)
+                end = end.next;
+
+            // now walk from first to last, unlinking the prev pointer and waking up any blocking threads
+            while (start != end)
+            {
+                Group next = start.next;
+                next.prev = null;
+                start.waiting.signalAll();
+                start = next;
+            }
+        }
+
+        /**
+         * @return true if a barrier we are behind is, or may be, blocking general progress,
+         * so we should try more aggressively to progress
+         */
+        public boolean isBlocking()
+        {
+            return isBlocking;
+        }
+
+        /**
+         * register to be signalled when a barrier waiting on us is, or may be, blocking general progress,
+         * so we should try more aggressively to progress
+         */
+        public WaitQueue.Signal isBlockingSignal()
+        {
+            return isBlockingSignal.register();
+        }
+
+        /**
+         * wrap the provided signal to also be signalled if the operation gets marked blocking
+         */
+        public WaitQueue.Signal isBlockingSignal(WaitQueue.Signal signal)
+        {
+            return WaitQueue.any(signal, isBlockingSignal());
+        }
+
+        public int compareTo(Group that)
+        {
+            // we deliberately use subtraction, as opposed to Long.compareTo() as we care about ordering
+            // not which is the smaller value, so this permits wrapping in the unlikely event we exhaust the long space
+            long c = this.id - that.id;
+            if (c > 0)
+                return 1;
+            else if (c < 0)
+                return -1;
+            else
+                return 0;
+        }
+    }
+
+    /**
+     * This class represents a synchronisation point providing ordering guarantees on operations started
+     * against the enclosing OpOrder. When issue() is called upon it (may only happen once per Barrier), the
+     * Barrier atomically partitions new operations from those already running (by expiring the current Group),
+     * and activates its isAfter() method
+     * which indicates if an operation was started before or after this partition. It offers methods to
+     * determine, or block until, all prior operations have finished, and a means to indicate to those operations
+     * that they are blocking forward progress. See {@link OpOrder} for idiomatic usage.
+     */
+    public final class Barrier
+    {
+        // this Barrier was issued after all Group operations started against orderOnOrBefore
+        private volatile Group orderOnOrBefore;
+
+        /**
+         * @param group the Group an operation was started against
+         * @return true if {@code group} was started prior to the issuing of the barrier.
+         *
+         * (Until issue is called, always returns true, but if you rely on this behavior you are probably
+         * Doing It Wrong.)
+         */
+        public boolean isAfter(Group group)
+        {
+            if (orderOnOrBefore == null)
+                return true;
+            // we subtract to permit wrapping round the full range of Long - so we only need to ensure
+            // there are never Long.MAX_VALUE * 2 total Group objects in existence at any one time, which will
+            // take care of itself
+            return orderOnOrBefore.id - group.id >= 0;
+        }
+
+        /**
+         * Issues (seals) the barrier, meaning no new operations may be issued against it, and expires the current
+         * Group. Must be called before await() for isAfter() to be properly synchronised.
+         *
+         * @throws IllegalStateException if this barrier has already been issued
+         */
+        public void issue()
+        {
+            final Group current;
+            synchronized (OpOrder.this)
+            {
+                // checked under the lock so that two racing callers cannot both issue this barrier
+                if (orderOnOrBefore != null)
+                    throw new IllegalStateException("Can only call issue() once on each Barrier");
+
+                current = OpOrder.this.current;
+                orderOnOrBefore = current;
+                OpOrder.this.current = current.next = new Group(current);
+            }
+            // expire outside the lock; if nothing is running this unlinks (and signals) immediately
+            current.expire();
+        }
+
+        /**
+         * Mark all prior operations as blocking, potentially signalling them to more aggressively make progress
+         */
+        public void markBlocking()
+        {
+            Group current = orderOnOrBefore;
+            while (current != null)
+            {
+                current.isBlocking = true;
+                current.isBlockingSignal.signalAll();
+                current = current.prev;
+            }
+        }
+
+        /**
+         * Register to be signalled once allPriorOpsAreFinished() may return true
+         *
+         * @throws IllegalStateException if issue() has not yet been called on this barrier
+         */
+        public WaitQueue.Signal register()
+        {
+            Group current = orderOnOrBefore;
+            if (current == null)
+                throw new IllegalStateException("This barrier needs to have issue() called on it before prior operations can complete");
+            return current.waiting.register();
+        }
+
+        /**
+         * @return true if all operations started prior to barrier.issue() have completed
+         * @throws IllegalStateException if issue() has not yet been called on this barrier
+         */
+        public boolean allPriorOpsAreFinished()
+        {
+            Group current = orderOnOrBefore;
+            if (current == null)
+                throw new IllegalStateException("This barrier needs to have issue() called on it before prior operations can complete");
+            // unlink() clears next.prev only once every Group up to and including ours has finished
+            return current.next.prev == null;
+        }
+
+        /**
+         * wait for all operations started prior to issuing the barrier to complete
+         */
+        public void await()
+        {
+            while (!allPriorOpsAreFinished())
+            {
+                WaitQueue.Signal signal = register();
+                if (allPriorOpsAreFinished())
+                {
+                    signal.cancel();
+                    return;
+                }
+                else
+                    signal.awaitUninterruptibly();
+            }
+            assert orderOnOrBefore.running == FINISHED;
+        }
+
+        /**
+         * returns the Group we are waiting on - any Group with .compareTo(getSyncPoint()) <= 0
+         * must complete before await() returns
+         */
+        public Group getSyncPoint()
+        {
+            return orderOnOrBefore;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
new file mode 100644
index 0000000..9bef8c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/WaitQueue.java
@@ -0,0 +1,508 @@
+package org.apache.cassandra.utils.concurrent;
+
+import com.yammer.metrics.core.TimerContext;
+import org.slf4j.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * <p>A relatively easy to use utility for general purpose thread signalling.</p>
+ * <p>Usage on a thread awaiting a state change using a WaitQueue q is:</p>
+ * <pre>
+ * {@code
+ * while (!conditionMet())
+ * Signal s = q.register();
+ * if (!conditionMet()) // or, perhaps more correctly, !conditionChanged()
+ * s.await();
+ * else
+ * s.cancel();
+ * }
+ * </pre>
+ * A signalling thread, AFTER changing the state, then calls q.signal() to wake up one, or q.signalAll()
+ * to wake up all, waiting threads.
+ * <p>To understand intuitively how this class works, the idea is simply that a thread, once it considers itself
+ * incapable of making progress, registers to be awoken once that changes. Since this could have changed between
+ * checking and registering (in which case the thread that made this change would have been unable to signal it),
+ * it checks the condition again, sleeping only if it hasn't changed/still is not met.</p>
+ * <p>This thread synchronisation scheme has some advantages over Condition objects and Object.wait/notify in that no monitor
+ * acquisition is necessary and, in fact, besides the actual waiting on a signal, all operations are non-blocking.
+ * As a result consumers can never block producers, nor each other, or vice versa, from making progress.
+ * Threads that are signalled are also put into a RUNNABLE state almost simultaneously, so they can all immediately make
+ * progress without having to serially acquire the monitor/lock, reducing scheduler delay incurred.</p>
+ *
+ * <p>A few notes on utilisation:</p>
+ * <p>1. A thread will only exit await() when it has been signalled, but this does not guarantee the condition has not
+ * been altered since it was signalled, and depending on your design it is likely the outer condition will need to be
+ * checked in a loop, though this is not always the case.</p>
+ * <p>2. Each signal is single use, so must be re-registered after each await(). This is true even if it times out.</p>
+ * <p>3. If you choose not to wait on the signal (because the condition has been met before you waited on it)
+ * you must cancel() the signal if the signalling thread uses signal() to awake waiters; otherwise signals will be
+ * lost. If signalAll() is used but infrequent, and register() is frequent, cancel() should still be used to prevent the
+ * queue growing unboundedly. Similarly, if you provide a TimerContext, cancel should be used to ensure it is not erroneously
+ * counted towards wait time.</p>
+ * <p>4. Care must be taken when selecting conditionMet() to ensure we are waiting on the condition that actually
+ * indicates progress is possible. In some complex cases it may be tempting to wait on a condition that is only indicative
+ * of local progress, not progress on the task we are aiming to complete, and a race may leave us waiting for a condition
+ * to be met that we no longer need.
+ * <p>5. This scheme is not fair</p>
+ * <p>6. Only the thread that calls register() may call await()</p>
+ */
+public final class WaitQueue
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(WaitQueue.class);
+
+ private static final int CANCELLED = -1;
+ private static final int SIGNALLED = 1;
+ private static final int NOT_SET = 0;
+
+ private static final AtomicIntegerFieldUpdater signalledUpdater = AtomicIntegerFieldUpdater.newUpdater(RegisteredSignal.class, "state");
+
+ // the waiting signals
+ private final ConcurrentLinkedDeque<RegisteredSignal> queue = new ConcurrentLinkedDeque<>();
+
+ /**
+ * The calling thread MUST be the thread that uses the signal
+ * @return
+ */
+ public Signal register()
+ {
+ RegisteredSignal signal = new RegisteredSignal();
+ queue.add(signal);
+ return signal;
+ }
+
+ /**
+ * The calling thread MUST be the thread that uses the signal.
+ * If the Signal is waited on, context.stop() will be called when the wait times out, the Signal is signalled,
+ * or the waiting thread is interrupted.
+ * @return
+ */
+ public Signal register(TimerContext context)
+ {
+ assert context != null;
+ RegisteredSignal signal = new TimedSignal(context);
+ queue.add(signal);
+ return signal;
+ }
+
+ /**
+ * Signal one waiting thread
+ */
+ public boolean signal()
+ {
+ if (!hasWaiters())
+ return false;
+ while (true)
+ {
+ RegisteredSignal s = queue.poll();
+ if (s == null || s.signal())
+ return s != null;
+ }
+ }
+
+ /**
+ * Signal all waiting threads
+ */
+ public void signalAll()
+ {
+ if (!hasWaiters())
+ return;
+ List<Thread> woke = null;
+ if (logger.isTraceEnabled())
+ woke = new ArrayList<>();
+ long start = System.nanoTime();
+ // we wake up only a snapshot of the queue, to avoid a race where the condition is not met and the woken thread
+ // immediately waits on the queue again
+ RegisteredSignal last = queue.getLast();
+ Iterator<RegisteredSignal> iter = queue.iterator();
+ while (iter.hasNext())
+ {
+ RegisteredSignal signal = iter.next();
+ if (logger.isTraceEnabled())
+ {
+ Thread thread = signal.thread;
+ if (signal.signal())
+ woke.add(thread);
+ }
+ else
+ signal.signal();
+
+ iter.remove();
+
+ if (signal == last)
+ break;
+ }
+ long end = System.nanoTime();
+ if (woke != null)
+ logger.trace("Woke up {} in {}ms from {}", woke, (end - start) * 0.000001d, Thread.currentThread().getStackTrace()[2]);
+ }
+
+ private void cleanUpCancelled()
+ {
+ // attempt to remove the cancelled from the beginning only, but if we fail to remove any proceed to cover
+ // the whole list
+ Iterator<RegisteredSignal> iter = queue.iterator();
+ while (iter.hasNext())
+ {
+ RegisteredSignal s = iter.next();
+ if (s.isCancelled())
+ iter.remove();
+ }
+ }
+
+ public boolean hasWaiters()
+ {
+ return !queue.isEmpty();
+ }
+
+ /**
+ * Return how many threads are waiting
+ * @return
+ */
+ public int getWaiting()
+ {
+ if (queue.isEmpty())
+ return 0;
+ Iterator<RegisteredSignal> iter = queue.iterator();
+ int count = 0;
+ while (iter.hasNext())
+ {
+ Signal next = iter.next();
+ if (!next.isCancelled())
+ count++;
+ }
+ return count;
+ }
+
+ /**
+ * A Signal is a one-time-use mechanism for a thread to wait for notification that some condition
+ * state has transitioned that it may be interested in (and hence should check if it is).
+ * It is potentially transient, i.e. the state can change in the meantime, it only indicates
+ * that it should be checked, not necessarily anything about what the expected state should be.
+ *
+ * Signal implementations should never wake up spuriously, they are always woken up by a
+ * signal() or signalAll().
+ *
+ * This abstract definition of Signal does not need to be tied to a WaitQueue.
+ * Whilst RegisteredSignal is the main building block of Signals, this abstract
+ * definition allows us to compose Signals in useful ways. The Signal is 'owned' by the
+ * thread that registered itself with WaitQueue(s) to obtain the underlying RegisteredSignal(s);
+ * only the owning thread should use a Signal.
+ */
+ public static interface Signal
+ {
+
+ /**
+ * @return true if signalled; once true, must be discarded by the owning thread.
+ */
+ public boolean isSignalled();
+
+ /**
+ * @return true if cancelled; once cancelled, must be discarded by the owning thread.
+ */
+ public boolean isCancelled();
+
+ /**
+ * @return isSignalled() || isCancelled(). Once true, the state is fixed and the Signal should be discarded
+ * by the owning thread.
+ */
+ public boolean isSet();
+
+ /**
+ * atomically: cancels the Signal if !isSet(), or returns true if isSignalled()
+ *
+ * @return true if isSignalled()
+ */
+ public boolean checkAndClear();
+
+ /**
+ * Should only be called by the owning thread. Indicates the signal can be retired,
+ * and if signalled propagates the signal to another waiting thread
+ */
+ public abstract void cancel();
+
+ /**
+ * Wait, without throwing InterruptedException, until signalled. On exit isSignalled() must be true.
+ * If the thread is interrupted in the meantime, the interrupted flag will be set.
+ */
+ public void awaitUninterruptibly();
+
+ /**
+ * Wait until signalled, or throw an InterruptedException if interrupted before this happens.
+ * On normal exit isSignalled() must be true; however if InterruptedException is thrown isCancelled()
+ * will be true.
+ * @throws InterruptedException
+ */
+ public void await() throws InterruptedException;
+
+ /**
+ * Wait until signalled, or the provided time is reached, or the thread is interrupted. If signalled,
+ * isSignalled() will be true on exit, and the method will return true; if timedout, the method will return
+ * false and isCancelled() will be true; if interrupted an InterruptedException will be thrown and isCancelled()
+ * will be true.
+ * @param until System.currentTimeMillis() to wait until
+ * @return true if signalled, false if timed out
+ * @throws InterruptedException
+ */
+ public boolean awaitUntil(long until) throws InterruptedException;
+ }
+
+ /**
+ * An abstract signal implementation
+ */
+ public static abstract class AbstractSignal implements Signal
+ {
+ public void awaitUninterruptibly()
+ {
+ boolean interrupted = false;
+ while (!isSignalled())
+ {
+ if (Thread.currentThread().interrupted())
+ interrupted = true;
+ LockSupport.park();
+ }
+ if (interrupted)
+ Thread.currentThread().interrupt();
+ checkAndClear();
+ }
+
+ public void await() throws InterruptedException
+ {
+ while (!isSignalled())
+ {
+ checkInterrupted();
+ LockSupport.park();
+ }
+ checkAndClear();
+ }
+
+ public boolean awaitUntil(long until) throws InterruptedException
+ {
+ while (until < System.currentTimeMillis() && !isSignalled())
+ {
+ checkInterrupted();
+ LockSupport.parkUntil(until);
+ }
+ return checkAndClear();
+ }
+
+ private void checkInterrupted() throws InterruptedException
+ {
+ if (Thread.interrupted())
+ {
+ cancel();
+ throw new InterruptedException();
+ }
+ }
+ }
+
+ /**
+ * A signal registered with this WaitQueue
+ */
+ private class RegisteredSignal extends AbstractSignal
+ {
+ private volatile Thread thread = Thread.currentThread();
+ volatile int state;
+
+ public boolean isSignalled()
+ {
+ return state == SIGNALLED;
+ }
+
+ public boolean isCancelled()
+ {
+ return state == CANCELLED;
+ }
+
+ public boolean isSet()
+ {
+ return state != NOT_SET;
+ }
+
+ private boolean signal()
+ {
+ if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, SIGNALLED))
+ {
+ LockSupport.unpark(thread);
+ thread = null;
+ return true;
+ }
+ return false;
+ }
+
+ public boolean checkAndClear()
+ {
+ if (!isSet() && signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED))
+ {
+ thread = null;
+ cleanUpCancelled();
+ return false;
+ }
+ // must now be signalled assuming correct API usage
+ return true;
+ }
+
+ /**
+ * Should only be called by the registered thread. Indicates the signal can be retired,
+ * and if signalled propagates the signal to another waiting thread
+ */
+ public void cancel()
+ {
+ if (isCancelled())
+ return;
+ if (!signalledUpdater.compareAndSet(this, NOT_SET, CANCELLED))
+ {
+ // must already be signalled - switch to cancelled and
+ state = CANCELLED;
+ // propagate the signal
+ WaitQueue.this.signal();
+ }
+ thread = null;
+ cleanUpCancelled();
+ }
+ }
+
+ /**
+ * A RegisteredSignal that stores a TimerContext, and stops the timer when either cancelled or
+ * finished waiting. i.e. if the timer is started when the signal is registered it tracks the
+ * time in between registering and invalidating the signal.
+ */
+ private final class TimedSignal extends RegisteredSignal
+ {
+ private final TimerContext context;
+
+ private TimedSignal(TimerContext context)
+ {
+ this.context = context;
+ }
+
+ @Override
+ public boolean checkAndClear()
+ {
+ context.stop();
+ return super.checkAndClear();
+ }
+
+ @Override
+ public void cancel()
+ {
+ if (!isCancelled())
+ {
+ context.stop();
+ super.cancel();
+ }
+ }
+ }
+
+ /**
+ * An abstract signal wrapping multiple delegate signals
+ */
+ private abstract static class MultiSignal extends AbstractSignal
+ {
+ final Signal[] signals;
+ protected MultiSignal(Signal[] signals)
+ {
+ this.signals = signals;
+ }
+
+ public boolean isCancelled()
+ {
+ for (Signal signal : signals)
+ if (!signal.isCancelled())
+ return false;
+ return true;
+ }
+
+ public boolean checkAndClear()
+ {
+ for (Signal signal : signals)
+ signal.checkAndClear();
+ return isSignalled();
+ }
+
+ public void cancel()
+ {
+ for (Signal signal : signals)
+ signal.cancel();
+ }
+ }
+
+ /**
+ * A Signal that wraps multiple Signals and returns when any single one of them would have returned
+ */
+ private static class AnySignal extends MultiSignal
+ {
+ protected AnySignal(Signal ... signals)
+ {
+ super(signals);
+ }
+
+ public boolean isSignalled()
+ {
+ for (Signal signal : signals)
+ if (signal.isSignalled())
+ return true;
+ return false;
+ }
+
+ public boolean isSet()
+ {
+ for (Signal signal : signals)
+ if (signal.isSet())
+ return true;
+ return false;
+ }
+ }
+
+ /**
+ * A Signal that wraps multiple Signals and returns when all of them would have finished returning
+ */
+ private static class AllSignal extends MultiSignal
+ {
+ protected AllSignal(Signal ... signals)
+ {
+ super(signals);
+ }
+
+ public boolean isSignalled()
+ {
+ for (Signal signal : signals)
+ if (!signal.isSignalled())
+ return false;
+ return true;
+ }
+
+ public boolean isSet()
+ {
+ for (Signal signal : signals)
+ if (!signal.isSet())
+ return false;
+ return true;
+ }
+ }
+
+ /**
+ * @param signals
+ * @return a signal that returns only when any of the provided signals would have returned
+ */
+ public static Signal any(Signal ... signals)
+ {
+ return new AnySignal(signals);
+ }
+
+ /**
+ * @param signals
+ * @return a signal that returns only when all provided signals would have returned
+ */
+ public static Signal all(Signal ... signals)
+ {
+ return new AllSignal(signals);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
new file mode 100644
index 0000000..8ebdf30
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Base class for allocators of ByteBuffers. Subclasses supply {@link #allocate(int)}; the accounting
+ * methods ({@link #owns()}, {@link #ownershipRatio()}, {@link #reclaiming()}) report zero by default.
+ */
+public abstract class AbstractAllocator
+{
+    /**
+     * Copy the remaining bytes of {@code buffer} into a new buffer obtained from {@link #allocate(int)}.
+     * The source is read through a duplicate, so its position, limit and mark are left untouched.
+     *
+     * @param buffer the buffer to copy; must not be null
+     * @return a buffer holding a copy of {@code buffer}'s remaining bytes, positioned where the allocated
+     *         slice begins; {@link ByteBufferUtil#EMPTY_BYTE_BUFFER} if there is nothing to copy
+     */
+    public ByteBuffer clone(ByteBuffer buffer)
+    {
+        assert buffer != null;
+        if (buffer.remaining() == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        ByteBuffer cloned = allocate(buffer.remaining());
+
+        // remember where the slice starts, copy into it, then rewind to that start
+        cloned.mark();
+        cloned.put(buffer.duplicate());
+        cloned.reset();
+        return cloned;
+    }
+
+    /**
+     * Allocate a buffer with {@code size} bytes remaining.
+     */
+    public abstract ByteBuffer allocate(int size);
+
+    //
+    // only really applicable to Pooled subclasses, but we provide default implementations here
+    //
+
+    public long owns()
+    {
+        return 0;
+    }
+
+    public float ownershipRatio()
+    {
+        return 0;
+    }
+
+    public long reclaiming()
+    {
+        return 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java b/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java
new file mode 100644
index 0000000..c58340e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/ContextAllocator.java
@@ -0,0 +1,59 @@
+package org.apache.cassandra.utils.memory;
+
+import com.google.common.base.*;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Binds a PoolAllocator to a single OpOrder.Group (and a ColumnFamilyStore), so that callers can use the
+ * plain AbstractAllocator API while every allocation is made on behalf of that group. Also doubles as a
+ * Function that copies Cells, passing itself as the allocator to {@code Cell.localCopy}.
+ */
+public final class ContextAllocator extends AbstractAllocator implements Function<Cell, Cell>
+{
+    private final OpOrder.Group opGroup;
+    private final PoolAllocator allocator;
+    private final ColumnFamilyStore cfs;
+
+    public ContextAllocator(OpOrder.Group opGroup, PoolAllocator allocator, ColumnFamilyStore cfs)
+    {
+        this.opGroup = opGroup;
+        this.allocator = allocator;
+        this.cfs = cfs;
+    }
+
+    /** Clone through the wrapped PoolAllocator, under this allocator's op group. */
+    @Override
+    public ByteBuffer clone(ByteBuffer buffer)
+    {
+        return allocator.clone(buffer, opGroup);
+    }
+
+    /** Allocate through the wrapped PoolAllocator, under this allocator's op group. */
+    @Override
+    public ByteBuffer allocate(int size)
+    {
+        return allocator.allocate(size, opGroup);
+    }
+
+    /** Copy the given cell by delegating to {@code column.localCopy(cfs, this)}. */
+    @Override
+    public Cell apply(Cell column)
+    {
+        return column.localCopy(cfs, this);
+    }
+
+    // the accounting methods below report the wrapped PoolAllocator's figures
+
+    @Override
+    public long owns()
+    {
+        return allocator.owns();
+    }
+
+    @Override
+    public float ownershipRatio()
+    {
+        return allocator.ownershipRatio();
+    }
+
+    @Override
+    public long reclaiming()
+    {
+        return allocator.reclaiming();
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
new file mode 100644
index 0000000..86cea64
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapAllocator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+/**
+ * An unpooled allocator that satisfies every request with a new heap ByteBuffer. It does no accounting,
+ * so the inherited owns()/ownershipRatio()/reclaiming() all report zero.
+ */
+public final class HeapAllocator extends AbstractAllocator
+{
+    public static final HeapAllocator instance = new HeapAllocator();
+
+    /**
+     * Use {@link #instance}. The allocator has no per-instance state, so a single shared instance suffices.
+     */
+    private HeapAllocator() {}
+
+    @Override
+    public ByteBuffer allocate(int size)
+    {
+        return ByteBuffer.allocate(size);
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPool.java b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
new file mode 100644
index 0000000..bc293cb
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPool.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * A Pool whose allocators (HeapPoolAllocator) hand out a separate heap ByteBuffer per allocation.
+ */
+public class HeapPool extends Pool
+{
+    /**
+     * @param maxOnHeapMemory  the pool's limit
+     * @param cleanupThreshold passed to Pool as its cleanThreshold
+     * @param cleaner          passed to Pool; may be null
+     */
+    public HeapPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
+    {
+        super(maxOnHeapMemory, cleanupThreshold, cleaner);
+    }
+
+    /**
+     * Create a new allocator drawing from this pool; {@code writes} is not used by this pool type.
+     */
+    @Override
+    public HeapPoolAllocator newAllocator(OpOrder writes)
+    {
+        HeapPoolAllocator fresh = new HeapPoolAllocator(this);
+        return fresh;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java
new file mode 100644
index 0000000..cf798d0
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapPoolAllocator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import java.nio.ByteBuffer;
+
+public final class HeapPoolAllocator extends PoolAllocator
+{
+ HeapPoolAllocator(HeapPool pool)
+ {
+ super(pool);
+ }
+
+ public ByteBuffer allocate(int size)
+ {
+ return allocate(size, null);
+ }
+
+ public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+ {
+ markAllocated(size, opGroup);
+ // must loop trying to acquire
+ return ByteBuffer.allocate(size);
+ }
+
+ public void free(ByteBuffer name)
+ {
+ release(name.remaining());
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
new file mode 100644
index 0000000..4396caf
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapSlabAllocator.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The SlabAllocator is a bump-the-pointer allocator that allocates
+ * large (REGION_SIZE, 1MB) regions and then doles them out to threads that request
+ * slices into the array.
+ * <p/>
+ * The purpose of this class is to combat heap fragmentation in long lived
+ * objects: by ensuring that all allocations with similar lifetimes
+ * go only to large regions of contiguous memory, we ensure that large blocks
+ * get freed up at the same time.
+ * <p/>
+ * Otherwise, variable length byte arrays allocated end up
+ * interleaved throughout the heap, and the old generation gets progressively
+ * more fragmented until a stop-the-world compacting collection occurs.
+ * <p/>
+ * {@link #free(ByteBuffer)} is a no-op: individual slices cannot be returned to a region.
+ */
+public class HeapSlabAllocator extends PoolAllocator
+{
+    private static final Logger logger = LoggerFactory.getLogger(HeapSlabAllocator.class);
+
+    private final static int REGION_SIZE = 1024 * 1024;
+    private final static int MAX_CLONED_SIZE = 128 * 1024; // bigger than this don't go in the region
+
+    // globally stash any Regions we allocate but are beaten to using, and use these up before allocating any more
+    private static final ConcurrentLinkedQueue<Region> RACE_ALLOCATED = new ConcurrentLinkedQueue<>();
+
+    private final AtomicReference<Region> currentRegion = new AtomicReference<>();
+    private final AtomicInteger regionCount = new AtomicInteger(0);
+    // bytes allocated directly from the JVM because the request exceeded MAX_CLONED_SIZE
+    private final AtomicLong unslabbed = new AtomicLong(0);
+
+    HeapSlabAllocator(Pool pool)
+    {
+        super(pool);
+    }
+
+    @Override
+    public ByteBuffer allocate(int size)
+    {
+        return allocate(size, null);
+    }
+
+    @Override
+    public ByteBuffer allocate(int size, OpOrder.Group opGroup)
+    {
+        assert size >= 0;
+        if (size == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+
+        markAllocated(size, opGroup);
+        // satisfy large allocations directly from JVM since they don't cause fragmentation
+        // as badly, and fill up our regions quickly
+        if (size > MAX_CLONED_SIZE)
+        {
+            unslabbed.addAndGet(size);
+            return ByteBuffer.allocate(size);
+        }
+
+        while (true)
+        {
+            Region region = getRegion();
+
+            // Try to allocate from this region
+            ByteBuffer cloned = region.allocate(size);
+            if (cloned != null)
+                return cloned;
+
+            // not enough space! retire this region (unless someone already has) and retry with a fresh one
+            currentRegion.compareAndSet(region, null);
+        }
+    }
+
+    @Override
+    public void free(ByteBuffer name)
+    {
+        // have to assume we cannot free the memory here, and just reclaim it all when we flush
+    }
+
+    /**
+     * Get the current region, or, if there is no current region, install a new one
+     * (reusing a previously race-allocated region when one is available).
+     */
+    private Region getRegion()
+    {
+        while (true)
+        {
+            // Try to get the region
+            Region region = currentRegion.get();
+            if (region != null)
+                return region;
+
+            // No current region, so we want to allocate one. We race
+            // against other allocators to CAS in a Region, and if we fail we stash the region for re-use
+            region = RACE_ALLOCATED.poll();
+            if (region == null)
+                region = new Region(REGION_SIZE);
+            if (currentRegion.compareAndSet(null, region))
+            {
+                regionCount.incrementAndGet();
+                logger.trace("{} regions now allocated in {}", regionCount, this);
+                return region;
+            }
+
+            // someone else won race - that's fine, we'll try to grab theirs
+            // in the next iteration of the loop.
+            RACE_ALLOCATED.add(region);
+        }
+    }
+
+    /**
+     * A region of memory out of which allocations are sliced.
+     *
+     * The allocation offset is advanced with CAS, so concurrent threads can carve disjoint slices out of
+     * the same buffer. A Region that loses the race to become current is never allocated from; it is
+     * stashed in RACE_ALLOCATED for later reuse instead of being discarded.
+     */
+    private static class Region
+    {
+        /**
+         * Actual underlying data
+         */
+        private final ByteBuffer data;
+
+        /**
+         * Offset for the next allocation
+         */
+        private final AtomicInteger nextFreeOffset = new AtomicInteger(0);
+
+        /**
+         * Total number of allocations satisfied from this buffer
+         */
+        private final AtomicInteger allocCount = new AtomicInteger();
+
+        /**
+         * Create a region backed by a newly allocated heap buffer.
+         *
+         * @param size in bytes
+         */
+        private Region(int size)
+        {
+            data = ByteBuffer.allocate(size);
+        }
+
+        /**
+         * Try to allocate <code>size</code> bytes from the region.
+         *
+         * @return the successful allocation, or null to indicate not-enough-space
+         */
+        public ByteBuffer allocate(int size)
+        {
+            while (true)
+            {
+                int oldOffset = nextFreeOffset.get();
+
+                if (oldOffset + size > data.capacity()) // capacity == remaining
+                    return null;
+
+                // Try to atomically claim this region
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size))
+                {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return (ByteBuffer) data.duplicate().position(oldOffset).limit(oldOffset + size);
+                }
+                // we raced and lost alloc, try again
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Region@" + System.identityHashCode(this) +
+                   " allocs=" + allocCount.get() + " waste=" +
+                   (data.capacity() - nextFreeOffset.get());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
new file mode 100644
index 0000000..bd1ab98
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/HeapSlabPool.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * A Pool whose allocators (HeapSlabAllocator) carve allocations out of large heap regions.
+ */
+public class HeapSlabPool extends Pool
+{
+    /**
+     * @param maxOnHeapMemory  the pool's limit
+     * @param cleanupThreshold passed to Pool as its cleanThreshold
+     * @param cleaner          passed to Pool; may be null
+     */
+    public HeapSlabPool(long maxOnHeapMemory, float cleanupThreshold, Runnable cleaner)
+    {
+        super(maxOnHeapMemory, cleanupThreshold, cleaner);
+    }
+
+    /**
+     * Create a new slab allocator drawing from this pool; {@code writes} is not used by this pool type.
+     */
+    @Override
+    public HeapSlabAllocator newAllocator(OpOrder writes)
+    {
+        HeapSlabAllocator fresh = new HeapSlabAllocator(this);
+        return fresh;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/Pool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/Pool.java b/src/java/org/apache/cassandra/utils/memory/Pool.java
new file mode 100644
index 0000000..4e59de8
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/Pool.java
@@ -0,0 +1,140 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+
+/**
+ * Represents an amount of memory used for a given purpose, that can be allocated to specific tasks through
+ * child {@link PoolAllocator} objects. The Pool bookkeeps the total shared use of the resource, while each
+ * PoolAllocator records the amount it has checked out and owns.
+ *
+ * Space is claimed with {@link #tryAllocate(int)}, which respects {@link #limit}, or with
+ * {@link #adjustAllocated(long)}, which bypasses it; {@link #release(long)} gives space back and wakes
+ * anything waiting on {@link #hasRoom}. When usage passes the cleaning threshold, the optional cleaner
+ * is triggered on a dedicated {@link PoolCleanerThread}.
+ */
+public abstract class Pool
+{
+    // total memory/resource permitted to allocate
+    public final long limit;
+
+    // a clean is triggered once used() reaches reclaiming() + limit * cleanThreshold
+    public final float cleanThreshold;
+
+    // total bytes allocated and reclaiming
+    private final AtomicLong allocated = new AtomicLong();
+    private final AtomicLong reclaiming = new AtomicLong();
+
+    // signalled whenever space is released, so allocators waiting for room can retry
+    final WaitQueue hasRoom = new WaitQueue();
+
+    // a cache of the calculation determining at what allocation threshold we should next clean, and the cleaner we trigger
+    private volatile long nextClean;
+    private final PoolCleanerThread<?> cleanerThread;
+
+    /**
+     * @param limit          total amount that may be allocated from this pool
+     * @param cleanThreshold fraction of limit (beyond the amount already reclaiming) at which to clean
+     * @param cleaner        run to reclaim memory when cleaning is needed; null if the pool is never cleaned
+     */
+    public Pool(long limit, float cleanThreshold, Runnable cleaner)
+    {
+        this.limit = limit;
+        this.cleanThreshold = cleanThreshold;
+        updateNextClean();
+        // NOTE: the cleaner thread is started before any subclass constructor has run
+        cleanerThread = cleaner == null ? null : new PoolCleanerThread<>(this, cleaner);
+        if (cleanerThread != null)
+            cleanerThread.start();
+    }
+
+    /** Methods for tracking and triggering a clean **/
+
+    /**
+     * @return true if usage has reached the cached threshold, is still at or beyond it after the threshold
+     *         is recomputed (reclaiming may have grown), and there is a cleaner to run
+     */
+    boolean needsCleaning()
+    {
+        return used() >= nextClean && updateNextClean() && cleanerThread != null;
+    }
+
+    void maybeClean()
+    {
+        if (needsCleaning())
+            cleanerThread.trigger();
+    }
+
+    // recompute nextClean from the current reclaiming total; returns true if usage is already past it
+    private boolean updateNextClean()
+    {
+        long reclaiming = this.reclaiming.get();
+        return used() >= (nextClean = reclaiming
+                                      + (long) (this.limit * cleanThreshold));
+    }
+
+    /** Methods to allocate space **/
+
+    /**
+     * Atomically claim {@code size} if doing so keeps the total within {@link #limit}.
+     *
+     * @return true if the space was claimed (possibly triggering a clean), false if it would exceed the limit
+     */
+    boolean tryAllocate(int size)
+    {
+        while (true)
+        {
+            long cur;
+            if ((cur = allocated.get()) + size > limit)
+                return false;
+            if (allocated.compareAndSet(cur, cur + size))
+            {
+                maybeClean();
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Apply the size adjustment to allocated, bypassing any limits or constraints. A positive adjustment
+     * may trigger a clean. Waiters are NOT signalled here; {@link #release(long)} does that.
+     */
+    void adjustAllocated(long size)
+    {
+        if (size == 0)
+            return;
+        allocated.addAndGet(size);
+        if (size > 0)
+            maybeClean();
+    }
+
+    // return space to the pool and wake every allocator waiting for room
+    void release(long size)
+    {
+        adjustAllocated(-size);
+        hasRoom.signalAll();
+    }
+
+    // space reclaimed should be released prior to calling this, to avoid triggering unnecessary cleans
+    void adjustReclaiming(long reclaiming)
+    {
+        if (reclaiming == 0)
+            return;
+        this.reclaiming.addAndGet(reclaiming);
+        if (reclaiming < 0 && updateNextClean() && cleanerThread != null)
+            cleanerThread.trigger();
+    }
+
+    public long allocated()
+    {
+        return allocated.get();
+    }
+
+    public long used()
+    {
+        return allocated.get();
+    }
+
+    public long reclaiming()
+    {
+        return reclaiming.get();
+    }
+
+    public abstract PoolAllocator newAllocator(OpOrder writes);
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
new file mode 100644
index 0000000..b30c484
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/PoolAllocator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * An allocator that draws memory from a shared {@link Pool}, tracking how much of the pool it owns so the
+ * whole amount can be handed back when the allocator is discarded.
+ *
+ * Lifecycle: LIVE -> DISCARDING ({@link #setDiscarding()}) -> DISCARDED ({@link #setDiscarded()}).
+ */
+public abstract class PoolAllocator<P extends Pool> extends AbstractAllocator
+{
+    public final P pool;
+    volatile LifeCycle state = LifeCycle.LIVE;
+
+    static enum LifeCycle
+    {
+        LIVE, DISCARDING, DISCARDED;
+
+        // states may only advance one step at a time, in declaration order
+        LifeCycle transition(LifeCycle target)
+        {
+            assert target.ordinal() == ordinal() + 1;
+            return target;
+        }
+    }
+
+    // the amount of memory/resource owned by this object
+    private final AtomicLong owns = new AtomicLong();
+    // the amount of memory we are reporting to collect; this may be inaccurate, but is close
+    // and is used only to ensure that once we have reclaimed we mark the tracker with the same amount
+    private final AtomicLong reclaiming = new AtomicLong();
+
+    PoolAllocator(P pool)
+    {
+        this.pool = pool;
+    }
+
+    /**
+     * Mark this allocator as reclaiming; this will mark the memory it owns as reclaiming, so remove it from
+     * any calculation deciding if further cleaning/reclamation is necessary.
+     */
+    public void setDiscarding()
+    {
+        state = state.transition(LifeCycle.DISCARDING);
+        // mark the memory owned by this allocator as reclaiming
+        long prev = reclaiming.get();
+        long cur = owns.get();
+        reclaiming.set(cur);
+        pool.adjustReclaiming(cur - prev);
+    }
+
+    /**
+     * Indicate the memory and resources owned by this allocator are no longer referenced,
+     * and can be reclaimed/reused.
+     */
+    public void setDiscarded()
+    {
+        state = state.transition(LifeCycle.DISCARDED);
+        // release any memory owned by this allocator; automatically signals waiters
+        pool.release(owns.getAndSet(0));
+        pool.adjustReclaiming(-reclaiming.get());
+    }
+
+    /**
+     * Allocate {@code size} bytes on behalf of {@code opGroup}, accounting for them against the pool.
+     *
+     * @param opGroup the operation group the allocation is made for; may be null when the caller is not
+     *                part of any group
+     */
+    public abstract ByteBuffer allocate(int size, OpOrder.Group opGroup);
+
+    /** Mark the BB as unused, permitting it to be reclaimed */
+    public abstract void free(ByteBuffer name);
+
+    /**
+     * Mark ourselves as owning {@code size} bytes from the pool. Meant to be called by a subclass allocate
+     * method that actually allocates and returns a ByteBuffer.
+     *
+     * If the pool is full this waits for room. When {@code opGroup} is non-null and becomes blocking, it
+     * instead allocates regardless of the pool's limit. A null {@code opGroup} is never considered blocking.
+     */
+    protected void markAllocated(int size, OpOrder.Group opGroup)
+    {
+        while (true)
+        {
+            if (pool.tryAllocate(size))
+            {
+                acquired(size);
+                return;
+            }
+            // register for room before retrying, so a release between the two attempts cannot be missed
+            WaitQueue.Signal signal = pool.hasRoom.register();
+            // the single-argument allocate overloads pass a null group; only wrap when we have one
+            // (the wrapped signal presumably also fires when opGroup starts blocking - see OpOrder)
+            if (opGroup != null)
+                signal = opGroup.isBlockingSignal(signal);
+            boolean allocated = pool.tryAllocate(size);
+            if (allocated || (opGroup != null && opGroup.isBlocking()))
+            {
+                signal.cancel();
+                if (allocated) // if we allocated, take ownership
+                    acquired(size);
+                else // otherwise we're blocking so we're permitted to overshoot our constraints, to just allocate without blocking
+                    allocated(size);
+                return;
+            }
+            else
+                signal.awaitUninterruptibly();
+        }
+    }
+
+    // retroactively mark (by-passes any constraints) an amount allocated in the tracker, and owned by us.
+    private void allocated(int size)
+    {
+        pool.adjustAllocated(size);
+        owns.addAndGet(size);
+    }
+
+    // record an amount that has already been allocated in the pool as owned by us
+    private void acquired(int size)
+    {
+        owns.addAndGet(size);
+    }
+
+    // release an amount of memory from our ownership, and deallocate it in the tracker
+    void release(int size)
+    {
+        pool.release(size);
+        owns.addAndGet(-size);
+    }
+
+    public boolean isLive()
+    {
+        return state == LifeCycle.LIVE;
+    }
+
+    /**
+     * Copy the remaining bytes of {@code buffer} into memory allocated under {@code opGroup}. The source
+     * is read through a duplicate, so its position, limit and mark are left untouched.
+     *
+     * @return the copy, positioned where the allocated slice begins; {@link ByteBufferUtil#EMPTY_BYTE_BUFFER}
+     *         if there is nothing to copy
+     */
+    public ByteBuffer clone(ByteBuffer buffer, OpOrder.Group opGroup)
+    {
+        assert buffer != null;
+        if (buffer.remaining() == 0)
+            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+        ByteBuffer cloned = allocate(buffer.remaining(), opGroup);
+
+        cloned.mark();
+        cloned.put(buffer.duplicate());
+        cloned.reset();
+        return cloned;
+    }
+
+    /** @return an AbstractAllocator that allocates from this allocator under {@code opGroup} */
+    public ContextAllocator wrap(OpOrder.Group opGroup, ColumnFamilyStore cfs)
+    {
+        return new ContextAllocator(opGroup, this, cfs);
+    }
+
+    @Override
+    public long owns()
+    {
+        return owns.get();
+    }
+
+    @Override
+    public float ownershipRatio()
+    {
+        return owns.get() / (float) pool.limit;
+    }
+
+    @Override
+    public long reclaiming()
+    {
+        return reclaiming.get();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
new file mode 100644
index 0000000..bfae475
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/memory/PoolCleanerThread.java
@@ -0,0 +1,55 @@
+package org.apache.cassandra.utils.memory;
+
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+/**
+ * A thread that reclaims memory from a Pool on demand. The actual reclaiming work is delegated to the
+ * cleaner Runnable, e.g., FlushLargestColumnFamily
+ */
+class PoolCleanerThread<P extends Pool> extends Thread
+{
+    /** The pool we're cleaning */
+    final P pool;
+
+    /** should ensure that at least some memory has been marked reclaiming after completion */
+    final Runnable cleaner;
+
+    /** signalled whenever needsCleaning() may return true */
+    final WaitQueue wait = new WaitQueue();
+
+    PoolCleanerThread(P pool, Runnable cleaner)
+    {
+        super(pool.getClass().getSimpleName() + "Cleaner");
+        this.pool = pool;
+        this.cleaner = cleaner;
+        // run() never returns, so this thread must not keep the JVM alive on its own
+        setDaemon(true);
+    }
+
+    boolean needsCleaning()
+    {
+        return pool.needsCleaning();
+    }
+
+    // should ONLY be called when we really think it already needs cleaning
+    void trigger()
+    {
+        wait.signal();
+    }
+
+    @Override
+    public void run()
+    {
+        while (true)
+        {
+            while (!needsCleaning())
+            {
+                // register before re-checking, so a trigger() between the check and the wait is not lost
+                final WaitQueue.Signal signal = wait.register();
+                if (!needsCleaning())
+                    signal.awaitUninterruptibly();
+                else
+                    signal.cancel();
+            }
+
+            cleaner.run();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
new file mode 100644
index 0000000..f13c1b2
--- /dev/null
+++ b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -0,0 +1,223 @@
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.*;
+import org.slf4j.*;
+
+import static org.junit.Assert.*;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+// TODO: we don't currently test SAFE functionality at all!
+// TODO: should also test markBlocking and SyncOrdered
+public class LongOpOrderTest
+{
+
+ private static final Logger logger = LoggerFactory.getLogger(LongOpOrderTest.class);
+
+ static final int CONSUMERS = 4;
+ static final int PRODUCERS = 32;
+
+ static final long RUNTIME = TimeUnit.MINUTES.toMillis(5);
+ static final long REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(1);
+
+ static final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler()
+ {
+ @Override
+ public void uncaughtException(Thread t, Throwable e)
+ {
+ System.err.println(t.getName() + ": " + e.getMessage());
+ e.printStackTrace();
+ }
+ };
+
+ final OpOrder order = new OpOrder();
+ final AtomicInteger errors = new AtomicInteger();
+
+ class TestOrdering implements Runnable
+ {
+
+ final int[] waitNanos = new int[1 << 16];
+ volatile State state = new State();
+ final ScheduledExecutorService sched;
+
+ TestOrdering(ExecutorService exec, ScheduledExecutorService sched)
+ {
+ this.sched = sched;
+ final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ for (int i = 0 ; i < waitNanos.length ; i++)
+ waitNanos[i] = rnd.nextInt(5000);
+ for (int i = 0 ; i < PRODUCERS / CONSUMERS ; i++)
+ exec.execute(new Producer());
+ exec.execute(this);
+ }
+
+ @Override
+ public void run()
+ {
+ final long until = System.currentTimeMillis() + RUNTIME;
+ long lastReport = System.currentTimeMillis();
+ long count = 0;
+ long opCount = 0;
+ while (true)
+ {
+ long now = System.currentTimeMillis();
+ if (now > until)
+ break;
+ if (now > lastReport + REPORT_INTERVAL)
+ {
+ lastReport = now;
+ logger.info(String.format("%s: Executed %d barriers with %d operations. %.0f%% complete.",
+ Thread.currentThread().getName(), count, opCount, 100 * (1 - ((until - now) / (double) RUNTIME))));
+ }
+ try
+ {
+ Thread.sleep(0, waitNanos[((int) (count & (waitNanos.length - 1)))]);
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+
+ final State s = state;
+ s.barrier = order.newBarrier();
+ s.replacement = new State();
+ s.barrier.issue();
+ s.barrier.await();
+ s.check();
+ opCount += s.totalCount();
+ state = s.replacement;
+ sched.schedule(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ s.check();
+ }
+ }, 1, TimeUnit.SECONDS);
+ count++;
+ }
+ }
+
+ class State
+ {
+
+ volatile OpOrder.Barrier barrier;
+ volatile State replacement;
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+ int checkCount = -1;
+
+ boolean accept(OpOrder.Group opGroup)
+ {
+ if (barrier != null && !barrier.isAfter(opGroup))
+ return false;
+ AtomicInteger c;
+ if (null == (c = count.get(opGroup)))
+ {
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ return true;
+ }
+
+ int totalCount()
+ {
+ int c = 0;
+ for (AtomicInteger v : count.values())
+ c += v.intValue();
+ return c;
+ }
+
+ void check()
+ {
+ boolean delete;
+ if (checkCount >= 0)
+ {
+ if (checkCount != totalCount())
+ {
+ errors.incrementAndGet();
+ logger.error("Received size changed after barrier finished: {} vs {}", checkCount, totalCount());
+ }
+ delete = true;
+ }
+ else
+ {
+ checkCount = totalCount();
+ delete = false;
+ }
+ for (Map.Entry<OpOrder.Group, AtomicInteger> e : count.entrySet())
+ {
+ if (e.getKey().compareTo(barrier.getSyncPoint()) > 0)
+ {
+ errors.incrementAndGet();
+ logger.error("Received an operation that was created after the barrier was issued.");
+ }
+ if (TestOrdering.this.count.get(e.getKey()).intValue() != e.getValue().intValue())
+ {
+ errors.incrementAndGet();
+ logger.error("Missing registered operations. {} vs {}", TestOrdering.this.count.get(e.getKey()).intValue(), e.getValue().intValue());
+ }
+ if (delete)
+ TestOrdering.this.count.remove(e.getKey());
+ }
+ }
+
+ }
+
+ final NonBlockingHashMap<OpOrder.Group, AtomicInteger> count = new NonBlockingHashMap<>();
+
+ class Producer implements Runnable
+ {
+ public void run()
+ {
+ while (true)
+ {
+ AtomicInteger c;
+ OpOrder.Group opGroup = order.start();
+ try
+ {
+ if (null == (c = count.get(opGroup)))
+ {
+ count.putIfAbsent(opGroup, new AtomicInteger());
+ c = count.get(opGroup);
+ }
+ c.incrementAndGet();
+ State s = state;
+ while (!s.accept(opGroup))
+ s = s.replacement;
+ }
+ finally
+ {
+ opGroup.finishOne();
+ }
+ }
+ }
+ }
+
+ }
+
+ @Test
+ public void testOrdering() throws InterruptedException
+ {
+ errors.set(0);
+ Thread.setDefaultUncaughtExceptionHandler(handler);
+ final ExecutorService exec = Executors.newCachedThreadPool(new NamedThreadFactory("checker"));
+ final ScheduledExecutorService checker = Executors.newScheduledThreadPool(1, new NamedThreadFactory("checker"));
+ for (int i = 0 ; i < CONSUMERS ; i++)
+ new TestOrdering(exec, checker);
+ exec.shutdown();
+ exec.awaitTermination((long) (RUNTIME * 1.1), TimeUnit.MILLISECONDS);
+ assertTrue(exec.isShutdown());
+ assertTrue(errors.get() == 0);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
new file mode 100644
index 0000000..e7bfe30
--- /dev/null
+++ b/test/long/org/apache/cassandra/db/LongFlushMemtableTest.java
@@ -0,0 +1,72 @@
+package org.apache.cassandra.db;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.junit.Assert.assertTrue;
+
+public class LongFlushMemtableTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "Keyspace1";
+    /** Name prefix of the column families this test creates; also used to find them again afterwards. */
+    private static final String CF_PREFIX = "_CF";
+    private static final int COLUMN_FAMILY_COUNT = 100;
+    private static final int ROW_COUNT = 200;
+    /** Size in bytes of every cell value written. */
+    private static final int VALUE_SIZE = 100000;
+
+    /**
+     * Creates {@code COLUMN_FAMILY_COUNT} column families, writes one {@code VALUE_SIZE}-byte cell per row
+     * ({@code ROW_COUNT} rows) into each of them, and checks that the summed memtable switch count of those
+     * column families is positive.
+     */
+    @Test
+    public void testFlushMemtables() throws IOException, ConfigurationException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        for (int i = 0; i < COLUMN_FAMILY_COUNT; i++)
+        {
+            CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), CF_PREFIX + i, UTF8Type.instance);
+            MigrationManager.announceNewColumnFamily(metadata);
+        }
+
+        for (int j = 0; j < ROW_COUNT; j++)
+        {
+            for (int i = 0; i < COLUMN_FAMILY_COUNT; i++)
+            {
+                Mutation mutation = new Mutation(KEYSPACE, ByteBufferUtil.bytes("key" + j));
+                ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(KEYSPACE, CF_PREFIX + i);
+                // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
+                ByteBuffer value = ByteBuffer.allocate(VALUE_SIZE);
+                cf.addColumn(new Cell(Util.cellname("c"), value));
+                mutation.add(cf);
+                mutation.applyUnsafe();
+            }
+        }
+
+        int flushes = 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            if (cfs.name.startsWith(CF_PREFIX))
+                flushes += cfs.getMemtableSwitchCount();
+        }
+        // JUnit assertion rather than a bare `assert`, which is skipped entirely when the JVM runs without -ea.
+        assertTrue("expected at least one memtable switch across the " + CF_PREFIX + "* column families, got " + flushes,
+                   flushes > 0);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/MeteredFlusherTest.java b/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
deleted file mode 100644
index 4bab277..0000000
--- a/test/long/org/apache/cassandra/db/MeteredFlusherTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package org.apache.cassandra.db;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.junit.Test;
-
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
-
-public class MeteredFlusherTest extends SchemaLoader
-{
- @Test
- public void testManyMemtables() throws IOException, ConfigurationException
- {
- Keyspace keyspace = Keyspace.open("Keyspace1");
- for (int i = 0; i < 100; i++)
- {
- CFMetaData metadata = CFMetaData.denseCFMetaData(keyspace.getName(), "_CF" + i, UTF8Type.instance);
- MigrationManager.announceNewColumnFamily(metadata);
- }
-
- for (int j = 0; j < 200; j++)
- {
- for (int i = 0; i < 100; i++)
- {
- Mutation rm = new Mutation("Keyspace1", ByteBufferUtil.bytes("key" + j));
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "_CF" + i);
- // don't cheat by allocating this outside of the loop; that defeats the purpose of deliberately using lots of memory
- ByteBuffer value = ByteBuffer.allocate(100000);
- cf.addColumn(new Cell(Util.cellname("c"), value));
- rm.add(cf);
- rm.applyUnsafe();
- }
- }
-
- int flushes = 0;
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- if (cfs.name.startsWith("_CF"))
- flushes += cfs.getMemtableSwitchCount();
- }
- assert flushes > 0;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/long/org/apache/cassandra/utils/LongBTreeTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/utils/LongBTreeTest.java b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
index 9f31743..b8e60de 100644
--- a/test/long/org/apache/cassandra/utils/LongBTreeTest.java
+++ b/test/long/org/apache/cassandra/utils/LongBTreeTest.java
@@ -50,7 +50,7 @@ public class LongBTreeTest
TreeSet<Integer> canon = new TreeSet<>();
for (int i = 0 ; i < 10000000 ; i++)
canon.add(i);
- Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true);
+ Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), ICMP, true, null);
btree = BTree.update(btree, ICMP, canon, true);
canon.add(Integer.MIN_VALUE);
canon.add(Integer.MAX_VALUE);
@@ -171,7 +171,7 @@ public class LongBTreeTest
canon.putAll(buffer);
ctxt.stop();
ctxt = BTREE_TIMER.time();
- btree = BTree.update(btree, ICMP, buffer.keySet(), true, null, null);
+ btree = BTree.update(btree, ICMP, buffer.keySet(), true, null);
ctxt.stop();
if (quickEquality)
@@ -200,7 +200,7 @@ public class LongBTreeTest
String id = String.format("[0..%d)", canon.size());
System.out.println("Testing " + id);
Futures.allAsList(testAllSlices(id, cur, canon)).get();
- cur = BTree.update(cur, ICMP, Arrays.asList(i), true, null, null);
+ cur = BTree.update(cur, ICMP, Arrays.asList(i), true, null);
canon.add(i);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index aa6d3dd..849c30c 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -142,7 +142,7 @@ public class CacheProviderTest extends SchemaLoader
this.string = input;
}
- public long memorySize()
+ public long unsharedHeapSize()
{
return string.length();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java b/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
deleted file mode 100644
index da34711..0000000
--- a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-package org.apache.cassandra.cache;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.nio.ByteBuffer;
-import java.util.UUID;
-
-import org.junit.Assert;
-
-import org.apache.cassandra.db.ColumnIndex;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.RowIndexEntry;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.github.jamm.MemoryMeter;
-import org.junit.Test;
-
-public class ObjectSizeTest
-{
- public static final MemoryMeter meter = new MemoryMeter().omitSharedBufferOverhead();
-
- @Test
- public void testArraySizes()
- {
- long size = ObjectSizes.getArraySize(0, 1);
- long size2 = meter.measureDeep(new byte[0]);
- Assert.assertEquals(size, size2);
- }
-
- @Test
- public void testBiggerArraySizes()
- {
- long size = ObjectSizes.getArraySize(0, 1);
- long size2 = meter.measureDeep(new byte[0]);
- Assert.assertEquals(size, size2);
-
- size = ObjectSizes.getArraySize(8, 1);
- size2 = meter.measureDeep(new byte[8]);
- Assert.assertEquals(size, size2);
- }
-
- @Test
- public void testKeyCacheKey()
- {
- KeyCacheKey key = new KeyCacheKey(null, null, ByteBuffer.wrap(new byte[0]));
- long size = key.memorySize();
- long size2 = meter.measureDeep(key);
- Assert.assertEquals(size, size2);
- }
-
- @Test
- public void testKeyCacheValue()
- {
- RowIndexEntry entry = new RowIndexEntry(123);
- long size = entry.memorySize();
- long size2 = meter.measureDeep(entry);
- Assert.assertEquals(size, size2);
- }
-
- @Test
- public void testKeyCacheValueWithDelInfo()
- {
- RowIndexEntry entry = RowIndexEntry.create(123, new DeletionTime(123, 123), ColumnIndex.nothing());
- long size = entry.memorySize();
- long size2 = meter.measureDeep(entry);
- Assert.assertEquals(size, size2);
- }
-
- @Test
- public void testRowCacheKey()
- {
- UUID id = UUID.randomUUID();
- RowCacheKey key = new RowCacheKey(id, ByteBuffer.wrap(new byte[11]));
- long size = key.memorySize();
- long size2 = meter.measureDeep(key) - meter.measureDeep(id);
- Assert.assertEquals(size, size2);
- }
-
- @Test
- public void testRowCacheSentinel()
- {
- RowCacheSentinel sentinel = new RowCacheSentinel(123);
- long size = sentinel.memorySize();
- long size2 = meter.measureDeep(sentinel);
- Assert.assertEquals(size, size2);
- }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b54b8ac/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index c2888bc..15d40dc 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -1,6 +1,6 @@
package org.apache.cassandra.concurrent;
-import org.apache.cassandra.utils.WaitQueue;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
import org.junit.*;
import java.util.concurrent.atomic.AtomicBoolean;