Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:23:02 UTC
[38/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java
deleted file mode 100644
index 6d6555a..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunner.java
+++ /dev/null
@@ -1,979 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: FJTaskRunner.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 7Jan1999 dl First public release
- 13Jan1999 dl correct a stat counter update;
- ensure inactive status on run termination;
- misc minor cleanup
- 14Jan1999 dl Use random starting point in scan;
- variable renamings.
- 18Jan1999 dl Runloop allowed to die on task exception;
- remove useless timed join
- 22Jan1999 dl Rework scan to allow use of priorities.
- 6Feb1999 dl Documentation updates.
- 7Mar1999 dl Add array-based coInvoke
- 31Mar1999 dl Revise scan to remove need for NullTasks
- 27Apr1999 dl Renamed
- 23oct1999 dl Earlier detect of interrupt in scanWhileIdling
- 24nov1999 dl Now works on JVMs that do not properly
- implement read-after-write of 2 volatiles.
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-
-import java.util.Random;
-
-/**
- * Specialized Thread subclass for running FJTasks.
- * <p>
- * Each FJTaskRunner keeps FJTasks in a double-ended queue (DEQ).
- * Double-ended queues support stack-based operations
- * push and pop, as well as queue-based operations put and take.
- * Normally, threads run their own tasks. But they
- * may also steal tasks from each other's DEQs.
- * <p>
- * The algorithms are minor variants of those used
- * in <A href="http://supertech.lcs.mit.edu/cilk/"> Cilk</A> and
- * <A href="http://www.cs.utexas.edu/users/hood/"> Hood</A>, and
- * to a lesser extent
- * <A href="http://www.cs.uga.edu/~dkl/filaments/dist.html"> Filaments</A>,
- * but are adapted to work in Java.
- * <p>
- * The two most important capabilities are:
- * <ul>
- * <li> Fork a FJTask:
- * <pre>
- * Push task onto DEQ
- * </pre>
- * <li> Get a task to run (for example within taskYield)
- * <pre>
- *   If DEQ is not empty,
- *      Pop a task and run it.
- *   Else if any other DEQ is not empty,
- *      Take ("steal") a task from it and run it.
- *   Else if the entry queue for our group is not empty,
- *      Take a task from it and run it.
- *   Else if current thread is otherwise idling
- *      If all threads are idling
- *         Wait for a task to be put on group entry queue
- *      Else
- *         Yield or Sleep for a while, and then retry
- * </pre>
- * </ul>
- * The push, pop, and put are designed to be called only by the
- * current thread, and take (steal) is only ever called by
- * other threads.
- * All other operations are composites and variants of these,
- * plus a few miscellaneous bookkeeping methods.
- * <p>
- * Implementations of the underlying representations and operations
- * are geared for use on JVMs operating on multiple CPUs (although
- * they should of course work fine on single CPUs as well).
- * <p>
- * A possible snapshot of a FJTaskRunner's DEQ is:
- * <pre>
- *      0     1     2     3     4     5     6    ...
- *   +-----+-----+-----+-----+-----+-----+-----+--
- *   |     |  t  |  t  |  t  |  t  |     |     | ...  deq array
- *   +-----+-----+-----+-----+-----+-----+-----+--
- *            ^                       ^
- *           base                    top
- *    (incremented                (incremented
- *        on take,                    on push
- *     decremented                 decremented
- *        on put)                     on pop)
- * </pre>
- * <p>
- * FJTasks are held in elements of the DEQ.
- * They are maintained in a bounded array that
- * works similarly to a circular bounded buffer. To ensure
- * visibility of stolen FJTasks across threads, the array elements
- * must be <code>volatile</code>.
- * Using volatile rather than synchronizing suffices here since
- * each task accessed by a thread is either one that it
- * created or one that it has never seen before. Thus we cannot
- * encounter any staleness problems executing run methods,
- * although FJTask programmers must still be sure to either synch or use
- * volatile for shared data within their run methods.
- * <p>
- * However, since there is no way
- * to declare an array of volatiles in Java, the DEQ elements actually
- * hold VolatileTaskRef objects, each of which in turn holds a
- * volatile reference to a FJTask.
- * Even with the double-indirection overhead of
- * volatile refs, using an array for the DEQ works out
- * better than linking them since fewer shared
- * memory locations need to be
- * touched or modified by the threads while using the DEQ.
- * Further, the double indirection may alleviate cache-line
- * sharing effects (which cannot otherwise be directly dealt with in Java).
- * <p>
- * The indices for the <code>base</code> and <code>top</code> of the DEQ
- * are declared as volatile. The main contention point with
- * multiple FJTaskRunner threads occurs when one thread is trying
- * to pop its own stack while another is trying to steal from it.
- * This is handled via a specialization of Dekker's algorithm,
- * in which the popping thread pre-decrements <code>top</code>,
- * and then checks it against <code>base</code>.
- * To be conservative in the face of JVMs that only partially
- * honor the specification for volatile, the pop proceeds
- * without synchronization only if there are apparently enough
- * items for both a simultaneous pop and take to succeed.
- * It otherwise enters a
- * synchronized lock to check if the DEQ is actually empty,
- * if so failing. The stealing thread
- * does almost the opposite, but is set up to be less likely
- * to win in cases of contention: Steals always run under synchronized
- * locks in order to avoid conflicts with other ongoing steals.
- * They pre-increment <code>base</code>, and then check against
- * <code>top</code>. They back out (resetting the base index
- * and failing to steal) if the
- * DEQ is empty or is about to become empty by an ongoing pop.
- * <p>
- * A push operation can normally run concurrently with a steal.
- * A push enters a synch lock only if the DEQ appears full so must
- * either be resized or have indices adjusted due to wrap-around
- * of the bounded DEQ. The put operation always requires synchronization.
- * <p>
- * When a FJTaskRunner thread has no tasks of its own to run,
- * it tries to be a good citizen.
- * Threads run at lower priority while scanning for work.
- * <p>
- * If the task is currently waiting
- * via yield, the thread alternates scans (starting at a randomly
- * chosen victim) with Thread.yields. This is
- * well-behaved so long as the JVM handles Thread.yield in a
- * sensible fashion. (It need not. Thread.yield is so underspecified
- * that it is legal for a JVM to treat it as a no-op.) This also
- * keeps things well-behaved even if we are running on a uniprocessor
- * JVM using a simple cooperative threading model.
- * <p>
- * If a thread needing work is
- * otherwise idle (which occurs only in the main runloop), and
- * there are no available tasks to steal or poll, it
- * instead enters into a sleep-based (actually timed wait(msec))
- * phase in which it progressively sleeps for longer durations
- * (up to a maximum of FJTaskRunnerGroup.MAX_SLEEP_TIME,
- * currently 100ms) between scans.
- * If all threads in the group
- * are idling, they further progress to a hard wait phase, suspending
- * until a new task is entered into the FJTaskRunnerGroup entry queue.
- * A sleeping FJTaskRunner thread may be awakened by a new
- * task being put into the group entry queue or by another FJTaskRunner
- * becoming active, but not merely by some DEQ becoming non-empty.
- * Thus the MAX_SLEEP_TIME provides a bound for sleep durations
- * in cases where all but one worker thread start sleeping
- * even though there will eventually be work produced
- * by a thread that is taking a long time to place tasks in DEQ.
- * These sleep mechanics are handled in the FJTaskRunnerGroup class.
- * <p>
- * Composite operations such as taskJoin include heavy
- * manual inlining of the most time-critical operations
- * (mainly FJTask.invoke).
- * This opens up a few opportunities for further hand-optimizations.
- * Until Java compilers get a lot smarter, these tweaks
- * improve performance significantly enough for task-intensive
- * programs to be worth the poorer maintainability and code duplication.
- * <p>
- * Because they are so fragile and performance-sensitive, nearly
- * all methods are declared as final. However, nearly all fields
- * and methods are also declared as protected, so it is possible,
- * with much care, to extend functionality in subclasses. (Normally
- * you would also need to subclass FJTaskRunnerGroup.)
- * <p>
- * None of the normal java.lang.Thread class methods should ever be called
- * on FJTaskRunners. For this reason, it might have been nicer to
- * declare FJTaskRunner as a Runnable to run within a Thread. However,
- * this would have complicated many minor logistics. And since
- * no FJTaskRunner methods should normally be called from outside the
- * FJTask and FJTaskRunnerGroup classes either, this decision doesn't impact
- * usage.
- * <p>
- * You might think that layering this kind of framework on top of
- * Java threads, which are already several levels removed from raw CPU
- * scheduling on most systems, would lead to very poor performance.
- * But on the platforms
- * tested, the performance is quite good.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * @see FJTask
- * @see FJTaskRunnerGroup
- **/
-
-public class FJTaskRunner extends Thread {
-
- /** The group of which this FJTaskRunner is a member **/
- protected final FJTaskRunnerGroup group;
-
- /**
- * Constructor called only during FJTaskRunnerGroup initialization
- **/
-
- protected FJTaskRunner(FJTaskRunnerGroup g) {
- group = g;
- victimRNG = new Random(System.identityHashCode(this));
- runPriority = getPriority();
- setDaemon(true);
- }
-
- /**
- * Return the FJTaskRunnerGroup of which this thread is a member
- **/
-
- protected final FJTaskRunnerGroup getGroup() { return group; }
-
-
- /* ------------ DEQ Representation ------------------- */
-
-
- /**
- * FJTasks are held in an array-based DEQ with INITIAL_CAPACITY
- * elements. The DEQ is grown if necessary, but the default value is
- * normally much more than sufficient unless there are
- * user programming errors or questionable operations generating
- * large numbers of Tasks without running them.
- * Capacities must be a power of two.
- **/
-
- protected static final int INITIAL_CAPACITY = 4096;
-
- /**
- * The maximum supported DEQ capacity.
- * When exceeded, FJTaskRunner operations throw Errors
- **/
-
- protected static final int MAX_CAPACITY = 1 << 30;
-
- /**
- * An object holding a single volatile reference to a FJTask.
- **/
-
- protected final static class VolatileTaskRef {
- /** The reference **/
- protected volatile FJTask ref;
-
- /** Set the reference **/
- protected final void put(FJTask r) { ref = r; }
- /** Return the reference **/
- protected final FJTask get() { return ref; }
- /** Return the reference and clear it **/
- protected final FJTask take() { FJTask r = ref; ref = null; return r; }
-
- /**
- * Initialization utility for constructing arrays.
- * Make an array of given capacity and fill it with
- * VolatileTaskRefs.
- **/
- protected static VolatileTaskRef[] newArray(int cap) {
- VolatileTaskRef[] a = new VolatileTaskRef[cap];
- for (int k = 0; k < cap; k++) a[k] = new VolatileTaskRef();
- return a;
- }
-
- }
-
- /**
- * The DEQ array.
- **/
-
- protected VolatileTaskRef[] deq = VolatileTaskRef.newArray(INITIAL_CAPACITY);
-
- /** Current size of the task DEQ **/
- protected int deqSize() { return deq.length; }
-
- /**
- * Current top of DEQ. Generally acts just like a stack pointer in an
- * array-based stack, except that it circularly wraps around the
- * array, as in an array-based queue. The value is NOT
- * always kept within <code>0 ... deq.length</code> though.
- * The current top element is always at <code>top & (deq.length-1)</code>.
- * To avoid integer overflow, top is reset down
- * within bounds whenever it is noticed to be out of bounds;
- * at worst when it is at <code>2 * deq.length</code>.
- **/
- protected volatile int top = 0;
-
-
- /**
- * Current base of DEQ. Acts like a take-pointer in an
- * array-based bounded queue. Same bounds and usage as top.
- **/
-
- protected volatile int base = 0;
-
-
- /**
- * An extra object to synchronize on in order to
- * achieve a memory barrier.
- **/
-
- protected final Object barrier = new Object();
-
- /* ------------ Other BookKeeping ------------------- */
-
- /**
- * Record whether current thread may be processing a task
- * (i.e., has been started and is not in an idle wait).
- * Accessed, under synch, ONLY by FJTaskRunnerGroup, but the field is
- * stored here for simplicity.
- **/
-
- protected boolean active = false;
-
- /** Random starting point generator for scan() **/
- protected final Random victimRNG;
-
-
- /** Priority to use while scanning for work **/
- protected int scanPriority = FJTaskRunnerGroup.DEFAULT_SCAN_PRIORITY;
-
- /** Priority to use while running tasks **/
- protected int runPriority;
-
- /**
- * Set the priority to use while scanning.
- * We do not bother synchronizing access, since
- * by the time the value is needed, both this FJTaskRunner
- * and its FJTaskRunnerGroup will
- * necessarily have performed enough synchronization
- * to avoid staleness problems of any consequence.
- **/
- protected void setScanPriority(int pri) { scanPriority = pri; }
-
-
- /**
- * Set the priority to use while running tasks.
- * Same usage and rationale as setScanPriority.
- **/
- protected void setRunPriority(int pri) { runPriority = pri; }
-
- /**
- * Compile-time constant for statistics gathering.
- * Even when set, reported values may not be accurate
- * since all are read and written without synchronization.
- **/
-
-
-
- static final boolean COLLECT_STATS = true;
- // static final boolean COLLECT_STATS = false;
-
-
- // for stat collection
-
- /** Total number of tasks run **/
- protected int runs = 0;
-
- /** Total number of queues scanned for work **/
- protected int scans = 0;
-
- /** Total number of tasks obtained via scan **/
- protected int steals = 0;
-
-
-
-
- /* ------------ DEQ operations ------------------- */
-
-
- /**
- * Push a task onto DEQ.
- * Called ONLY by current thread.
- **/
-
- protected final void push(final FJTask r) {
- int t = top;
-
- /*
- This test catches both overflows and index wraps. It doesn't
- really matter if base value is in the midst of changing in take.
- As long as deq length is < 2^30, we are guaranteed to catch wrap in
- time since base can only be incremented at most length times
- between pushes (or puts).
- */
-
- if (t < (base & (deq.length-1)) + deq.length) {
-
- deq[t & (deq.length-1)].put(r);
- top = t + 1;
- }
-
- else // isolate slow case to increase chances push is inlined
- slowPush(r); // check overflow and retry
- }
-
-
- /**
- * Handle slow case for push
- **/
-
- protected synchronized void slowPush(final FJTask r) {
- checkOverflow();
- push(r); // just recurse -- this one is sure to succeed.
- }
-
-
- /**
- * Enqueue task at base of DEQ.
- * Called ONLY by current thread.
- * This method is currently not called from class FJTask. It could be used
- * as a faster way to do FJTask.start, but most users would
- * find the semantics too confusing and unpredictable.
- **/
-
- protected final synchronized void put(final FJTask r) {
- for (;;) {
- int b = base - 1;
- if (top < b + deq.length) {
-
- int newBase = b & (deq.length-1);
- deq[newBase].put(r);
- base = newBase;
-
- if (b != newBase) { // Adjust for index underflow
- int newTop = top & (deq.length-1);
- if (newTop < newBase) newTop += deq.length;
- top = newTop;
- }
- return;
- }
- else {
- checkOverflow();
- // ... and retry
- }
- }
- }
-
- /**
- * Return a popped task, or null if DEQ is empty.
- * Called ONLY by current thread.
- * <p>
- * This is not usually called directly but is
- * instead inlined in callers. This version differs from the
- * Cilk algorithm in that pop does not fully back down and
- * retry in the case of potential conflict with take. It simply
- * rechecks under synch lock. This gives a preference
- * for threads to run their own tasks, which seems to
- * reduce flailing a bit when there are few tasks to run.
- **/
-
- protected final FJTask pop() {
- /*
- Decrement top, to force a contending take to back down.
- */
-
- int t = --top;
-
- /*
- To avoid problems with JVMs that do not properly implement
- read-after-write of a pair of volatiles, we conservatively
- grab without lock only if the DEQ appears to have at least two
- elements, thus guaranteeing that both a pop and take will succeed,
- even if the pre-increment in take is not seen by current thread.
- Otherwise we recheck under synch.
- */
-
- if (base + 1 < t)
- return deq[t & (deq.length-1)].take();
- else
- return confirmPop(t);
-
- }
-
-
- /**
- * Check under synch lock if DEQ is really empty when doing pop.
- * Return task if not empty, else null.
- **/
-
- protected final synchronized FJTask confirmPop(int provisionalTop) {
- if (base <= provisionalTop)
- return deq[provisionalTop & (deq.length-1)].take();
- else { // was empty
- /*
- Reset DEQ indices to zero whenever it is empty.
- This both avoids unnecessary calls to checkOverflow
- in push, and helps keep the DEQ from accumulating garbage
- */
-
- top = base = 0;
- return null;
- }
- }
-
-
- /**
- * Take a task from the base of the DEQ.
- * Always called by other threads via scan()
- **/
-
-
- protected final synchronized FJTask take() {
-
- /*
- Increment base in order to suppress a contending pop
- */
-
- int b = base++;
-
- if (b < top)
- return confirmTake(b);
- else {
- // back out
- base = b;
- return null;
- }
- }
-
-
- /**
- * double-check a potential take
- **/
-
- protected FJTask confirmTake(int oldBase) {
-
- /*
- Use a second (guaranteed uncontended) synch
- to serve as a barrier in case JVM does not
- properly process read-after-write of 2 volatiles
- */
-
- synchronized(barrier) {
- if (oldBase < top) {
- /*
- We cannot call deq[oldBase].take here because of possible races when
- nulling out versus concurrent push operations. Resulting
- accumulated garbage is swept out periodically in
- checkOverflow, or more typically, just by keeping indices
- zero-based when found to be empty in pop, which keeps active
- region small and constantly overwritten.
- */
-
- return deq[oldBase & (deq.length-1)].get();
- }
- else {
- base = oldBase;
- return null;
- }
- }
- }
-
-
- /**
- * Adjust top and base, and grow DEQ if necessary.
- * Called only while the DEQ synch lock is held.
- * We don't expect this to be called very often. In most
- * programs using FJTasks, it is never called.
- **/
-
- protected void checkOverflow() {
- int t = top;
- int b = base;
-
- if (t - b < deq.length-1) { // check if just need an index reset
-
- int newBase = b & (deq.length-1);
- int newTop = top & (deq.length-1);
- if (newTop < newBase) newTop += deq.length;
- top = newTop;
- base = newBase;
-
- /*
- Null out refs to stolen tasks.
- This is the only time we can safely do it.
- */
-
- int i = newBase;
- while (i != newTop && deq[i].ref != null) {
- deq[i].ref = null;
- i = (i - 1) & (deq.length-1);
- }
-
- }
- else { // grow by doubling array
-
- int newTop = t - b;
- int oldcap = deq.length;
- int newcap = oldcap * 2;
-
- if (newcap >= MAX_CAPACITY)
- throw new Error("FJTask queue maximum capacity exceeded");
-
- VolatileTaskRef[] newdeq = new VolatileTaskRef[newcap];
-
- // copy in bottom half of new deq with refs from old deq
- for (int j = 0; j < oldcap; ++j) newdeq[j] = deq[b++ & (oldcap-1)];
-
- // fill top half of new deq with new refs
- for (int j = oldcap; j < newcap; ++j) newdeq[j] = new VolatileTaskRef();
-
- deq = newdeq;
- base = 0;
- top = newTop;
- }
- }
-
-
- /* ------------ Scheduling ------------------- */
-
-
- /**
- * Do all but the pop() part of yield or join, by
- * traversing all DEQs in our group looking for a task to
- * steal. If none, it checks the entry queue.
- * <p>
- * Since there are no good, portable alternatives,
- * we rely here on a mixture of Thread.yield and priorities
- * to reduce wasted spinning, even though these are
- * not well defined. We are hoping here that the JVM
- * does something sensible.
- * @param waitingFor if non-null, the current task being joined
- **/
-
- protected void scan(final FJTask waitingFor) {
-
- FJTask task = null;
-
- // to delay lowering priority until first failure to steal
- boolean lowered = false;
-
- /*
- Circularly traverse from a random start index.
-
- This differs slightly from the Cilk version, which uses a random index
- for each attempted steal.
- Exhaustive scanning might impede analytic tractability of
- the scheduling policy, but makes it much easier to deal with
- startup and shutdown.
- */
-
- FJTaskRunner[] ts = group.getArray();
- int idx = victimRNG.nextInt(ts.length);
-
- for (int i = 0; i < ts.length; ++i) {
-
- FJTaskRunner t = ts[idx];
- if (++idx >= ts.length) idx = 0; // circularly traverse
-
- if (t != null && t != this) {
-
- if (waitingFor != null && waitingFor.isDone()) {
- break;
- }
- else {
- if (COLLECT_STATS) ++scans;
- task = t.take();
- if (task != null) {
- if (COLLECT_STATS) ++steals;
- break;
- }
- else if (isInterrupted()) {
- break;
- }
- else if (!lowered) { // if this is first fail, lower priority
- lowered = true;
- setPriority(scanPriority);
- }
- else { // otherwise we are at low priority; just yield
- yield();
- }
- }
- }
-
- }
-
- if (task == null) {
- if (COLLECT_STATS) ++scans;
- task = group.pollEntryQueue();
- if (COLLECT_STATS) if (task != null) ++steals;
- }
-
- if (lowered) setPriority(runPriority);
-
- if (task != null && !task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
-
- }
-
- /**
- * Same as scan, but called when current thread is idling.
- * It repeatedly scans other threads for tasks,
- * sleeping while none are available.
- * <p>
- * This differs from scan mainly in that
- * since there is no reason to return to recheck any
- * condition, we iterate until a task is found, backing
- * off via sleeps if necessary.
- **/
-
- protected void scanWhileIdling() {
- FJTask task = null;
-
- boolean lowered = false;
- long iters = 0;
-
- FJTaskRunner[] ts = group.getArray();
- int idx = victimRNG.nextInt(ts.length);
-
- do {
- for (int i = 0; i < ts.length; ++i) {
-
- FJTaskRunner t = ts[idx];
- if (++idx >= ts.length) idx = 0; // circularly traverse
-
- if (t != null && t != this) {
- if (COLLECT_STATS) ++scans;
-
- task = t.take();
- if (task != null) {
- if (COLLECT_STATS) ++steals;
- if (lowered) setPriority(runPriority);
- group.setActive(this);
- break;
- }
- }
- }
-
- if (task == null) {
- if (isInterrupted())
- return;
-
- if (COLLECT_STATS) ++scans;
- task = group.pollEntryQueue();
-
- if (task != null) {
- if (COLLECT_STATS) ++steals;
- if (lowered) setPriority(runPriority);
- group.setActive(this);
- }
- else {
- ++iters;
- // Check here for yield vs sleep to avoid entering group synch lock
- if (iters >= FJTaskRunnerGroup/*group GemStoneModification*/.SCANS_PER_SLEEP) {
- group.checkActive(this, iters);
- if (isInterrupted())
- return;
- }
- else if (!lowered) {
- lowered = true;
- setPriority(scanPriority);
- }
- else {
- yield();
- }
- }
- }
- } while (task == null);
-
-
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
-
- }
-
- /* ------------ composite operations ------------------- */
-
-
- /**
- * Main runloop
- **/
-
- @Override // GemStoneAddition
- public void run() {
- try{
- while (!interrupted()) {
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- // inline FJTask.invoke
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
- }
- else
- scanWhileIdling();
- }
- }
- finally {
- group.setInactive(this);
- }
- }
-
- /**
- * Execute a task in this thread. Generally called when current task
- * cannot otherwise continue.
- **/
-
-
- protected final void taskYield() {
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
- }
- else
- scan(null);
- }
-
-
- /**
- * Process tasks until w is done.
- * Equivalent to <code>while(!w.isDone()) taskYield(); </code>
- **/
-
- protected final void taskJoin(final FJTask w) {
-
- while (!w.isDone()) {
-
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- if (task == w) return; // fast exit if we just ran w
- }
- }
- else
- scan(w);
- }
- }
-
- /**
- * A specialized expansion of
- * <code> w.fork(); invoke(v); w.join(); </code>
- **/
-
-
- protected final void coInvoke(final FJTask w, final FJTask v) {
-
- // inline push
-
- int t = top;
- if (t < (base & (deq.length-1)) + deq.length) {
-
- deq[t & (deq.length-1)].put(w);
- top = t + 1;
-
- // inline invoke
-
- if (!v.isDone()) {
- if (COLLECT_STATS) ++runs;
- v.run();
- v.setDone();
- }
-
- // inline taskJoin
-
- while (!w.isDone()) {
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- if (task == w) return; // fast exit if we just ran w
- }
- }
- else
- scan(w);
- }
- }
-
- else // handle non-inlinable cases
- slowCoInvoke(w, v);
- }
-
-
- /**
- * Backup to handle noninlinable cases of coInvoke
- **/
-
- protected void slowCoInvoke(final FJTask w, final FJTask v) {
- push(w); // let push deal with overflow
- FJTask.invoke(v);
- taskJoin(w);
- }
-
-
- /**
- * Array-based version of coInvoke
- **/
-
- protected final void coInvoke(FJTask[] tasks) {
- int nforks = tasks.length - 1;
-
- // inline bulk push of all but one task
-
- int t = top;
-
- if (nforks >= 0 && t + nforks < (base & (deq.length-1)) + deq.length) {
- for (int i = 0; i < nforks; ++i) {
- deq[t++ & (deq.length-1)].put(tasks[i]);
- top = t;
- }
-
- // inline invoke of one task
- FJTask v = tasks[nforks];
- if (!v.isDone()) {
- if (COLLECT_STATS) ++runs;
- v.run();
- v.setDone();
- }
-
- // inline taskJoins
-
- for (int i = 0; i < nforks; ++i) {
- FJTask w = tasks[i];
- while (!w.isDone()) {
-
- FJTask task = pop();
- if (task != null) {
- if (!task.isDone()) {
- if (COLLECT_STATS) ++runs;
- task.run();
- task.setDone();
- }
- }
- else
- scan(w);
- }
- }
- }
-
- else // handle non-inlinable cases
- slowCoInvoke(tasks);
- }
-
- /**
- * Backup to handle atypical or noninlinable cases of coInvoke
- **/
-
- protected void slowCoInvoke(FJTask[] tasks) {
- for (int i = 0; i < tasks.length; ++i) push(tasks[i]);
- for (int i = 0; i < tasks.length; ++i) taskJoin(tasks[i]);
- }
-
-}
-
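
Note on the DEQ scheme described in the FJTaskRunner class comment above: the
owner thread pushes and pops at top, other threads take from base, and both
indices are mapped to array slots by masking with a power-of-two capacity.
A minimal sketch of that indexing follows. It is an illustration only, not
the removed code: the class name SimpleDeq and all method bodies are
hypothetical, every operation is simply synchronized, and the
volatile/Dekker-style fast path of the original is deliberately left out.

```java
/** Hypothetical, simplified illustration of an array-based DEQ with masked indices. */
final class SimpleDeq<T> {
    private Object[] slots; // length is always a power of two
    private int base = 0;   // index of the oldest element (the "steal" end)
    private int top = 0;    // index one past the newest element (the owner end)

    SimpleDeq(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        slots = new Object[capacity];
    }

    /** Owner end: add the newest task, doubling the array when it is full. */
    synchronized void push(T task) {
        if (top - base == slots.length) {
            grow();
        }
        slots[top & (slots.length - 1)] = task;
        top++;
    }

    /** Owner end: remove the newest task (LIFO), or return null if empty. */
    @SuppressWarnings("unchecked")
    synchronized T pop() {
        if (top == base) {
            return null;
        }
        top--;
        int i = top & (slots.length - 1);
        T task = (T) slots[i];
        slots[i] = null;
        return task;
    }

    /** Thief end: remove the oldest task (FIFO), or return null if empty. */
    @SuppressWarnings("unchecked")
    synchronized T take() {
        if (top == base) {
            return null;
        }
        int i = base & (slots.length - 1);
        base++;
        T task = (T) slots[i];
        slots[i] = null;
        return task;
    }

    synchronized int size() {
        return top - base;
    }

    /** Copy live elements to the bottom of a doubled array and rebase the indices. */
    private void grow() {
        int n = top - base;
        Object[] bigger = new Object[slots.length * 2];
        for (int j = 0; j < n; j++) {
            bigger[j] = slots[(base + j) & (slots.length - 1)];
        }
        slots = bigger;
        base = 0;
        top = n;
    }

    public static void main(String[] args) {
        SimpleDeq<String> d = new SimpleDeq<>(4);
        d.push("a");
        d.push("b");
        d.push("c");
        System.out.println(d.take()); // oldest element, as a stealing thread would see it
        System.out.println(d.pop());  // newest element, as the owner would see it
    }
}
```

In this sketch the owner and the thieves contend on one lock for every
operation. The removed FJTaskRunner avoided that cost on the common push and
pop paths, which is why its real implementation is considerably more subtle.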
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java
deleted file mode 100644
index 3ac2872..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FJTaskRunnerGroup.java
+++ /dev/null
@@ -1,625 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: FJTaskRunnerGroup.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 7Jan1999 dl First public release
- 12Jan1999 dl made getActiveCount public; misc minor cleanup.
- 14Jan1999 dl Added executeTask
- 20Jan1999 dl Allow use of priorities; reformat stats
- 6Feb1999 dl Lazy thread starts
- 27Apr1999 dl Renamed
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A stripped down analog of a ThreadGroup used for
- * establishing and managing FJTaskRunner threads.
- * FJTaskRunnerGroups serve as the control boundary separating
- * the general world of normal threads from the specialized world
- * of FJTasks.
- * <p>
- * By intent, this class does not subclass java.lang.ThreadGroup, and
- * does not support most methods found in ThreadGroups, since they
- * would make no sense for FJTaskRunner threads. In fact, the class
- * does not deal with ThreadGroups at all. If you want to restrict
- * a FJTaskRunnerGroup to a particular ThreadGroup, you can create
- * it from within that ThreadGroup.
- * <p>
- * The main contextual parameter for a FJTaskRunnerGroup is
- * the group size, established in the constructor.
- * Groups must be of a fixed size.
- * There is no way to dynamically increase or decrease the number
- * of threads in an existing group.
- * <p>
- * In general, the group size should be equal to the number
- * of CPUs on the system. (Unfortunately, there is no portable
- * means of automatically detecting the number of CPUs on a JVM, so there is
- * no good way to automate defaults.) In principle, when
- * FJTasks are used for computation-intensive tasks, having only
- * as many threads as CPUs should minimize bookkeeping overhead
- * and contention, and so maximize throughput. However, because
- * FJTaskRunners lie atop Java threads, and in turn operating system
- * thread support and scheduling policies,
- * it is very possible that using more threads
- * than CPUs will improve overall throughput even though it adds
- * to overhead. This will always be so if FJTasks are I/O bound.
- * So it may pay to experiment a bit when tuning on particular platforms.
- * You can also use <code>setRunPriorities</code> to either
- * increase or decrease the priorities of active threads, which
- * may interact with group size choice.
- * <p>
- * In any case, overestimating group sizes never
- * seriously degrades performance (at least within reasonable bounds).
- * You can also use a value
- * less than the number of CPUs in order to reserve processing
- * for unrelated threads.
- * <p>
- * There are two general styles for using a FJTaskRunnerGroup.
- * You can create one group per entire program execution, for example
- * as a static singleton, and use it for all parallel tasks:
- * <pre>
- * class Tasks {
- * static FJTaskRunnerGroup group;
- * public void initialize(int groupSize) {
- * group = new FJTaskRunnerGroup(groupSize);
- * }
- * // ...
- * }
- * </pre>
- * Alternatively, you can make new groups on the fly and use them only for
- * particular task sets. This is more flexible,
- * and leads to more controllable and deterministic execution patterns,
- * but it encounters greater overhead on startup. Also, to reclaim
- * system resources, you should
- * call <code>FJTaskRunnerGroup.interruptAll</code> when you are done
- * using one-shot groups. Otherwise, because FJTaskRunners set
- * <code>Thread.isDaemon</code>
- * status, they will not normally be reclaimed until program termination.
- * <p>
- * The main supported methods are <code>execute</code>,
- * which starts a task processed by FJTaskRunner threads,
- * and <code>invoke</code>, which starts one and waits for completion.
- * For example, you might extend the above <code>Tasks</code>
- * class to support a task-based computation, say, the
- * <code>Fib</code> class from the <code>FJTask</code> documentation:
- * <pre>
- * class Tasks { // continued
- * // ...
- * static int fib(int n) {
- * try {
- * Fib f = new Fib(n);
- * group.invoke(f);
- * return f.getAnswer();
- * }
- * catch (InterruptedException ex) {
- * throw new Error("Interrupted during computation");
- * }
- * }
- * }
- * </pre>
- * <p>
- * Method <code>stats()</code> can be used to monitor performance.
- * Both FJTaskRunnerGroup and FJTaskRunner may be compiled with
- * the compile-time constant COLLECT_STATS set to false. In this
- * case, various simple counts reported in stats() are not collected.
- * On platforms tested,
- * this leads to such a tiny performance improvement that there is
- * very little motivation to bother.
- *
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * <p>
- * @see FJTask
- * @see FJTaskRunner
- **/
-
-public class FJTaskRunnerGroup implements Executor {
-
- /** The threads in this group **/
- protected final FJTaskRunner[] threads;
-
- /** Group-wide queue for tasks entered via execute() **/
- protected final LinkedQueue entryQueue = new LinkedQueue();
-
- /** Number of threads that are not waiting for work **/
- protected int activeCount = 0;
-
- /** Number of threads that have been started. Used to avoid
- unnecessary contention during startup of task sets.
- **/
- protected int nstarted = 0;
-
- /**
- * Compile-time constant. If true, various counts of
- * runs, waits, etc., are maintained. These are NOT
- * updated with synchronization, so statistics reports
- * might not be accurate.
- **/
-
- static final boolean COLLECT_STATS = true;
- // static final boolean COLLECT_STATS = false;
-
- // for stats
-
- /** The time at which this FJTaskRunnerGroup was constructed **/
- long initTime = 0;
-
- /** Total number of executes or invokes **/
- int entries = 0;
-
- static final int DEFAULT_SCAN_PRIORITY = Thread.MIN_PRIORITY+1;
-
- /**
- * Create a FJTaskRunnerGroup with the indicated number
- * of FJTaskRunner threads. Normally, the best size to use is
- * the number of CPUs on the system.
- * <p>
- * The threads in a FJTaskRunnerGroup are created with their
- * isDaemon status set, so do not normally need to be
- * shut down manually upon program termination.
- **/
-
- public FJTaskRunnerGroup(int groupSize) {
- threads = new FJTaskRunner[groupSize];
- initializeThreads();
- initTime = System.currentTimeMillis();
- }
-
- /**
- * Arrange for execution of the given task
- * by placing it in a work queue. If the argument
- * is not of type FJTask, it is embedded in a FJTask via
- * <code>FJTask.Wrap</code>.
- * @exception InterruptedException if current Thread is
- * currently interrupted
- **/
-
- public void execute(Runnable r) throws InterruptedException {
-// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in put
- if (r instanceof FJTask) {
- entryQueue.put(/*(FJTask) GemStoneAddition*/r);
- }
- else {
- entryQueue.put(new FJTask.Wrap(r));
- }
- signalNewTask();
- }
-
-
- /**
- * Specialized form of execute called only from within FJTasks
- **/
- public void executeTask(FJTask t) {
- try {
- entryQueue.put(t);
- signalNewTask();
- }
- catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
-
-
- /**
- * Start a task and wait it out. Returns when the task completes.
- * @exception InterruptedException if current Thread is
- * interrupted before completion of the task.
- **/
-
- public void invoke(Runnable r) throws InterruptedException {
-// if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition not necessary checked in put
- InvokableFJTask w = new InvokableFJTask(r);
- entryQueue.put(w);
- signalNewTask();
- w.awaitTermination();
- }
-
-
- /**
- * Try to shut down all FJTaskRunner threads in this group
- * by interrupting them all. This method is designed
- * to be used during cleanup when it is somehow known
- * that all threads are idle.
- * FJTaskRunners only
- * check for interruption when they are not otherwise
- * processing a task (and its generated subtasks,
- * if any), so if any threads are active, shutdown may
- * take a while, and may lead to unpredictable
- * task processing.
- **/
-
- public void interruptAll() {
- // paranoically interrupt current thread last if in group.
- Thread current = Thread.currentThread();
- boolean stopCurrent = false;
-
- for (int i = 0; i < threads.length; ++i) {
- Thread t = threads[i];
- if (t == current)
- stopCurrent = true;
- else
- t.interrupt();
- }
- if (stopCurrent)
- current.interrupt();
- }
-
-
- /**
- * Set the priority to use while a FJTaskRunner is
- * polling for new tasks to perform. Default
- * is currently Thread.MIN_PRIORITY+1. The value
- * set may not go into effect immediately, but
- * will be used at least the next time a thread scans for work.
- **/
- public synchronized void setScanPriorities(int pri) {
- for (int i = 0; i < threads.length; ++i) {
- FJTaskRunner t = threads[i];
- t.setScanPriority(pri);
- if (!t.active) t.setPriority(pri);
- }
- }
-
-
- /**
- * Set the priority to use while a FJTaskRunner is
- * actively running tasks. Default
- * is the priority that was in effect by the thread that
- * constructed this FJTaskRunnerGroup. Setting this value
- * while threads are running may momentarily result in
- * them running at this priority even when idly waiting for work.
- **/
- public synchronized void setRunPriorities(int pri) {
- for (int i = 0; i < threads.length; ++i) {
- FJTaskRunner t = threads[i];
- t.setRunPriority(pri);
- if (t.active) t.setPriority(pri);
- }
- }
-
-
-
- /** Return the number of FJTaskRunner threads in this group **/
-
- public int size() { return threads.length; }
-
-
- /**
- * Return the number of threads that are not idly waiting for work.
- * Beware that even active threads might not be doing any useful
- * work, but just spinning waiting for other dependent tasks.
- * Also, since this is just a snapshot value, some tasks
- * may be in the process of becoming idle.
- **/
- public synchronized int getActiveCount() { return activeCount; }
-
- /**
- * Prints various snapshot statistics to System.out.
- * <ul>
- * <li> For each FJTaskRunner thread (labeled as T<em>n</em>, for
- * <em>n</em> from zero to group size - 1):
- * <ul>
- * <li> A star "*" is printed if the thread is currently active;
- * that is, not sleeping while waiting for work. Because
- * threads gradually enter sleep modes, an active thread
- * may in fact be about to sleep (or wake up).
- * <li> <em>Q Cap</em> The current capacity of its task queue.
- * <li> <em>Runs</em> The total number of tasks that have been run.
- * <li> <em>New</em> The number of these tasks that were
- * taken from either the entry queue or from other
- * thread queues; that is, the number of tasks run
- * that were <em>not</em> forked by the thread itself.
- * <li> <em>Scans</em> The number of times other task
- * queues or the entry queue were polled for tasks.
- * </ul>
- * <li> <em>Execute</em> The total number of tasks entered
- * (but not necessarily yet run) via execute or invoke.
- * <li> <em>Time</em> Time in seconds since construction of this
- * FJTaskRunnerGroup.
- * <li> <em>Rate</em> The total number of tasks processed
- * per second across all threads. This
- * may be useful as a simple throughput indicator
- * if all processed tasks take approximately the
- * same time to run.
- * </ul>
- * <p>
- * Cautions: Some statistics are updated and gathered
- * without synchronization,
- * so may not be accurate. However, reported counts may be considered
- * as lower bounds of actual values.
- * Some values may be zero if classes are compiled
- * with COLLECT_STATS set to false. (FJTaskRunner and FJTaskRunnerGroup
- * classes can be independently compiled with different values of
- * COLLECT_STATS.) Also, the counts are maintained as ints so could
- * overflow in exceptionally long-lived applications.
- * <p>
- * These statistics can be useful when tuning algorithms or diagnosing
- * problems. For example:
- * <ul>
- * <li> High numbers of scans may mean that there is insufficient
- * parallelism to keep threads busy. However, high scan rates
- * are expected if the number
- * of Executes is also high or there is a lot of global
- * synchronization in the application, and the system is not otherwise
- * busy. Threads may scan
- * for work hundreds of times upon startup, shutdown, and
- * global synch points of task sets.
- * <li> Large imbalances in tasks run across different threads might
- * just reflect contention with unrelated threads on a system
- * (possibly including JVM threads such as GC), but may also
- * indicate some systematic bias in how you generate tasks.
- * <li> Large task queue capacities may mean that too many tasks are being
- * generated before they can be run.
- * Capacities are reported rather than current numbers of tasks
- * in queues because they are better indicators of the existence
- * of these kinds of possibly-transient problems.
- * Queue capacities are
- * resized on demand from their initial value of 4096 elements,
- * which is much more than sufficient for the kinds of
- * applications that this framework is intended to best support.
- * </ul>
- **/
-
- public void stats() {
- long time = System.currentTimeMillis() - initTime;
- double secs = (/*(double) GemStoneAddition */time) / 1000.0;
- long totalRuns = 0;
- long totalScans = 0;
- long totalSteals = 0;
-
- System.out.print("Thread" +
- "\tQ Cap" +
- "\tScans" +
- "\tNew" +
- "\tRuns" +
- "\n");
-
- for (int i = 0; i < threads.length; ++i) {
- FJTaskRunner t = threads[i];
- int truns = t.runs;
- totalRuns += truns;
-
- int tscans = t.scans;
- totalScans += tscans;
-
- int tsteals = t.steals;
- totalSteals += tsteals;
-
- String star = (getActive(t))? "*" : " ";
-
-
- System.out.print("T" + i + star +
- "\t" + t.deqSize() +
- "\t" + tscans +
- "\t" + tsteals +
- "\t" + truns +
- "\n");
- }
-
- System.out.print("Total" +
- "\t " +
- "\t" + totalScans +
- "\t" + totalSteals +
- "\t" + totalRuns +
- "\n");
-
- System.out.print("Execute: " + entries);
-
- System.out.print("\tTime: " + secs);
-
- long rps = 0;
- if (secs != 0) rps = Math.round(/*(double) GemStoneAddition */(totalRuns) / secs);
-
- System.out.println("\tRate: " + rps);
- }
-
-
- /* ------------ Methods called only by FJTaskRunners ------------- */
-
-
- /**
- * Return the array of threads in this group.
- * Called only by FJTaskRunner.scan().
- **/
-
- protected FJTaskRunner[] getArray() { return threads; }
-
-
- /**
- * Return a task from entry queue, or null if empty.
- * Called only by FJTaskRunner.scan().
- **/
-
- protected FJTask pollEntryQueue() {
- try {
- FJTask t = (FJTask)(entryQueue.poll(0));
- return t;
- }
- catch(InterruptedException ex) { // restore interrupt status for the caller
- Thread.currentThread().interrupt();
- return null;
- }
- }
-
-
- /**
- * Return active status of t.
- * Per-thread active status can only be accessed and
- * modified via synchronized method here in the group class.
- **/
-
- protected synchronized boolean getActive(FJTaskRunner t) {
- return t.active;
- }
-
-
- /**
- * Set active status of thread t to true, and notify others
- * that might be waiting for work.
- **/
-
- protected synchronized void setActive(FJTaskRunner t) {
- if (!t.active) {
- t.active = true;
- ++activeCount;
- if (nstarted < threads.length)
- threads[nstarted++].start();
- else
- notifyAll();
- }
- }
-
- /**
- * Set active status of thread t to false.
- **/
-
- protected synchronized void setInactive(FJTaskRunner t) {
- if (t.active) {
- t.active = false;
- --activeCount;
- }
- }
-
- /**
- * The number of times to scan other threads for tasks
- * before transitioning to a mode where scans are
- * interleaved with sleeps (actually timed waits).
- * Upon transition, sleeps are for duration of
- * scans / SCANS_PER_SLEEP milliseconds.
- * <p>
- * This is not treated as a user-tunable parameter because
- * good values do not appear to vary much across JVMs or
- * applications. Its main role is to help avoid some
- * useless spinning and contention during task startup.
- **/
- static final long SCANS_PER_SLEEP = 15;
-
- /**
- * The maximum time (in msecs) to sleep when a thread is idle,
- * yet others are not, so may eventually generate work that
- * the current thread can steal. This value reflects the maximum time
- * that a thread may sleep when it possibly should not, because there
- * are other active threads that might generate work. In practice,
- * designs in which some threads become stalled because others
- * are running yet not generating tasks are not likely to work
- * well in this framework anyway, so the exact value does not matter
- * too much. However, keeping it in the sub-second range does
- * help smooth out startup and shutdown effects.
- **/
-
- static final long MAX_SLEEP_TIME = 100;
-
- /**
- * Set active status of thread t to false, and
- * then wait until: (a) there is a task in the entry
- * queue, or (b) other threads are active, or (c) the current
- * thread is interrupted. Upon return, it
- * is not certain that there will be work available.
- * The thread must itself check.
- * <p>
- * The main underlying reason
- * for these mechanics is that threads do not
- * signal each other when they add elements to their queues.
- * (This would add to task overhead, reduce locality,
- * and increase contention.)
- * So we must rely on a tamed form of polling. However, tasks
- * inserted into the entry queue do result in signals, so
- * tasks can wait on these if all of them are otherwise idle.
- **/
-
- protected synchronized void checkActive(FJTaskRunner t, long scans) {
-
- setInactive(t);
-
- try {
- // if nothing available, do a hard wait
- if (activeCount == 0 && entryQueue.peek() == null) {
- wait();
- }
- else {
- // If there is possibly some work,
- // sleep for a while before rechecking
-
- long msecs = scans / SCANS_PER_SLEEP;
- if (msecs > MAX_SLEEP_TIME) msecs = MAX_SLEEP_TIME;
- int nsecs = (msecs == 0) ? 1 : 0; // forces shortest possible sleep
- wait(msecs, nsecs);
- }
- }
- catch (InterruptedException ex) {
- notify(); // avoid lost notifies on interrupts
- Thread.currentThread().interrupt();
- }
- }
-
- /* ------------ Utility methods ------------- */
-
- /**
- * Start or wake up any threads waiting for work
- **/
-
- protected synchronized void signalNewTask() {
- if (COLLECT_STATS) ++entries;
- if (nstarted < threads.length)
- threads[nstarted++].start();
- else
- notify();
- }
-
- /**
- * Create all FJTaskRunner threads in this group.
- **/
-
- protected void initializeThreads() {
- for (int i = 0; i < threads.length; ++i) threads[i] = new FJTaskRunner(this);
- }
-
-
-
-
- /**
- * Wrap wait/notify mechanics around a task so that
- * invoke() can wait it out
- **/
- protected static final class InvokableFJTask extends FJTask {
- protected final Runnable wrapped;
- protected boolean terminated = false;
-
- protected InvokableFJTask(Runnable r) { wrapped = r; }
-
- public void run() {
- try {
- if (wrapped instanceof FJTask)
- FJTask.invoke((FJTask)(wrapped));
- else
- wrapped.run();
- }
- finally {
- setTerminated();
- }
- }
-
- protected synchronized void setTerminated() {
- terminated = true;
- notifyAll();
- }
-
- protected synchronized void awaitTermination() throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition in case terminated is true
- while (!terminated) wait();
- }
- }
-
-
-}
-
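Note on the removed FJTaskRunnerGroup: the group-sizing advice and the
invoke() example in its Javadoc can be sketched with the JDK's own
java.util.concurrent.ForkJoinPool. This is a stand-in, not the removed API.
The class name FibSketch and the helper fib(n, groupSize) are hypothetical
names chosen for illustration. Runtime.availableProcessors() has been
available since Java 1.4, so on current JVMs the default size can be
derived automatically.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FibSketch {
    /** Naive fork/join Fibonacci; stands in for the Fib task named in the Javadoc. */
    static final class Fib extends RecursiveTask<Integer> {
        private final int n;
        Fib(int n) { this.n = n; }

        @Override
        protected Integer compute() {
            if (n <= 1) return n;
            Fib left = new Fib(n - 1);
            left.fork();                            // run one half asynchronously
            int right = new Fib(n - 2).compute();   // run the other half in this thread
            return left.join() + right;
        }
    }

    /** One-shot pool: create it, run a single task to completion, shut it down. */
    static int fib(int n, int groupSize) {
        ForkJoinPool pool = new ForkJoinPool(groupSize);
        try {
            return pool.invoke(new Fib(n));         // blocks until the task completes
        } finally {
            pool.shutdown();                        // reclaim the worker threads
        }
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println(fib(20, cpus));
    }
}
```

The try/finally around invoke() plays the role that interruptAll() plays for
one-shot groups in the removed class: worker threads are released as soon as
the task set is finished.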
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java
deleted file mode 100644
index 040fc91..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/FutureResult.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: FutureResult.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 30Jun1998 dl Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-import java.lang.reflect.*;
-
-/**
- * A class maintaining a single reference variable serving as the result
- * of an operation. The result cannot be accessed until it has been set.
- * <p>
- * <b>Sample Usage</b> <p>
- * <pre>
- * class ImageRenderer { Image render(byte[] raw); }
- * class App {
- * Executor executor = ...
- * ImageRenderer renderer = ...
- * void display(final byte[] rawImage) {
- * try {
- * FutureResult futureImage = new FutureResult();
- * Runnable command = futureImage.setter(new Callable() {
- * public Object call() { return renderer.render(rawImage); }
- * });
- * executor.execute(command);
- * drawBorders(); // do other things while executing
- * drawCaption();
- * drawImage((Image)(futureImage.get())); // use future
- * }
- * catch (InterruptedException ex) { return; }
- * catch (InvocationTargetException ex) { cleanup(); return; }
- * }
- * }
- * </pre>
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- * @see Executor
- **/
-
-public class FutureResult {
- /** The result of the operation **/
- protected Object value_ = null;
-
- /** Status -- true after first set **/
- protected boolean ready_ = false;
-
- /** the exception encountered by operation producing result **/
- protected InvocationTargetException exception_ = null;
-
- /**
- * Create an initially unset FutureResult
- **/
- public FutureResult() { }
-
-
- /**
- * Return a Runnable object that, when run, will set the result value.
- * @param function - a Callable object whose result will be
- * held by this FutureResult.
- * @return A Runnable object that, when run, will call the
- * function and (eventually) set the result.
- **/
-
- public Runnable setter(final Callable function) {
- return new Runnable() {
- public void run() {
- try {
- set(function.call());
- }
- catch(Exception ex) {
- setException(ex);
- }
- }
- };
- }
-
- /** internal utility: either get the value or throw the exception **/
- protected Object doGet() throws InvocationTargetException {
- if (exception_ != null)
- throw exception_;
- else
- return value_;
- }
-
- /**
- * Access the reference, waiting if necessary until it is ready.
- * @return current value
- * @exception InterruptedException if current thread has been interrupted
- * @exception InvocationTargetException if the operation
- * producing the value encountered an exception.
- **/
- public synchronized Object get()
- throws InterruptedException, InvocationTargetException {
- if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition in case ready is true
- while (!ready_) wait();
- return doGet();
- }
-
-
-
- /**
- * Wait at most msecs to access the reference.
- * @return current value
- * @exception TimeoutException if not ready after msecs
- * @exception InterruptedException if current thread has been interrupted
- * @exception InvocationTargetException if the operation
- * producing the value encountered an exception.
- **/
- public synchronized Object timedGet(long msecs)
- throws TimeoutException, InterruptedException, InvocationTargetException {
- if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition
- long startTime = (msecs <= 0)? 0 : System.currentTimeMillis();
- long waitTime = msecs;
- if (ready_) return doGet();
- else if (waitTime <= 0) throw new TimeoutException(msecs);
- else {
- for (;;) {
- wait(waitTime);
- if (ready_) return doGet();
- else {
- waitTime = msecs - (System.currentTimeMillis() - startTime);
- if (waitTime <= 0)
- throw new TimeoutException(msecs);
- }
- }
- }
- }
-
- /**
- * Set the reference, and signal that it is ready. It is not
- * considered an error to set the value more than once,
- * but it is not something you would normally want to do.
- * @param newValue The value that will be returned by a subsequent get();
- **/
- public synchronized void set(Object newValue) {
- value_ = newValue;
- ready_ = true;
- notifyAll();
- }
-
- /**
- * Set the exception field, also setting ready status.
- * @param ex The exception. It will be reported out wrapped
- * within an InvocationTargetException
- **/
- public synchronized void setException(Throwable ex) {
- exception_ = new InvocationTargetException(ex);
- ready_ = true;
- notifyAll();
- }
-
-
- /**
- * Get the exception, or null if there isn't one (yet).
- * This does not wait until the future is ready, so should
- * ordinarily only be called if you know it is.
- * @return the exception encountered by the operation
- * setting the future, wrapped in an InvocationTargetException
- **/
- public synchronized InvocationTargetException getException() {
- return exception_;
- }
-
- /**
- * Return whether the reference or exception have been set.
- * @return true if it has been set, else false
- **/
- public synchronized boolean isReady() {
- return ready_;
- }
-
- /**
- * Access the reference, even if not ready
- * @return current value
- **/
- public synchronized Object peek() {
- return value_;
- }
-
-
- /**
- * Clear the value and exception and set to not-ready,
- * allowing this FutureResult to be reused. This is not
- * particularly recommended and must be done only
- * when you know that no other object is depending on the
- * properties of this FutureResult.
- **/
- public synchronized void clear() {
- value_ = null;
- exception_ = null;
- ready_ = false;
- }
-
-}
-
-
-
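Note on the removed FutureResult: timedGet() waits in a loop and recomputes
the remaining time after every wakeup, so that early or spurious wakeups do
not extend the total timeout. A minimal sketch of that pattern follows. The
class TimedBoxSketch is hypothetical, it uses java.util.concurrent.TimeoutException
rather than the package's own TimeoutException, and it leaves out the
exception-carrying half of FutureResult.

```java
import java.util.concurrent.TimeoutException;

public class TimedBoxSketch {
    private Object value;
    private boolean ready;

    /** Publish the value and wake every waiting reader. */
    public synchronized void set(Object v) {
        value = v;
        ready = true;
        notifyAll();
    }

    /** Wait at most msecs for a value, re-deriving the remaining time after each wakeup. */
    public synchronized Object timedGet(long msecs)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + msecs * 1_000_000L;
        while (!ready) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException("not ready after " + msecs + " ms");
            }
            // wait(0) means "wait forever", so round short remainders up to 1 ms.
            wait(Math.max(1L, remainingNanos / 1_000_000L));
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        TimedBoxSketch box = new TimedBoxSketch();
        new Thread(() -> box.set("done")).start();
        System.out.println(box.timedGet(1000));
    }
}
```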
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java
deleted file mode 100644
index bf8afac..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Heap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: Heap.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 29Aug1998 dl Refactored from BoundedPriorityQueue
- 08dec2001 dl Null out slots of removed items
- 03feb2002 dl Also null out in clear
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-import java.util.Comparator;
-
-/**
- * A heap-based priority queue, without any concurrency control
- * (i.e., no blocking on empty/full states).
- * This class provides the data structure mechanics for BoundedPriorityQueue.
- * <p>
- * The class currently uses a standard array-based heap, as described
- * in, for example, Sedgewick's Algorithms text. All methods
- * are fully synchronized. In the future,
- * it may instead use structures permitting finer-grained locking.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- **/
-
-public class Heap {
- protected Object[] nodes_; // the tree nodes, packed into an array
- protected int count_ = 0; // number of used slots
- protected final Comparator cmp_; // for ordering
-
- /**
- * Create a Heap with the given initial capacity and comparator
- * @exception IllegalArgumentException if capacity is less than or equal to zero
- **/
-
- public Heap(int capacity, Comparator cmp)
- throws IllegalArgumentException {
- if (capacity <= 0) throw new IllegalArgumentException();
- nodes_ = new Object[capacity];
- cmp_ = cmp;
- }
-
- /**
- * Create a Heap with the given capacity,
- * and relying on natural ordering.
- **/
-
- public Heap(int capacity) {
- this(capacity, null);
- }
-
-
- /** perform element comparisons using comparator or natural ordering **/
- protected int compare(Object a, Object b) {
- if (cmp_ == null)
- return ((Comparable)a).compareTo(b);
- else
- return cmp_.compare(a, b);
- }
-
-
- // indexes of heap parents and children
- protected final int parent(int k) { return (k - 1) / 2; }
- protected final int left(int k) { return 2 * k + 1; }
- protected final int right(int k) { return 2 * (k + 1); }
-
- /**
- * insert an element, resize if necessary
- **/
- public synchronized void insert(Object x) {
- if (count_ >= nodes_.length) {
- int newcap = 3 * nodes_.length / 2 + 1;
- Object[] newnodes = new Object[newcap];
- System.arraycopy(nodes_, 0, newnodes, 0, nodes_.length);
- nodes_ = newnodes;
- }
-
- int k = count_;
- ++count_;
- while (k > 0) {
- int par = parent(k);
- if (compare(x, nodes_[par]) < 0) {
- nodes_[k] = nodes_[par];
- k = par;
- }
- else break;
- }
- nodes_[k] = x;
- }
-
-
- /**
- * Return and remove least element, or null if empty
- **/
-
- public synchronized Object extract() {
- if (count_ < 1) return null;
-
- int k = 0; // take element at root;
- Object least = nodes_[k];
- --count_;
- Object x = nodes_[count_];
- nodes_[count_] = null;
- for (;;) {
- int l = left(k);
- if (l >= count_)
- break;
- else {
- int r = right(k);
- int child = (r >= count_ || compare(nodes_[l], nodes_[r]) < 0)? l : r;
- if (compare(x, nodes_[child]) > 0) {
- nodes_[k] = nodes_[child];
- k = child;
- }
- else break;
- }
- }
- nodes_[k] = x;
- return least;
- }
-
- /** Return least element without removing it, or null if empty **/
- public synchronized Object peek() {
- if (count_ > 0)
- return nodes_[0];
- else
- return null;
- }
-
- /** Return number of elements **/
- public synchronized int size() {
- return count_;
- }
-
- /** remove all elements **/
- public synchronized void clear() {
- for (int i = 0; i < count_; ++i)
- nodes_[i] = null;
- count_ = 0;
- }
-
-}
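Note on the removed Heap: the parent/left/right index arithmetic and the
sift-up and sift-down loops can be exercised in isolation. The sketch below
specializes the same steps to int keys. IntMinHeapSketch is a hypothetical
name, and unlike the removed class it is unsynchronized and throws on an
empty extract instead of returning null.

```java
import java.util.Arrays;
import java.util.NoSuchElementException;

public class IntMinHeapSketch {
    private int[] nodes = new int[4];   // the tree, packed into an array
    private int count = 0;              // number of used slots

    static int parent(int k) { return (k - 1) / 2; }
    static int left(int k)   { return 2 * k + 1; }
    static int right(int k)  { return 2 * (k + 1); }

    /** Insert x, growing the array by roughly 1.5x when it is full. */
    public void insert(int x) {
        if (count >= nodes.length) {
            nodes = Arrays.copyOf(nodes, 3 * nodes.length / 2 + 1);
        }
        int k = count++;
        // Sift up: pull larger parents down until x fits.
        while (k > 0 && x < nodes[parent(k)]) {
            nodes[k] = nodes[parent(k)];
            k = parent(k);
        }
        nodes[k] = x;
    }

    /** Remove and return the least element. */
    public int extract() {
        if (count < 1) throw new NoSuchElementException();
        int least = nodes[0];
        int x = nodes[--count];          // last element is re-inserted from the root
        int k = 0;
        for (;;) {
            int l = left(k);
            if (l >= count) break;
            int r = right(k);
            int child = (r >= count || nodes[l] < nodes[r]) ? l : r;
            if (x > nodes[child]) {      // sift down past the smaller child
                nodes[k] = nodes[child];
                k = child;
            } else {
                break;
            }
        }
        nodes[k] = x;
        return least;
    }

    public int size() { return count; }

    public static void main(String[] args) {
        IntMinHeapSketch h = new IntMinHeapSketch();
        for (int v : new int[] {5, 1, 4, 2, 3, 0}) h.insert(v);
        StringBuilder sb = new StringBuilder();
        while (h.size() > 0) sb.append(h.extract()).append(' ');
        System.out.println(sb.toString().trim());   // prints "0 1 2 3 4 5"
    }
}
```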
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java
deleted file mode 100644
index 6e4b624..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Latch.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: Latch.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 11Jun1998 dl Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A latch is a boolean condition that is set at most once, ever.
- * Once a single release is issued, all acquires will pass.
- * <p>
- * <b>Sample usage.</b> Here is a set of classes that use
- * a latch as a start signal for a group of worker threads that
- * are created and started beforehand, and then later enabled.
- * <pre>
- * class Worker implements Runnable {
- * private final Latch startSignal;
- * Worker(Latch l) { startSignal = l; }
- * public void run() {
- * try { startSignal.acquire(); doWork(); }
- * catch (InterruptedException ex) { Thread.currentThread().interrupt(); }
- * }
- * void doWork() { ... }
- * }
- *
- * class Driver { // ...
- * void main() {
- * Latch go = new Latch();
- * for (int i = 0; i < N; ++i) // make threads
- * new Thread(new Worker(go)).start();
- * doSomethingElse(); // don't let run yet
- * go.release(); // let all threads proceed
- * }
- * }
- *</pre>
- * [<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
-**/
-
-public class Latch implements Sync {
- protected boolean latched_ = false;
-
- /*
- This could use double-check, but doesn't.
- If the latch is being used as an indicator of
- the presence or state of an object, the user would
- not necessarily get the memory barrier that comes with synch
- that would be needed to correctly use that object. This
- would lead to errors that users would find very hard to track down. So, to
- be conservative, we always use synch.
- */
-
- public void acquire() throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException();
- synchronized(this) {
- while (!latched_)
- wait();
- }
- }
-
- public boolean attempt(long msecs) throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException();
- synchronized(this) {
- if (latched_)
- return true;
- else if (msecs <= 0)
- return false;
- else {
- long waitTime = msecs;
- long start = System.currentTimeMillis();
- for (;;) {
- wait(waitTime);
- if (latched_)
- return true;
- else {
- waitTime = msecs - (System.currentTimeMillis() - start);
- if (waitTime <= 0)
- return false;
- }
- }
- }
- }
- }
-
- /** Enable all current and future acquires to pass **/
- public synchronized void release() {
- latched_ = true;
- notifyAll();
- }
-
-}
-
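Note on the removed Latch: the start-signal usage from its Javadoc can be
reproduced with java.util.concurrent.CountDownLatch, where a count of one
acts as a one-shot gate. This is a sketch using the JDK class as a stand-in.
StartSignalSketch and runWorkers are hypothetical names, and the second
latch (done) is added here only so the driver can wait for the workers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class StartSignalSketch {
    /** Start n workers that all block on one gate, open the gate, and count how many ran. */
    static int runWorkers(int n) throws InterruptedException {
        CountDownLatch go = new CountDownLatch(1);    // one-shot start signal
        CountDownLatch done = new CountDownLatch(n);  // lets the driver wait for completion
        AtomicInteger worked = new AtomicInteger();

        for (int i = 0; i < n; ++i) {
            new Thread(() -> {
                try {
                    go.await();                       // blocks until the gate opens
                    worked.incrementAndGet();         // stands in for doWork()
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }

        go.countDown();                               // let all threads proceed
        done.await();
        return worked.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runWorkers(4));
    }
}
```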
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java
deleted file mode 100644
index 057e322..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LayeredSync.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: LayeredSync.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 1Aug1998 dl Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A class that can be used to compose Syncs.
- * A LayeredSync object manages two other Sync objects,
- * <em>outer</em> and <em>inner</em>. The acquire operation
- * invokes <em>outer</em>.acquire() followed by <em>inner</em>.acquire(),
- * but backing out of outer (via release) upon an exception in inner.
- * The other methods work similarly.
- * <p>
- * LayeredSyncs can be used to compose arbitrary chains
- * by arranging that either of the managed Syncs be another
- * LayeredSync.
- *
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
-**/
-
-
-public class LayeredSync implements Sync {
-
- protected final Sync outer_;
- protected final Sync inner_;
-
- /**
- * Create a LayeredSync managing the given outer and inner Sync
- * objects
- **/
-
- public LayeredSync(Sync outer, Sync inner) {
- outer_ = outer;
- inner_ = inner;
- }
-
- public void acquire() throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition
- outer_.acquire();
- try {
- inner_.acquire();
- }
- catch (InterruptedException ex) {
- outer_.release();
- throw ex;
- }
- }
-
- public boolean attempt(long msecs) throws InterruptedException {
-
- if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition - for safety
- long start = (msecs <= 0)? 0 : System.currentTimeMillis();
- long waitTime = msecs;
-
- if (outer_.attempt(waitTime)) {
- try {
- if (msecs > 0)
- waitTime = msecs - (System.currentTimeMillis() - start);
- if (inner_.attempt(waitTime))
- return true;
- else {
- outer_.release();
- return false;
- }
- }
- catch (InterruptedException ex) {
- outer_.release();
- throw ex;
- }
- }
- else
- return false;
- }
-
- public void release() {
- inner_.release();
- outer_.release();
- }
-
-}
-
-
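Note on LayeredSync: the essential behavior is "acquire outer, then inner, and back out of outer if inner cannot be obtained", with the timeout budget shared across both steps. The sketch below illustrates that idea with java.util.concurrent.locks.Lock rather than the removed Sync interface. The class name LayeredLockSketch and its method names are hypothetical. Unlike the removed code, which backs out only on InterruptedException or a failed attempt, this sketch uses a finally block and so also backs out on any other exception.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

/** Hypothetical sketch: composes two locks, acquiring outer before inner. */
public class LayeredLockSketch {
    private final Lock outer;
    private final Lock inner;

    public LayeredLockSketch(Lock outer, Lock inner) {
        this.outer = outer;
        this.inner = inner;
    }

    /** Tries to take both locks within msecs in total; holds neither on failure. */
    public boolean tryAcquire(long msecs) throws InterruptedException {
        long budgetNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(msecs, 0));
        long deadline = System.nanoTime() + budgetNanos;
        if (!outer.tryLock(budgetNanos, TimeUnit.NANOSECONDS)) {
            return false;
        }
        boolean gotInner = false;
        try {
            // Only the time left after acquiring outer is spent on inner.
            // A non-positive timeout makes tryLock try once without waiting.
            gotInner = inner.tryLock(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
            return gotInner;
        } finally {
            if (!gotInner) {
                outer.unlock(); // back out of outer on failure or exception
            }
        }
    }

    /** Releases in the reverse order of acquisition. */
    public void release() {
        inner.unlock();
        outer.unlock();
    }
}
```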
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java
deleted file mode 100644
index 76cdf0d..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedNode.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: LinkedNode.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 11Jun1998 dl Create public version
- 25may2000 dl Change class access to public
- 26nov2001 dl Added no-arg constructor, all public access.
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/** A standard linked list node used in various queue classes **/
-public class LinkedNode {
- public Object value;
- public LinkedNode next;
- public LinkedNode() {}
- public LinkedNode(Object x) { value = x; }
- public LinkedNode(Object x, LinkedNode n) { value = x; next = n; }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java
deleted file mode 100644
index 17bec28..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LinkedQueue.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: LinkedQueue.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 11Jun1998 dl Create public version
- 25aug1998 dl added peek
- 10dec1998 dl added isEmpty
- 10oct1999 dl lock on node object to ensure visibility
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-/**
- * A linked list based channel implementation.
- * The algorithm avoids contention between puts
- * and takes when the queue is not empty.
- * Normally a put and a take can proceed simultaneously.
- * (Although it does not allow multiple concurrent puts or takes.)
- * This class tends to perform more efficiently than
- * other Channel implementations in producer/consumer
- * applications.
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- **/
-@SuppressFBWarnings(value="ML_SYNC_ON_FIELD_TO_GUARD_CHANGING_THAT_FIELD",justification="GemFire doesn't use this class")
-public class LinkedQueue implements Channel {
-
-
- /**
- * Dummy header node of list. The first actual node, if it exists, is always
- * at head_.next. After each take, the old first node becomes the head.
- **/
- protected LinkedNode head_;
-
- /**
- * Helper monitor for managing access to last node.
- **/
- protected final Object putLock_ = new Object();
-
- /**
- * The last node of list. Put() appends to list, so modifies last_
- **/
- protected LinkedNode last_;
-
- /**
- * The number of threads waiting for a take.
- * Notifications are provided in put only if greater than zero.
- * The bookkeeping is worth it here since in reasonably balanced
- * usages, the notifications will hardly ever be necessary, so
- * the call overhead to notify can be eliminated.
- **/
- protected int waitingForTake_ = 0;
-
- public LinkedQueue() {
- head_ = new LinkedNode(null);
- last_ = head_;
- }
-
- /** Main mechanics for put/offer **/
- protected void insert(Object x) {
- synchronized(putLock_) {
- LinkedNode p = new LinkedNode(x);
- synchronized(last_) {
- last_.next = p;
-// last_ = p; GemStoneAddition synchronize on last_ in futile attempt to guard it
- }
- last_ = p; // GemStoneAddition
- if (waitingForTake_ > 0)
- putLock_.notify();
- }
- }
-
- /** Main mechanics for take/poll **/
- protected synchronized Object extract() {
- synchronized(head_) {
- Object x = null;
- LinkedNode first = head_.next;
- if (first != null) {
- x = first.value;
- first.value = null;
- head_ = first;
- }
- return x;
- }
- }
-
-
- public void put(Object x) throws InterruptedException {
- if (x == null) throw new IllegalArgumentException();
- if (Thread.interrupted()) throw new InterruptedException();
- insert(x);
- }
-
- public boolean offer(Object x, long msecs) throws InterruptedException {
- if (x == null) throw new IllegalArgumentException();
- if (Thread.interrupted()) throw new InterruptedException();
- insert(x);
- return true;
- }
-
- public Object take() throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException();
- // try to extract. If fail, then enter wait-based retry loop
- Object x = extract();
- if (x != null)
- return x;
- else {
- synchronized(putLock_) {
- try {
- ++waitingForTake_;
- for (;;) {
- x = extract();
- if (x != null) {
- --waitingForTake_;
- return x;
- }
- else {
- putLock_.wait();
- }
- }
- }
- catch(InterruptedException ex) {
- --waitingForTake_;
- putLock_.notify();
- throw ex;
- }
- }
- }
- }
-
- public Object peek() {
- synchronized(head_) {
- LinkedNode first = head_.next;
- if (first != null)
- return first.value;
- else
- return null;
- }
- }
-
-
- public boolean isEmpty() {
- synchronized(head_) {
- return head_.next == null;
- }
- }
-
- public Object poll(long msecs) throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException();
- Object x = extract();
- if (x != null)
- return x;
- else {
- synchronized(putLock_) {
- try {
- long waitTime = msecs;
- long start = (msecs <= 0)? 0 : System.currentTimeMillis();
- ++waitingForTake_;
- for (;;) {
- x = extract();
- if (x != null || waitTime <= 0) {
- --waitingForTake_;
- return x;
- }
- else {
- putLock_.wait(waitTime);
- waitTime = msecs - (System.currentTimeMillis() - start);
- }
- }
- }
- catch(InterruptedException ex) {
- --waitingForTake_;
- putLock_.notify();
- throw ex;
- }
- }
- }
- }
-}
-
-
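Note on LinkedQueue: the class comment describes a queue with a dummy header node in which a put and a take can normally proceed at the same time because they touch opposite ends of the list. A minimal sketch of that two-lock structure follows. The class name TwoLockQueueSketch is hypothetical, the sketch leaves out the blocking take and timed poll, and it assumes a volatile next field for cross-lock visibility instead of the removed file's approach of locking on the node objects.

```java
/** Hypothetical sketch of a two-lock linked queue with a dummy head node. */
public class TwoLockQueueSketch<E> {
    private static final class Node<E> {
        E value;
        // volatile: written under putLock but read under takeLock,
        // so the two locks alone do not establish visibility.
        volatile Node<E> next;
        Node(E value) { this.value = value; }
    }

    private final Object putLock = new Object();
    private final Object takeLock = new Object();
    private Node<E> head = new Node<>(null); // dummy node; guarded by takeLock
    private Node<E> last = head;             // guarded by putLock

    /** Appends x at the tail; only other puts contend for putLock. */
    public void put(E x) {
        if (x == null) throw new IllegalArgumentException();
        Node<E> node = new Node<>(x);
        synchronized (putLock) {
            last.next = node;
            last = node;
        }
    }

    /** Removes and returns the first element, or null if the queue is empty. */
    public E poll() {
        synchronized (takeLock) {
            Node<E> first = head.next;
            if (first == null) return null;
            E x = first.value;
            first.value = null; // the old first node becomes the new dummy head
            head = first;
            return x;
        }
    }
}
```

For production use, java.util.concurrent.LinkedBlockingQueue provides a two-lock linked queue with blocking and timed operations.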
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java
deleted file mode 100644
index fd54006..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/LockedExecutor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: LockedExecutor.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 21Jun1998 dl Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * An implementation of Executor that
- * invokes the run method of the supplied command within
- * a synchronization lock and then returns.
- *
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
- **/
-public class LockedExecutor implements Executor {
-
- /** The mutex **/
- protected final Sync mutex_;
-
- /**
- * Create a new LockedExecutor that relies on the given mutual
- * exclusion lock.
- * @param mutex Any mutual exclusion lock.
- * Standard usage is to supply an instance of <code>Mutex</code>,
- * but, for example, a Semaphore initialized to 1 also works.
- * On the other hand, many other Sync implementations would not
- * work here, so some care is required to supply a sensible
- * synchronization object.
- **/
-
- public LockedExecutor(Sync mutex) {
- mutex_ = mutex;
- }
-
- /**
- * Execute the given command directly in the current thread,
- * within the supplied lock.
- **/
- public void execute(Runnable command) throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException(); // GemStoneAddition for safety
- mutex_.acquire();
- try {
- command.run();
- }
- finally {
- mutex_.release();
- }
- }
-
-}
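Note on LockedExecutor: the pattern is "acquire, run in the calling thread, release in finally", so the lock is freed even when the command throws. A minimal sketch with java.util.concurrent.locks.Lock follows. The class name LockedRunnerSketch is hypothetical, and lockInterruptibly() is assumed here as the closest counterpart to the interruptible Sync.acquire().

```java
import java.util.concurrent.locks.Lock;

/** Hypothetical sketch: runs each command in the caller's thread while holding a lock. */
public class LockedRunnerSketch {
    private final Lock mutex;

    public LockedRunnerSketch(Lock mutex) {
        this.mutex = mutex;
    }

    /** Runs the command under the lock; the lock is released even if run() throws. */
    public void execute(Runnable command) throws InterruptedException {
        mutex.lockInterruptibly(); // throws if the thread is or becomes interrupted
        try {
            command.run();
        } finally {
            mutex.unlock();
        }
    }
}
```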
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java
deleted file mode 100644
index 7ff7208..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/oswego/concurrent/Mutex.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-/*
- File: Mutex.java
-
- Originally written by Doug Lea and released into the public domain.
- This may be used for any purposes whatsoever without acknowledgment.
- Thanks for the assistance and support of Sun Microsystems Labs,
- and everyone contributing, testing, and using this code.
-
- History:
- Date Who What
- 11Jun1998 dl Create public version
-*/
-
-package com.gemstone.org.jgroups.oswego.concurrent;
-
-/**
- * A simple non-reentrant mutual exclusion lock.
- * The lock is free upon construction. Each acquire gets the
- * lock, and each release frees it. Releasing a lock that
- * is already free has no effect.
- * <p>
- * This implementation makes no attempt to provide any fairness
- * or ordering guarantees. If you need them, consider using one of
- * the Semaphore implementations as a locking mechanism.
- * <p>
- * <b>Sample usage</b><br>
- * <p>
- * Mutex can be useful in constructions that cannot be
- * expressed using Java synchronized blocks because the
- * acquire/release pairs do not occur in the same method or
- * code block. For example, you can use them for hand-over-hand
- * locking across the nodes of a linked list. This allows
- * extremely fine-grained locking, and so increases
- * potential concurrency, at the cost of additional complexity and
- * overhead that would normally make this worthwhile only in cases of
- * extreme contention.
- * <pre>
- * class Node {
- * Object item;
- * Node next;
- * Mutex lock = new Mutex(); // each node keeps its own lock
- *
- * Node(Object x, Node n) { item = x; next = n; }
- * }
- *
- * class List {
- * protected Node head; // pointer to first node of list
- *
- *   // Use plain Java synchronization to protect head field.
- * // (We could instead use a Mutex here too but there is no
- * // reason to do so.)
- * protected synchronized Node getHead() { return head; }
- *
- * boolean search(Object x) throws InterruptedException {
- * Node p = getHead();
- * if (p == null) return false;
- *
- * // (This could be made more compact, but for clarity of illustration,
- * // all of the cases that can arise are handled separately.)
- *
- * p.lock.acquire(); // Prime loop by acquiring first lock.
- * // (If the acquire fails due to
- * // interrupt, the method will throw
- * // InterruptedException now,
- * // so there is no need for any
- * // further cleanup.)
- * for (;;) {
- * if (x.equals(p.item)) {
- * p.lock.release(); // release current before return
- * return true;
- * }
- * else {
- * Node nextp = p.next;
- * if (nextp == null) {
- * p.lock.release(); // release final lock that was held
- * return false;
- * }
- * else {
- * try {
- * nextp.lock.acquire(); // get next lock before releasing current
- * }
- * catch (InterruptedException ex) {
- * p.lock.release(); // also release current if acquire fails
- * throw ex;
- * }
- * p.lock.release(); // release old lock now that new one held
- * p = nextp;
- * }
- * }
- * }
- * }
- *
- * synchronized void add(Object x) { // simple prepend
- * // The use of `synchronized' here protects only head field.
- * // The method does not need to wait out other traversers
- * // who have already made it past head.
- *
- * head = new Node(x, head);
- * }
- *
- * // ... other similar traversal and update methods ...
- * }
- * </pre>
- * <p>
- * @see Semaphore
- * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
-**/
-
-public class Mutex implements Sync {
-
- /** The lock status **/
- protected boolean inuse_ = false;
-
- public void acquire() throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException();
- synchronized(this) {
- try {
- while (inuse_) wait();
- inuse_ = true;
- }
- catch (InterruptedException ex) {
- notify();
- throw ex;
- }
- }
- }
-
- public synchronized void release() {
- inuse_ = false;
- notify();
- }
-
-
- public boolean attempt(long msecs) throws InterruptedException {
- if (Thread.interrupted()) throw new InterruptedException();
- synchronized(this) {
- if (!inuse_) {
- inuse_ = true;
- return true;
- }
- else if (msecs <= 0)
- return false;
- else {
- long waitTime = msecs;
- long start = System.currentTimeMillis();
- try {
- for (;;) {
- wait(waitTime);
- if (!inuse_) {
- inuse_ = true;
- return true;
- }
- else {
- waitTime = msecs - (System.currentTimeMillis() - start);
- if (waitTime <= 0)
- return false;
- }
- }
- }
- catch (InterruptedException ex) {
- notify();
- throw ex;
- }
- }
- }
- }
-
-}
-
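Note on Mutex: the class comment suggests a Semaphore as an alternative locking mechanism. A minimal sketch of a non-reentrant lock built on java.util.concurrent.Semaphore with one permit follows. The class name BinarySemaphoreMutexSketch is hypothetical. One difference needs care: releasing the removed Mutex when it is already free has no effect, whereas a bare Semaphore.release() would add a second permit, so the sketch caps the permit count at one.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a non-reentrant mutex backed by a one-permit semaphore. */
public class BinarySemaphoreMutexSketch {
    private final Semaphore permit = new Semaphore(1);

    /** Blocks until the lock is free; a thread that already holds it would block itself. */
    public void acquire() throws InterruptedException {
        permit.acquire();
    }

    /** Tries for up to msecs; a non-positive timeout means a single try without waiting. */
    public boolean attempt(long msecs) throws InterruptedException {
        return permit.tryAcquire(msecs, TimeUnit.MILLISECONDS);
    }

    /**
     * Frees the lock. Synchronized so that two concurrent releases cannot both
     * see zero permits and push the count above one.
     */
    public synchronized void release() {
        if (permit.availablePermits() == 0) {
            permit.release();
        }
    }
}
```

As with the removed class, nothing ties the lock to an owning thread: any thread may call release(), which is what makes hand-over-hand use across methods possible.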