You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@harmony.apache.org by hi...@apache.org on 2009/07/28 11:30:48 UTC
svn commit: r798469 [17/28] - in /harmony/enhanced/classlib/branches/java6:
./ depends/build/platform/ depends/files/ depends/jars/
depends/manifests/icu4j_4.0/ depends/manifests/icu4j_4.2.1/
depends/manifests/icu4j_4.2.1/META-INF/ make/ modules/access...
Modified: harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/locks/AbstractQueuedSynchronizer.java
URL: http://svn.apache.org/viewvc/harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/locks/AbstractQueuedSynchronizer.java?rev=798469&r1=798468&r2=798469&view=diff
==============================================================================
--- harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/locks/AbstractQueuedSynchronizer.java (original)
+++ harmony/enhanced/classlib/branches/java6/modules/concurrent/src/main/java/java/util/concurrent/locks/AbstractQueuedSynchronizer.java Tue Jul 28 09:30:33 2009
@@ -32,7 +32,7 @@
* synchronization interface. Instead it defines methods such as
* {@link #acquireInterruptibly} that can be invoked as
* appropriate by concrete locks and related synchronizers to
- * implement their public methods.
+ * implement their public methods.
*
* <p>This class supports either or both a default <em>exclusive</em>
* mode and a <em>shared</em> mode. When acquired in exclusive mode,
@@ -59,14 +59,14 @@
* condition, so if this constraint cannot be met, do not use it. The
* behavior of {@link ConditionObject} depends of course on the
* semantics of its synchronizer implementation.
- *
- * <p> This class provides inspection, instrumentation, and monitoring
+ *
+ * <p>This class provides inspection, instrumentation, and monitoring
* methods for the internal queue, as well as similar methods for
* condition objects. These can be exported as desired into classes
* using an <tt>AbstractQueuedSynchronizer</tt> for their
* synchronization mechanics.
*
- * <p> Serialization of this class stores only the underlying atomic
+ * <p>Serialization of this class stores only the underlying atomic
* integer maintaining state, so deserialized objects have empty
* thread queues. Typical subclasses requiring serializability will
* define a <tt>readObject</tt> method that restores this to a known
@@ -74,10 +74,10 @@
*
* <h3>Usage</h3>
*
- * <p> To use this class as the basis of a synchronizer, redefine the
+ * <p>To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
- * #setState} and/or {@link #compareAndSetState}:
+ * #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
@@ -94,7 +94,7 @@
* means of using this class. All other methods are declared
* <tt>final</tt> because they cannot be independently varied.
*
- * <p> Even though this class is based on an internal FIFO queue, it
+ * <p>Even though this class is based on an internal FIFO queue, it
* does not automatically enforce FIFO acquisition policies. The core
* of exclusive synchronization takes the form:
*
@@ -112,22 +112,17 @@
*
* (Shared mode is similar but may involve cascading signals.)
*
- * <p> Because checks in acquire are invoked before enqueuing, a newly
- * acquiring thread may <em>barge</em> ahead of others that are
- * blocked and queued. However, you can, if desired, define
- * <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to disable
- * barging by internally invoking one or more of the inspection
- * methods. In particular, a strict FIFO lock can define
- * <tt>tryAcquire</tt> to immediately return <tt>false</tt> if {@link
- * #getFirstQueuedThread} does not return the current thread. A
- * normally preferable non-strict fair version can immediately return
- * <tt>false</tt> only if {@link #hasQueuedThreads} returns
- * <tt>true</tt> and <tt>getFirstQueuedThread</tt> is not the current
- * thread; or equivalently, that <tt>getFirstQueuedThread</tt> is both
- * non-null and not the current thread. Further variations are
- * possible.
+ * <p><a name="barging">Because checks in acquire are invoked before
+ * enqueuing, a newly acquiring thread may <em>barge</em> ahead of
+ * others that are blocked and queued. However, you can, if desired,
+ * define <tt>tryAcquire</tt> and/or <tt>tryAcquireShared</tt> to
+ * disable barging by internally invoking one or more of the inspection
+ * methods, thereby providing a <em>fair</em> FIFO acquisition order.
+ * In particular, most fair synchronizers can define <tt>tryAcquire</tt>
+ * to return <tt>false</tt> if predecessors are queued. Other variations
+ * are possible.
*
- * <p> Throughput and scalability are generally highest for the
+ * <p>Throughput and scalability are generally highest for the
* default barging (also known as <em>greedy</em>,
* <em>renouncement</em>, and <em>convoy-avoidance</em>) strategy.
* While this is not guaranteed to be fair or starvation-free, earlier
@@ -144,7 +139,7 @@
* and/or {@link #hasQueuedThreads} to only do so if the synchronizer
* is likely not to be contended.
*
- * <p> This class provides an efficient and scalable basis for
+ * <p>This class provides an efficient and scalable basis for
* synchronization in part by specializing its range of use to
* synchronizers that can rely on <tt>int</tt> state, acquire, and
* release parameters, and an internal FIFO wait queue. When this does
@@ -152,7 +147,7 @@
* {@link java.util.concurrent.atomic atomic} classes, your own custom
* {@link java.util.Queue} classes, and {@link LockSupport} blocking
* support.
- *
+ *
* <h3>Usage Examples</h3>
*
* <p>Here is a non-reentrant mutual exclusion lock class that uses
@@ -163,56 +158,58 @@
* <pre>
* class Mutex implements Lock, java.io.Serializable {
*
- * // Our internal helper class
- * private static class Sync extends AbstractQueuedSynchronizer {
- * // Report whether in locked state
- * protected boolean isHeldExclusively() {
- * return getState() == 1;
- * }
- *
- * // Acquire the lock if state is zero
- * public boolean tryAcquire(int acquires) {
- * assert acquires == 1; // Otherwise unused
- * return compareAndSetState(0, 1);
- * }
- *
- * // Release the lock by setting state to zero
- * protected boolean tryRelease(int releases) {
- * assert releases == 1; // Otherwise unused
- * if (getState() == 0) throw new IllegalMonitorStateException();
- * setState(0);
- * return true;
- * }
- *
- * // Provide a Condition
- * Condition newCondition() { return new ConditionObject(); }
- *
- * // Deserialize properly
- * private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
- * s.defaultReadObject();
- * setState(0); // reset to unlocked state
- * }
- * }
- *
- * // The sync object does all the hard work. We just forward to it.
- * private final Sync sync = new Sync();
- *
- * public void lock() { sync.acquire(1); }
- * public boolean tryLock() { return sync.tryAcquire(1); }
- * public void unlock() { sync.release(1); }
- * public Condition newCondition() { return sync.newCondition(); }
- * public boolean isLocked() { return sync.isHeldExclusively(); }
- * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
- * public void lockInterruptibly() throws InterruptedException {
- * sync.acquireInterruptibly(1);
- * }
- * public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
- * return sync.tryAcquireNanos(1, unit.toNanos(timeout));
- * }
+ * // Our internal helper class
+ * private static class Sync extends AbstractQueuedSynchronizer {
+ * // Report whether in locked state
+ * protected boolean isHeldExclusively() {
+ * return getState() == 1;
+ * }
+ *
+ * // Acquire the lock if state is zero
+ * public boolean tryAcquire(int acquires) {
+ * assert acquires == 1; // Otherwise unused
+ * return compareAndSetState(0, 1);
+ * }
+ *
+ * // Release the lock by setting state to zero
+ * protected boolean tryRelease(int releases) {
+ * assert releases == 1; // Otherwise unused
+ * if (getState() == 0) throw new IllegalMonitorStateException();
+ * setState(0);
+ * return true;
+ * }
+ *
+ * // Provide a Condition
+ * Condition newCondition() { return new ConditionObject(); }
+ *
+ * // Deserialize properly
+ * private void readObject(ObjectInputStream s)
+ * throws IOException, ClassNotFoundException {
+ * s.defaultReadObject();
+ * setState(0); // reset to unlocked state
+ * }
+ * }
+ *
+ * // The sync object does all the hard work. We just forward to it.
+ * private final Sync sync = new Sync();
+ *
+ * public void lock() { sync.acquire(1); }
+ * public boolean tryLock() { return sync.tryAcquire(1); }
+ * public void unlock() { sync.release(1); }
+ * public Condition newCondition() { return sync.newCondition(); }
+ * public boolean isLocked() { return sync.isHeldExclusively(); }
+ * public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
+ * public void lockInterruptibly() throws InterruptedException {
+ * sync.acquireInterruptibly(1);
+ * }
+ * public boolean tryLock(long timeout, TimeUnit unit)
+ * throws InterruptedException {
+ * return sync.tryAcquireNanos(1, unit.toNanos(timeout));
+ * }
* }
* </pre>
*
- * <p> Here is a latch class that is like a {@link CountDownLatch}
+ * <p>Here is a latch class that is like a {@link CountDownLatch}
* except that it only requires a single <tt>signal</tt> to
* fire. Because a latch is non-exclusive, it uses the <tt>shared</tt>
* acquire and release methods.
@@ -220,33 +217,35 @@
* <pre>
* class BooleanLatch {
*
- * private static class Sync extends AbstractQueuedSynchronizer {
- * boolean isSignalled() { return getState() != 0; }
+ * private static class Sync extends AbstractQueuedSynchronizer {
+ * boolean isSignalled() { return getState() != 0; }
*
- * protected int tryAcquireShared(int ignore) {
- * return isSignalled()? 1 : -1;
- * }
- *
- * protected boolean tryReleaseShared(int ignore) {
- * setState(1);
- * return true;
- * }
- * }
- *
- * private final Sync sync = new Sync();
- * public boolean isSignalled() { return sync.isSignalled(); }
- * public void signal() { sync.releaseShared(1); }
- * public void await() throws InterruptedException {
- * sync.acquireSharedInterruptibly(1);
- * }
- * }
+ * protected int tryAcquireShared(int ignore) {
+ * return isSignalled()? 1 : -1;
+ * }
*
+ * protected boolean tryReleaseShared(int ignore) {
+ * setState(1);
+ * return true;
+ * }
+ * }
+ *
+ * private final Sync sync = new Sync();
+ * public boolean isSignalled() { return sync.isSignalled(); }
+ * public void signal() { sync.releaseShared(1); }
+ * public void await() throws InterruptedException {
+ * sync.acquireSharedInterruptibly(1);
+ * }
+ * }
* </pre>
*
* @since 1.5
* @author Doug Lea
*/
-public abstract class AbstractQueuedSynchronizer implements java.io.Serializable {
+public abstract class AbstractQueuedSynchronizer
+ extends AbstractOwnableSynchronizer
+ implements java.io.Serializable {
+
private static final long serialVersionUID = 7373984972572414691L;
/**
@@ -258,7 +257,7 @@
/**
* Wait queue node class.
*
- * <p> The wait queue is a variant of a "CLH" (Craig, Landin, and
+ * <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
@@ -274,7 +273,7 @@
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
- * tail. To dequeue, you just set the head field.
+ * tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
@@ -295,7 +294,7 @@
* predecessor. For explanation of similar mechanics in the case
* of spin locks, see the papers by Scott and Scherer at
* http://www.cs.rochester.edu/u/scott/synchronization/
- *
+ *
* <p>We also use "next" links to implement blocking mechanics.
* The thread id for each node is kept in its own node, so a
* predecessor signals the next node to wake up by traversing
@@ -312,7 +311,8 @@
* nodes, we can miss noticing whether a cancelled node is
* ahead or behind us. This is dealt with by always unparking
* successors upon cancellation, allowing them to stabilize on
- * a new predecessor.
+ * a new predecessor, unless we can identify an uncancelled
+ * predecessor who will carry this responsibility.
*
* <p>CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted
@@ -334,34 +334,46 @@
* on the design of this class.
*/
static final class Node {
+ /** Marker to indicate a node is waiting in shared mode */
+ static final Node SHARED = new Node();
+ /** Marker to indicate a node is waiting in exclusive mode */
+ static final Node EXCLUSIVE = null;
+
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
- /** waitStatus value to indicate thread needs unparking */
+ /** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
- /** Marker to indicate a node is waiting in shared mode */
- static final Node SHARED = new Node();
- /** Marker to indicate a node is waiting in exclusive mode */
- static final Node EXCLUSIVE = null;
+ /**
+ * waitStatus value to indicate the next acquireShared should
+ * unconditionally propagate
+ */
+ static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
- * SIGNAL: The successor of this node is (or will soon be)
- * blocked (via park), so the current node must
- * unpark its successor when it releases or
+ * SIGNAL: The successor of this node is (or will soon be)
+ * blocked (via park), so the current node must
+ * unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
- * first indicate they need a signal,
- * then retry the atomic acquire, and then,
+ * first indicate they need a signal,
+ * then retry the atomic acquire, and then,
* on failure, block.
- * CANCELLED: Node is cancelled due to timeout or interrupt
+ * CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
- * CONDITION: Node is currently on a condition queue
- * It will not be used as a sync queue node until
- * transferred. (Use of this value here
- * has nothing to do with the other uses
- * of the field, but simplifies mechanics.)
+ * CONDITION: This node is currently on a condition queue.
+ * It will not be used as a sync queue node
+ * until transferred, at which time the status
+ * will be set to 0. (Use of this value here has
+ * nothing to do with the other uses of the
+ * field, but simplifies mechanics.)
+ * PROPAGATE: A releaseShared should be propagated to other
+ * nodes. This is set (for head node only) in
+ * doReleaseShared to ensure propagation
+ * continues, even if other operations have
+ * since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
@@ -370,8 +382,8 @@
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
- * CONDITION for condition nodes. It is modified only using
- * CAS.
+ * CONDITION for condition nodes. It is modified using CAS
+ * (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
@@ -390,25 +402,26 @@
/**
* Link to the successor node that the current node/thread
- * unparks upon release. Assigned once during enqueuing, and
- * nulled out (for sake of GC) when no longer needed. Upon
- * cancellation, we cannot adjust this field, but can notice
- * status and bypass the node if cancelled. The enq operation
- * does not assign next field of a predecessor until after
- * attachment, so seeing a null next field does not
- * necessarily mean that node is at end of queue. However, if
- * a next field appears to be null, we can scan prev's from
- * the tail to double-check.
+ * unparks upon release. Assigned during enqueuing, adjusted
+ * when bypassing cancelled predecessors, and nulled out (for
+ * sake of GC) when dequeued. The enq operation does not
+ * assign next field of a predecessor until after attachment,
+ * so seeing a null next field does not necessarily mean that
+ * node is at end of queue. However, if a next field appears
+ * to be null, we can scan prev's from the tail to
+ * double-check. The next field of cancelled nodes is set to
+ * point to the node itself instead of null, to make life
+ * easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
- * construction and nulled out after use.
+ * construction and nulled out after use.
*/
volatile Thread thread;
- /**
+ /**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
@@ -428,14 +441,16 @@
}
/**
- * Returns previous node, or throws NullPointerException if
- * null. Use when predecessor cannot be null.
+ * Returns previous node, or throws NullPointerException if null.
+ * Use when predecessor cannot be null. The null check could
+ * be elided, but is present to help the VM.
+ *
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
- throw new NullPointerException();
+ throw new NullPointerException();
else
return p;
}
@@ -450,11 +465,11 @@
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
- this.thread = thread;
+ this.thread = thread;
}
}
- /**
+ /**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
@@ -462,11 +477,11 @@
*/
private transient volatile Node head;
- /**
+ /**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
- private transient volatile Node tail;
+ private transient volatile Node tail;
/**
* The synchronization state.
@@ -496,10 +511,11 @@
* value if the current state value equals the expected value.
* This operation has memory semantics of a <tt>volatile</tt> read
* and write.
+ *
* @param expect the expected value
* @param update the new value
- * @return true if successful. False return indicates that
- * the actual value was not equal to the expected value.
+ * @return true if successful. False return indicates that the actual
+ * value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
@@ -509,7 +525,14 @@
// Queuing utilities
/**
- * Insert node into queue, initializing if necessary. See picture above.
+ * The number of nanoseconds for which it is faster to spin
+ * rather than to use timed park. A rough estimate suffices
+ * to improve responsiveness with very short timeouts.
+ */
+ static final long spinForTimeoutThreshold = 1000L;
+
+ /**
+ * Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
@@ -517,27 +540,21 @@
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
- Node h = new Node(); // Dummy header
- h.next = node;
- node.prev = h;
- if (compareAndSetHead(h)) {
- tail = node;
- return h;
- }
- }
- else {
- node.prev = t;
+ if (compareAndSetHead(new Node()))
+ tail = head;
+ } else {
+ node.prev = t;
if (compareAndSetTail(t, node)) {
- t.next = node;
- return t;
+ t.next = node;
+ return t;
}
}
}
}
/**
- * Create and enq node for given thread and mode
- * @param current the thread
+ * Creates and enqueues node for current thread and given mode.
+ *
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
@@ -546,9 +563,9 @@
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
- node.prev = pred;
+ node.prev = pred;
if (compareAndSetTail(pred, node)) {
- pred.next = node;
+ pred.next = node;
return node;
}
}
@@ -557,79 +574,166 @@
}
/**
- * Set head of queue to be node, thus dequeuing. Called only by
+ * Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
- * @param node the node
+ *
+ * @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
- node.prev = null;
+ node.prev = null;
}
/**
- * Wake up node's successor, if one exists.
+ * Wakes up node's successor, if one exists.
+ *
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
- * Try to clear status in anticipation of signalling. It is
- * OK if this fails or if status is changed by waiting thread.
- */
- compareAndSetWaitStatus(node, Node.SIGNAL, 0);
-
+ * If status is negative (i.e., possibly needing signal) try
+ * to clear in anticipation of signalling. It is OK if this
+ * fails or if status is changed by waiting thread.
+ */
+ int ws = node.waitStatus;
+ if (ws < 0)
+ compareAndSetWaitStatus(node, ws, 0);
+
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
- Thread thread;
Node s = node.next;
- if (s != null && s.waitStatus <= 0)
- thread = s.thread;
- else {
- thread = null;
- for (s = tail; s != null && s != node; s = s.prev)
- if (s.waitStatus <= 0)
- thread = s.thread;
- }
- LockSupport.unpark(thread);
+ if (s == null || s.waitStatus > 0) {
+ s = null;
+ for (Node t = tail; t != null && t != node; t = t.prev)
+ if (t.waitStatus <= 0)
+ s = t;
+ }
+ if (s != null)
+ LockSupport.unpark(s.thread);
}
/**
- * Set head of queue, and check if successor may be waiting
- * in shared mode, if so propagating if propagate > 0.
- * @param pred the node holding waitStatus for node
- * @param node the node
+ * Release action for shared mode -- signal successor and ensure
+ * propagation. (Note: For exclusive mode, release just amounts
+ * to calling unparkSuccessor of head if it needs signal.)
+ */
+ private void doReleaseShared() {
+ /*
+ * Ensure that a release propagates, even if there are other
+ * in-progress acquires/releases. This proceeds in the usual
+ * way of trying to unparkSuccessor of head if it needs
+ * signal. But if it does not, status is set to PROPAGATE to
+ * ensure that upon release, propagation continues.
+ * Additionally, we must loop in case a new node is added
+ * while we are doing this. Also, unlike other uses of
+ * unparkSuccessor, we need to know if CAS to reset status
+ * fails, if so rechecking.
+ */
+ for (;;) {
+ Node h = head;
+ if (h != null && h != tail) {
+ int ws = h.waitStatus;
+ if (ws == Node.SIGNAL) {
+ if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
+ continue; // loop to recheck cases
+ unparkSuccessor(h);
+ }
+ else if (ws == 0 &&
+ !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
+ continue; // loop on failed CAS
+ }
+ if (h == head) // loop if head changed
+ break;
+ }
+ }
+
+ /**
+ * Sets head of queue, and checks if successor may be waiting
+ * in shared mode, if so propagating if either propagate > 0 or
+ * PROPAGATE status was set.
+ *
+ * @param node the node
* @param propagate the return value from a tryAcquireShared
*/
private void setHeadAndPropagate(Node node, int propagate) {
+ Node h = head; // Record old head for check below
setHead(node);
- if (propagate > 0 && node.waitStatus != 0) {
- /*
- * Don't bother fully figuring out successor. If it
- * looks null, call unparkSuccessor anyway to be safe.
- */
- Node s = node.next;
+ /*
+ * Try to signal next queued node if:
+ * Propagation was indicated by caller,
+ * or was recorded (as h.waitStatus) by a previous operation
+ * (note: this uses sign-check of waitStatus because
+ * PROPAGATE status may transition to SIGNAL.)
+ * and
+ * The next node is waiting in shared mode,
+ * or we don't know, because it appears null
+ *
+ * The conservatism in both of these checks may cause
+ * unnecessary wake-ups, but only when there are multiple
+ * racing acquires/releases, so most need signals now or soon
+ * anyway.
+ */
+ if (propagate > 0 || h == null || h.waitStatus < 0) {
+ Node s = node.next;
if (s == null || s.isShared())
- unparkSuccessor(node);
+ doReleaseShared();
}
}
// Utilities for various versions of acquire
/**
- * Cancel an ongoing attempt to acquire.
+ * Cancels an ongoing attempt to acquire.
+ *
* @param node the node
*/
private void cancelAcquire(Node node) {
- if (node != null) { // Ignore if node doesn't exist
- node.thread = null;
- // Can use unconditional write instead of CAS here
- node.waitStatus = Node.CANCELLED;
- unparkSuccessor(node);
+ // Ignore if node doesn't exist
+ if (node == null)
+ return;
+
+ node.thread = null;
+
+ // Skip cancelled predecessors
+ Node pred = node.prev;
+ while (pred.waitStatus > 0)
+ node.prev = pred = pred.prev;
+
+ // predNext is the apparent node to unsplice. CASes below will
+ // fail if not, in which case, we lost race vs another cancel
+ // or signal, so no further action is necessary.
+ Node predNext = pred.next;
+
+ // Can use unconditional write instead of CAS here.
+ // After this atomic step, other Nodes can skip past us.
+ // Before, we are free of interference from other threads.
+ node.waitStatus = Node.CANCELLED;
+
+ // If we are the tail, remove ourselves.
+ if (node == tail && compareAndSetTail(node, pred)) {
+ compareAndSetNext(pred, predNext, null);
+ } else {
+ // If successor needs signal, try to set pred's next-link
+ // so it will get one. Otherwise wake it up to propagate.
+ int ws;
+ if (pred != head &&
+ ((ws = pred.waitStatus) == Node.SIGNAL ||
+ (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
+ pred.thread != null) {
+ Node next = node.next;
+ if (next != null && next.waitStatus <= 0)
+ compareAndSetNext(pred, predNext, next);
+ } else {
+ unparkSuccessor(node);
+ }
+
+ node.next = node; // help GC
}
}
@@ -637,31 +741,36 @@
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
+ *
* @param pred node's predecessor holding status
- * @param node the node
- * @return true if thread should block
+ * @param node the node
+ * @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
- int s = pred.waitStatus;
- if (s < 0)
+ int ws = pred.waitStatus;
+ if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
- * to signal it, so it can safely park
+ * to signal it, so it can safely park.
*/
return true;
- if (s > 0)
+ if (ws > 0) {
/*
- * Predecessor was cancelled. Move up to its predecessor
- * and indicate retry.
+ * Predecessor was cancelled. Skip over predecessors and
+ * indicate retry.
*/
- node.prev = pred.prev;
- else
+ do {
+ node.prev = pred = pred.prev;
+ } while (pred.waitStatus > 0);
+ pred.next = node;
+ } else {
/*
- * Indicate that we need a signal, but don't park yet. Caller
- * will need to retry to make sure it cannot acquire before
- * parking.
+ * waitStatus must be 0 or PROPAGATE. Indicate that we
+ * need a signal, but don't park yet. Caller will need to
+ * retry to make sure it cannot acquire before parking.
*/
- compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
+ compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
+ }
return false;
}
@@ -674,9 +783,10 @@
/**
* Convenience method to park and then check if interrupted
- * @return true if interrupted
+ *
+ * @return {@code true} if interrupted
*/
- private static boolean parkAndCheckInterrupt() {
+ private final boolean parkAndCheckInterrupt() {
LockSupport.park();
return Thread.interrupted();
}
@@ -687,17 +797,19 @@
* different. Only a little bit of factoring is possible due to
* interactions of exception mechanics (including ensuring that we
* cancel if tryAcquire throws exception) and other control, at
- * least not without hurting performance too much.
+ * least not without hurting performance too much.
*/
/**
- * Acquire in exclusive uninterruptible mode for thread already in
+ * Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
+ *
* @param node the node
* @param arg the acquire argument
- * @return true if interrupted while waiting
+ * @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
+ boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
@@ -705,92 +817,91 @@
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
+ failed = false;
return interrupted;
}
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
+ if (shouldParkAfterFailedAcquire(p, node) &&
+ parkAndCheckInterrupt())
interrupted = true;
}
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
+ } finally {
+ if (failed)
+ cancelAcquire(node);
}
}
- /**
- * Acquire in exclusive interruptible mode
+ /**
+ * Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
- private void doAcquireInterruptibly(int arg)
+ private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
+ boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
+ failed = false;
return;
}
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- break;
+ if (shouldParkAfterFailedAcquire(p, node) &&
+ parkAndCheckInterrupt())
+ throw new InterruptedException();
}
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
+ } finally {
+ if (failed)
+ cancelAcquire(node);
}
- // Arrive here only if interrupted
- cancelAcquire(node);
- throw new InterruptedException();
}
- /**
- * Acquire in exclusive timed mode
+ /**
+ * Acquires in exclusive timed mode.
+ *
* @param arg the acquire argument
* @param nanosTimeout max wait time
- * @return true if acquired
+ * @return {@code true} if acquired
*/
- private boolean doAcquireNanos(int arg, long nanosTimeout)
+ private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
+ boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
+ failed = false;
return true;
}
- if (nanosTimeout <= 0) {
- cancelAcquire(node);
+ if (nanosTimeout <= 0)
return false;
- }
- if (shouldParkAfterFailedAcquire(p, node)) {
+ if (shouldParkAfterFailedAcquire(p, node) &&
+ nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(nanosTimeout);
- if (Thread.interrupted())
- break;
- long now = System.nanoTime();
- nanosTimeout -= now - lastTime;
- lastTime = now;
- }
+ long now = System.nanoTime();
+ nanosTimeout -= now - lastTime;
+ lastTime = now;
+ if (Thread.interrupted())
+ throw new InterruptedException();
}
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
+ } finally {
+ if (failed)
+ cancelAcquire(node);
}
- // Arrive here only if interrupted
- cancelAcquire(node);
- throw new InterruptedException();
}
- /**
- * Acquire in shared uninterruptible mode
+ /**
+ * Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
+ boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
@@ -802,26 +913,28 @@
p.next = null; // help GC
if (interrupted)
selfInterrupt();
+ failed = false;
return;
}
}
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
+ if (shouldParkAfterFailedAcquire(p, node) &&
+ parkAndCheckInterrupt())
interrupted = true;
}
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
+ } finally {
+ if (failed)
+ cancelAcquire(node);
}
}
- /**
- * Acquire in shared interruptible mode
+ /**
+ * Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
- private void doAcquireSharedInterruptibly(int arg)
+ private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
+ boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
@@ -830,33 +943,33 @@
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
+ failed = false;
return;
}
}
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- break;
+ if (shouldParkAfterFailedAcquire(p, node) &&
+ parkAndCheckInterrupt())
+ throw new InterruptedException();
}
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
+ } finally {
+ if (failed)
+ cancelAcquire(node);
}
- // Arrive here only if interrupted
- cancelAcquire(node);
- throw new InterruptedException();
}
- /**
- * Acquire in shared timed mode
+ /**
+ * Acquires in shared timed mode.
+ *
* @param arg the acquire argument
* @param nanosTimeout max wait time
- * @return true if acquired
+ * @return {@code true} if acquired
*/
- private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
+ private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.SHARED);
+ boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
@@ -865,32 +978,28 @@
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
+ failed = false;
return true;
}
}
- if (nanosTimeout <= 0) {
- cancelAcquire(node);
+ if (nanosTimeout <= 0)
return false;
- }
- if (shouldParkAfterFailedAcquire(p, node)) {
+ if (shouldParkAfterFailedAcquire(p, node) &&
+ nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(nanosTimeout);
- if (Thread.interrupted())
- break;
- long now = System.nanoTime();
- nanosTimeout -= now - lastTime;
- lastTime = now;
- }
+ long now = System.nanoTime();
+ nanosTimeout -= now - lastTime;
+ lastTime = now;
+ if (Thread.interrupted())
+ throw new InterruptedException();
}
- } catch (RuntimeException ex) {
- cancelAcquire(node);
- throw ex;
+ } finally {
+ if (failed)
+ cancelAcquire(node);
}
- // Arrive here only if interrupted
- cancelAcquire(node);
- throw new InterruptedException();
}
- // Main exported methods
+ // Main exported methods
/**
* Attempts to acquire in exclusive mode. This method should query
@@ -901,22 +1010,21 @@
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
- * to implement method {@link Lock#tryLock()}.
+ * to implement method {@link Lock#tryLock()}.
*
* <p>The default
- * implementation throws {@link UnsupportedOperationException}
+ * implementation throws {@link UnsupportedOperationException}.
*
- * @param arg the acquire argument. This value
- * is always the one passed to an acquire method,
- * or is the value saved on entry to a condition wait.
- * The value is otherwise uninterpreted and can represent anything
- * you like.
- * @return true if successful. Upon success, this object has been
- * acquired.
- * @throws IllegalMonitorStateException if acquiring would place
- * this synchronizer in an illegal state. This exception must be
- * thrown in a consistent fashion for synchronization to work
- * correctly.
+ * @param arg the acquire argument. This value is always the one
+ * passed to an acquire method, or is the value saved on entry
+ * to a condition wait. The value is otherwise uninterpreted
+ * and can represent anything you like.
+ * @return {@code true} if successful. Upon success, this object has
+ * been acquired.
+ * @throws IllegalMonitorStateException if acquiring would place this
+ * synchronizer in an illegal state. This exception must be
+ * thrown in a consistent fashion for synchronization to work
+ * correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
@@ -925,23 +1033,24 @@
/**
* Attempts to set the state to reflect a release in exclusive
- * mode. <p>This method is always invoked by the thread
- * performing release.
+ * mode.
+ *
+ * <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
- * {@link UnsupportedOperationException}
- * @param arg the release argument. This value
- * is always the one passed to a release method,
- * or the current state value upon entry to a condition wait.
- * The value is otherwise uninterpreted and can represent anything
- * you like.
- * @return <tt>true</tt> if this object is now in a fully released state,
- * so that any waiting threads may attempt to acquire; and <tt>false</tt>
- * otherwise.
- * @throws IllegalMonitorStateException if releasing would place
- * this synchronizer in an illegal state. This exception must be
- * thrown in a consistent fashion for synchronization to work
- * correctly.
+ * {@link UnsupportedOperationException}.
+ *
+ * @param arg the release argument. This value is always the one
+ * passed to a release method, or the current state value upon
+ * entry to a condition wait. The value is otherwise
+ * uninterpreted and can represent anything you like.
+ * @return {@code true} if this object is now in a fully released
+ * state, so that any waiting threads may attempt to acquire;
+ * and {@code false} otherwise.
+ * @throws IllegalMonitorStateException if releasing would place this
+ * synchronizer in an illegal state. This exception must be
+ * thrown in a consistent fashion for synchronization to work
+ * correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
@@ -951,7 +1060,7 @@
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
- * mode, and if so to acquire it.
+ * mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
@@ -959,24 +1068,25 @@
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
- * UnsupportedOperationException}
+ * UnsupportedOperationException}.
*
- * @param arg the acquire argument. This value
- * is always the one passed to an acquire method,
- * or is the value saved on entry to a condition wait.
- * The value is otherwise uninterpreted and can represent anything
- * you like.
- * @return a negative value on failure, zero on exclusive success,
- * and a positive value if non-exclusively successful, in which
- * case a subsequent waiting thread must check
- * availability. (Support for three different return values
- * enables this method to be used in contexts where acquires only
- * sometimes act exclusively.) Upon success, this object has been
- * acquired.
- * @throws IllegalMonitorStateException if acquiring would place
- * this synchronizer in an illegal state. This exception must be
- * thrown in a consistent fashion for synchronization to work
- * correctly.
+ * @param arg the acquire argument. This value is always the one
+ * passed to an acquire method, or is the value saved on entry
+ * to a condition wait. The value is otherwise uninterpreted
+ * and can represent anything you like.
+ * @return a negative value on failure; zero if acquisition in shared
+ * mode succeeded but no subsequent shared-mode acquire can
+ * succeed; and a positive value if acquisition in shared
+ * mode succeeded and subsequent shared-mode acquires might
+ * also succeed, in which case a subsequent waiting thread
+ * must check availability. (Support for three different
+ * return values enables this method to be used in contexts
+ * where acquires only sometimes act exclusively.) Upon
+ * success, this object has been acquired.
+ * @throws IllegalMonitorStateException if acquiring would place this
+ * synchronizer in an illegal state. This exception must be
+ * thrown in a consistent fashion for synchronization to work
+ * correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
@@ -985,21 +1095,23 @@
/**
* Attempts to set the state to reflect a release in shared mode.
+ *
* <p>This method is always invoked by the thread performing release.
- * <p> The default implementation throws
- * {@link UnsupportedOperationException}
- * @param arg the release argument. This value
- * is always the one passed to a release method,
- * or the current state value upon entry to a condition wait.
- * The value is otherwise uninterpreted and can represent anything
- * you like.
- * @return <tt>true</tt> if this object is now in a fully released state,
- * so that any waiting threads may attempt to acquire; and <tt>false</tt>
- * otherwise.
- * @throws IllegalMonitorStateException if releasing would place
- * this synchronizer in an illegal state. This exception must be
- * thrown in a consistent fashion for synchronization to work
- * correctly.
+ *
+ * <p>The default implementation throws
+ * {@link UnsupportedOperationException}.
+ *
+ * @param arg the release argument. This value is always the one
+ * passed to a release method, or the current state value upon
+ * entry to a condition wait. The value is otherwise
+ * uninterpreted and can represent anything you like.
+ * @return {@code true} if this release of shared mode may permit a
+ * waiting acquire (shared or exclusive) to succeed; and
+ * {@code false} otherwise
+ * @throws IllegalMonitorStateException if releasing would place this
+ * synchronizer in an illegal state. This exception must be
+ * thrown in a consistent fashion for synchronization to work
+ * correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
@@ -1007,17 +1119,18 @@
}
/**
- * Returns true if synchronization is held exclusively with respect
- * to the current (calling) thread. This method is invoked
+ * Returns {@code true} if synchronization is held exclusively with
+ * respect to the current (calling) thread. This method is invoked
* upon each call to a non-waiting {@link ConditionObject} method.
* (Waiting methods instead invoke {@link #release}.)
+ *
* <p>The default implementation throws {@link
* UnsupportedOperationException}. This method is invoked
* internally only within {@link ConditionObject} methods, so need
* not be defined if conditions are not used.
*
- * @return true if synchronization is held exclusively;
- * else false
+ * @return {@code true} if synchronization is held exclusively;
+ * {@code false} otherwise
* @throws UnsupportedOperationException if conditions are not supported
*/
protected boolean isHeldExclusively() {
@@ -1030,12 +1143,12 @@
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
- * to implement method {@link Lock#lock}
- * @param arg the acquire argument.
- * This value is conveyed to {@link #tryAcquire} but is
- * otherwise uninterpreted and can represent anything
- * you like.
- */
+ * to implement method {@link Lock#lock}.
+ *
+ * @param arg the acquire argument. This value is conveyed to
+ * {@link #tryAcquire} but is otherwise uninterpreted and
+ * can represent anything you like.
+ */
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
@@ -1049,11 +1162,11 @@
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
- * used to implement method {@link Lock#lockInterruptibly}
- * @param arg the acquire argument.
- * This value is conveyed to {@link #tryAcquire} but is
- * otherwise uninterpreted and can represent anything
- * you like.
+ * used to implement method {@link Lock#lockInterruptibly}.
+ *
+ * @param arg the acquire argument. This value is conveyed to
+ * {@link #tryAcquire} but is otherwise uninterpreted and
+ * can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg) throws InterruptedException {
@@ -1072,35 +1185,35 @@
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
- * @param arg the acquire argument.
- * This value is conveyed to {@link #tryAcquire} but is
- * otherwise uninterpreted and can represent anything
- * you like.
+ *
+ * @param arg the acquire argument. This value is conveyed to
+ * {@link #tryAcquire} but is otherwise uninterpreted and
+ * can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
- * @return true if acquired; false if timed out
+ * @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
- public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- return tryAcquire(arg) ||
- doAcquireNanos(arg, nanosTimeout);
- }
+ public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ return tryAcquire(arg) ||
+ doAcquireNanos(arg, nanosTimeout);
+ }
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
- * This method can be used to implement method {@link Lock#unlock}
- * @param arg the release argument.
- * This value is conveyed to {@link #tryRelease} but is
- * otherwise uninterpreted and can represent anything
- * you like.
- * @return the value returned from {@link #tryRelease}
+ * This method can be used to implement method {@link Lock#unlock}.
+ *
+ * @param arg the release argument. This value is conveyed to
+ * {@link #tryRelease} but is otherwise uninterpreted and
+ * can represent anything you like.
+ * @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
- if (h != null && h.waitStatus != 0)
+ if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
@@ -1112,11 +1225,11 @@
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
- * #tryAcquireShared} until success.
- * @param arg the acquire argument.
- * This value is conveyed to {@link #tryAcquireShared} but is
- * otherwise uninterpreted and can represent anything
- * you like.
+ * #tryAcquireShared} until success.
+ *
+ * @param arg the acquire argument. This value is conveyed to
+ * {@link #tryAcquireShared} but is otherwise uninterpreted
+ * and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
@@ -1129,8 +1242,8 @@
* {@link #tryAcquireShared}, returning on success. Otherwise the
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
- * is interrupted.
- * @param arg the acquire argument.
+ * is interrupted.
+ * @param arg the acquire argument
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
@@ -1141,7 +1254,7 @@
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
- }
+ }
/**
* Attempts to acquire in shared mode, aborting if interrupted, and
@@ -1151,35 +1264,33 @@
* thread is queued, possibly repeatedly blocking and unblocking,
* invoking {@link #tryAcquireShared} until success or the thread
* is interrupted or the timeout elapses.
- * @param arg the acquire argument.
- * This value is conveyed to {@link #tryAcquireShared} but is
- * otherwise uninterpreted and can represent anything
- * you like.
+ *
+ * @param arg the acquire argument. This value is conveyed to
+ * {@link #tryAcquireShared} but is otherwise uninterpreted
+ * and can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
- * @return true if acquired; false if timed out
+ * @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
- public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- return tryAcquireShared(arg) >= 0 ||
- doAcquireSharedNanos(arg, nanosTimeout);
- }
+ public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ return tryAcquireShared(arg) >= 0 ||
+ doAcquireSharedNanos(arg, nanosTimeout);
+ }
/**
* Releases in shared mode. Implemented by unblocking one or more
- * threads if {@link #tryReleaseShared} returns true.
- * @param arg the release argument.
- * This value is conveyed to {@link #tryReleaseShared} but is
- * otherwise uninterpreted and can represent anything
- * you like.
- * @return the value returned from {@link #tryReleaseShared}
+ * threads if {@link #tryReleaseShared} returns true.
+ *
+ * @param arg the release argument. This value is conveyed to
+ * {@link #tryReleaseShared} but is otherwise uninterpreted
+ * and can represent anything you like.
+ * @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
+ doReleaseShared();
return true;
}
return false;
@@ -1190,16 +1301,15 @@
/**
* Queries whether any threads are waiting to acquire. Note that
* because cancellations due to interrupts and timeouts may occur
- * at any time, a <tt>true</tt> return does not guarantee that any
+ * at any time, a {@code true} return does not guarantee that any
* other thread will ever acquire.
*
- * <p> In this implementation, this operation returns in
+ * <p>In this implementation, this operation returns in
* constant time.
*
- * @return true if there may be other threads waiting to acquire
- * the lock.
+ * @return {@code true} if there may be other threads waiting to acquire
*/
- public final boolean hasQueuedThreads() {
+ public final boolean hasQueuedThreads() {
return head != tail;
}
@@ -1207,10 +1317,10 @@
* Queries whether any threads have ever contended to acquire this
* synchronizer; that is if an acquire method has ever blocked.
*
- * <p> In this implementation, this operation returns in
+ * <p>In this implementation, this operation returns in
* constant time.
*
- * @return true if there has ever been contention
+ * @return {@code true} if there has ever been contention
*/
public final boolean hasContended() {
return head != null;
@@ -1218,18 +1328,18 @@
/**
* Returns the first (longest-waiting) thread in the queue, or
- * <tt>null</tt> if no threads are currently queued.
+ * {@code null} if no threads are currently queued.
*
- * <p> In this implementation, this operation normally returns in
+ * <p>In this implementation, this operation normally returns in
* constant time, but may iterate upon contention if other threads are
* concurrently modifying the queue.
*
* @return the first (longest-waiting) thread in the queue, or
- * <tt>null</tt> if no threads are currently queued.
+ * {@code null} if no threads are currently queued
*/
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
- return (head == tail)? null : fullGetFirstQueuedThread();
+ return (head == tail) ? null : fullGetFirstQueuedThread();
}
/**
@@ -1237,57 +1347,49 @@
*/
private Thread fullGetFirstQueuedThread() {
/*
- * This loops only if the queue changes while we read sets of
- * fields.
- */
- for (;;) {
- Node h = head;
- if (h == null) // No queue
- return null;
+ * The first node is normally head.next. Try to get its
+ * thread field, ensuring consistent reads: If thread
+ * field is nulled out or s.prev is no longer head, then
+ * some other thread(s) concurrently performed setHead in
+ * between some of our reads. We try this twice before
+ * resorting to traversal.
+ */
+ Node h, s;
+ Thread st;
+ if (((h = head) != null && (s = h.next) != null &&
+ s.prev == head && (st = s.thread) != null) ||
+ ((h = head) != null && (s = h.next) != null &&
+ s.prev == head && (st = s.thread) != null))
+ return st;
- /*
- * The first node is normally h.next. Try to get its
- * thread field, ensuring consistent reads: If thread
- * field is nulled out or s.prev is no longer head, then
- * some other thread(s) concurrently performed setHead in
- * between some of our reads, so we must reread.
- */
- Node s = h.next;
- if (s != null) {
- Thread st = s.thread;
- Node sp = s.prev;
- if (st != null && sp == head)
- return st;
- }
-
- /*
- * Head's next field might not have been set yet, or may
- * have been unset after setHead. So we must check to see
- * if tail is actually first node, in almost the same way
- * as above.
- */
- Node t = tail;
- if (t == h) // Empty queue
- return null;
+ /*
+ * Head's next field might not have been set yet, or may have
+ * been unset after setHead. So we must check to see if tail
+ * is actually first node. If not, we continue on, safely
+ * traversing from tail back to head to find first,
+ * guaranteeing termination.
+ */
- if (t != null) {
- Thread tt = t.thread;
- Node tp = t.prev;
- if (tt != null && tp == head)
- return tt;
- }
+ Node t = tail;
+ Thread firstThread = null;
+ while (t != null && t != head) {
+ Thread tt = t.thread;
+ if (tt != null)
+ firstThread = tt;
+ t = t.prev;
}
+ return firstThread;
}
/**
- * Returns true if the given thread is currently queued.
+ * Returns true if the given thread is currently queued.
*
- * <p> This implementation traverses the queue to determine
+ * <p>This implementation traverses the queue to determine
* presence of the given thread.
*
* @param thread the thread
- * @return true if the given thread in on the queue
- * @throws NullPointerException if thread null
+ * @return {@code true} if the given thread is on the queue
+ * @throws NullPointerException if the thread is null
*/
public final boolean isQueued(Thread thread) {
if (thread == null)
@@ -1298,6 +1400,77 @@
return false;
}
+ /**
+ * Returns {@code true} if the apparent first queued thread, if one
+ * exists, is waiting in exclusive mode. If this method returns
+ * {@code true}, and the current thread is attempting to acquire in
+ * shared mode (that is, this method is invoked from {@link
+ * #tryAcquireShared}) then it is guaranteed that the current thread
+ * is not the first queued thread. Used only as a heuristic in
+ * ReentrantReadWriteLock.
+ */
+ final boolean apparentlyFirstQueuedIsExclusive() {
+ Node h, s;
+ return (h = head) != null &&
+ (s = h.next) != null &&
+ !s.isShared() &&
+ s.thread != null;
+ }
+
+ /**
+ * Queries whether any threads have been waiting to acquire longer
+ * than the current thread.
+ *
+ * <p>An invocation of this method is equivalent to (but may be
+ * more efficient than):
+ * <pre> {@code
+ * getFirstQueuedThread() != Thread.currentThread() &&
+ * hasQueuedThreads()}</pre>
+ *
+ * <p>Note that because cancellations due to interrupts and
+ * timeouts may occur at any time, a {@code true} return does not
+ * guarantee that some other thread will acquire before the current
+ * thread. Likewise, it is possible for another thread to win a
+ * race to enqueue after this method has returned {@code false},
+ * due to the queue being empty.
+ *
+ * <p>This method is designed to be used by a fair synchronizer to
+ * avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
+ * Such a synchronizer's {@link #tryAcquire} method should return
+ * {@code false}, and its {@link #tryAcquireShared} method should
+ * return a negative value, if this method returns {@code true}
+ * (unless this is a reentrant acquire). For example, the {@code
+ * tryAcquire} method for a fair, reentrant, exclusive mode
+ * synchronizer might look like this:
+ *
+ * <pre> {@code
+ * protected boolean tryAcquire(int arg) {
+ * if (isHeldExclusively()) {
+ * // A reentrant acquire; increment hold count
+ * return true;
+ * } else if (hasQueuedPredecessors()) {
+ * return false;
+ * } else {
+ * // try to acquire normally
+ * }
+ * }}</pre>
+ *
+ * @return {@code true} if there is a queued thread preceding the
+ * current thread, and {@code false} if the current thread
+ * is at the head of the queue or the queue is empty
+ */
+ final boolean hasQueuedPredecessors() {
+ // The correctness of this depends on head being initialized
+ // before tail and on head.next being accurate if the current
+ // thread is first in queue.
+ Node t = tail; // Read fields in reverse initialization order
+ Node h = head;
+ Node s;
+ return h != t &&
+ ((s = h.next) == null || s.thread != Thread.currentThread());
+ }
+
+
// Instrumentation and monitoring methods
/**
@@ -1308,7 +1481,7 @@
* monitoring system state, not for synchronization
* control.
*
- * @return the estimated number of threads waiting for this lock
+ * @return the estimated number of threads waiting to acquire
*/
public final int getQueueLength() {
int n = 0;
@@ -1327,6 +1500,7 @@
* returned collection are in no particular order. This method is
* designed to facilitate construction of subclasses that provide
* more extensive monitoring facilities.
+ *
* @return the collection of threads
*/
public final Collection<Thread> getQueuedThreads() {
@@ -1344,6 +1518,7 @@
* acquire in exclusive mode. This has the same properties
* as {@link #getQueuedThreads} except that it only returns
* those threads waiting due to an exclusive acquire.
+ *
* @return the collection of threads
*/
public final Collection<Thread> getExclusiveQueuedThreads() {
@@ -1363,6 +1538,7 @@
* acquire in shared mode. This has the same properties
* as {@link #getQueuedThreads} except that it only returns
* those threads waiting due to a shared acquire.
+ *
* @return the collection of threads
*/
public final Collection<Thread> getSharedQueuedThreads() {
@@ -1378,18 +1554,18 @@
}
/**
- * Returns a string identifying this synchronizer, as well as its
- * state. The state, in brackets, includes the String "State
- * =" followed by the current value of {@link #getState}, and
- * either "nonempty" or "empty" depending on
- * whether the queue is empty.
+ * Returns a string identifying this synchronizer, as well as its state.
+ * The state, in brackets, includes the String {@code "State ="}
+ * followed by the current value of {@link #getState}, and either
+ * {@code "nonempty"} or {@code "empty"} depending on whether the
+ * queue is empty.
*
- * @return a string identifying this synchronizer, as well as its state.
+ * @return a string identifying this synchronizer, as well as its state
*/
public String toString() {
int s = getState();
- String q = hasQueuedThreads()? "non" : "";
- return super.toString() +
+ String q = hasQueuedThreads() ? "non" : "";
+ return super.toString() +
"[State = " + s + ", " + q + "empty queue]";
}
@@ -1416,7 +1592,7 @@
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
- }
+ }
/**
* Returns true if node is on sync queue by searching backwards from tail.
@@ -1424,7 +1600,7 @@
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
- Node t = tail;
+ Node t = tail;
for (;;) {
if (t == node)
return true;
@@ -1435,7 +1611,7 @@
}
/**
- * Transfers a node from a condition queue onto sync queue.
+ * Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
@@ -1455,8 +1631,8 @@
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
- int c = p.waitStatus;
- if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
+ int ws = p.waitStatus;
+ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
@@ -1465,9 +1641,8 @@
* Transfers node, if necessary, to sync queue after a cancelled
* wait. Returns true if thread was cancelled before being
* signalled.
- * @param current the waiting thread
* @param node its node
- * @return true if cancelled before the node was signalled.
+ * @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
@@ -1480,39 +1655,42 @@
* incomplete transfer is both rare and transient, so just
* spin.
*/
- while (!isOnSyncQueue(node))
+ while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
/**
- * Invoke release with current state value; return saved state.
- * Cancel node and throw exception on failure.
+ * Invokes release with current state value; returns saved state.
+ * Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
+ boolean failed = true;
try {
int savedState = getState();
- if (release(savedState))
+ if (release(savedState)) {
+ failed = false;
return savedState;
- } catch(RuntimeException ex) {
- node.waitStatus = Node.CANCELLED;
- throw ex;
+ } else {
+ throw new IllegalMonitorStateException();
+ }
+ } finally {
+ if (failed)
+ node.waitStatus = Node.CANCELLED;
}
- // reach here if release fails
- node.waitStatus = Node.CANCELLED;
- throw new IllegalMonitorStateException();
}
// Instrumentation methods for conditions
/**
- * Queries whether the given ConditionObject
+ * Queries whether the given ConditionObject
* uses this synchronizer as its lock.
+ *
* @param condition the condition
* @return <tt>true</tt> if owned
- * @throws NullPointerException if condition null
+ * @throws NullPointerException if the condition is null
*/
public final boolean owns(ConditionObject condition) {
if (condition == null)
@@ -1527,14 +1705,15 @@
* does not guarantee that a future <tt>signal</tt> will awaken
* any threads. This method is designed primarily for use in
* monitoring of the system state.
+ *
* @param condition the condition
- * @return <tt>true</tt> if there are any waiting threads.
- * @throws IllegalMonitorStateException if exclusive synchronization
- * is not held
+ * @return <tt>true</tt> if there are any waiting threads
+ * @throws IllegalMonitorStateException if exclusive synchronization
+ * is not held
* @throws IllegalArgumentException if the given condition is
- * not associated with this synchronizer
- * @throws NullPointerException if condition null
- */
+ * not associated with this synchronizer
+ * @throws NullPointerException if the condition is null
+ */
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
@@ -1548,14 +1727,15 @@
* estimate serves only as an upper bound on the actual number of
* waiters. This method is designed for use in monitoring of the
* system state, not for synchronization control.
+ *
* @param condition the condition
- * @return the estimated number of waiting threads.
- * @throws IllegalMonitorStateException if exclusive synchronization
- * is not held
+ * @return the estimated number of waiting threads
+ * @throws IllegalMonitorStateException if exclusive synchronization
+ * is not held
* @throws IllegalArgumentException if the given condition is
- * not associated with this synchronizer
- * @throws NullPointerException if condition null
- */
+ * not associated with this synchronizer
+ * @throws NullPointerException if the condition is null
+ */
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
@@ -1568,14 +1748,15 @@
* synchronizer. Because the actual set of threads may change
* dynamically while constructing this result, the returned
* collection is only a best-effort estimate. The elements of the
- * returned collection are in no particular order.
+ * returned collection are in no particular order.
+ *
* @param condition the condition
* @return the collection of threads
- * @throws IllegalMonitorStateException if exclusive synchronization
- * is not held
+ * @throws IllegalMonitorStateException if exclusive synchronization
+ * is not held
* @throws IllegalArgumentException if the given condition is
- * not associated with this synchronizer
- * @throws NullPointerException if condition null
+ * not associated with this synchronizer
+ * @throws NullPointerException if the condition is null
*/
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
@@ -1588,14 +1769,14 @@
* AbstractQueuedSynchronizer} serving as the basis of a {@link
* Lock} implementation.
*
- * <p> Method documentation for this class describes mechanics,
+ * <p>Method documentation for this class describes mechanics,
* not behavioral specifications from the point of view of Lock
* and Condition users. Exported versions of this class will in
* general need to be accompanied by documentation describing
* condition semantics that rely on those of the associated
* <tt>AbstractQueuedSynchronizer</tt>.
- *
- * <p> This class is Serializable, but all fields are transient,
+ *
+ * <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
*/
public class ConditionObject implements Condition, java.io.Serializable {
@@ -1613,29 +1794,34 @@
// Internal methods
/**
- * Add a new waiter to wait queue
+ * Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
- Node node = new Node(Thread.currentThread(), Node.CONDITION);
Node t = lastWaiter;
- if (t == null)
+ // If lastWaiter is cancelled, clean out.
+ if (t != null && t.waitStatus != Node.CONDITION) {
+ unlinkCancelledWaiters();
+ t = lastWaiter;
+ }
+ Node node = new Node(Thread.currentThread(), Node.CONDITION);
+ if (t == null)
firstWaiter = node;
- else
+ else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
- * Remove and transfer nodes until hit non-cancelled one or
+ * Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
- if ( (firstWaiter = first.nextWaiter) == null)
+ if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
@@ -1643,11 +1829,11 @@
}
/**
- * Remove and transfer all nodes.
+ * Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
- lastWaiter = firstWaiter = null;
+ lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
@@ -1656,45 +1842,81 @@
} while (first != null);
}
+ /**
+ * Unlinks cancelled waiter nodes from condition queue.
+ * Called only while holding lock. This is called when
+ * cancellation occurred during condition wait, and upon
+ * insertion of a new waiter when lastWaiter is seen to have
+ * been cancelled. This method is needed to avoid garbage
+ * retention in the absence of signals. So even though it may
+ * require a full traversal, it comes into play only when
+ * timeouts or cancellations occur in the absence of
+ * signals. It traverses all nodes rather than stopping at a
+ * particular target to unlink all pointers to garbage nodes
+ * without requiring many re-traversals during cancellation
+ * storms.
+ */
+ private void unlinkCancelledWaiters() {
+ Node t = firstWaiter;
+ Node trail = null;
+ while (t != null) {
+ Node next = t.nextWaiter;
+ if (t.waitStatus != Node.CONDITION) {
+ t.nextWaiter = null;
+ if (trail == null)
+ firstWaiter = next;
+ else
+ trail.nextWaiter = next;
+ if (next == null)
+ lastWaiter = trail;
+ }
+ else
+ trail = t;
+ t = next;
+ }
+ }
+
// public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
+ *
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
- * returns false
+ * returns {@code false}
*/
public final void signal() {
- if (!isHeldExclusively())
+ if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
-
+
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
+ *
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
- * returns false
+ * returns {@code false}
*/
public final void signalAll() {
- if (!isHeldExclusively())
+ if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
- if (first != null)
+ if (first != null)
doSignalAll(first);
}
/**
* Implements uninterruptible condition wait.
* <ol>
- * <li> Save lock state returned by {@link #getState}
- * <li> Invoke {@link #release} with
- * saved state as argument, throwing
- * IllegalMonitorStateException if it fails.
- * <li> Block until signalled
+ * <li> Save lock state returned by {@link #getState}.
+ * <li> Invoke {@link #release} with
+ * saved state as argument, throwing
+ * IllegalMonitorStateException if it fails.
+ * <li> Block until signalled.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* </ol>
@@ -1705,7 +1927,7 @@
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park();
- if (Thread.interrupted())
+ if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
@@ -1725,47 +1947,47 @@
private static final int THROW_IE = -1;
/**
- * Check for interrupt, returning THROW_IE if interrupted
+ * Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
- return (Thread.interrupted()) ?
- ((transferAfterCancelledWait(node))? THROW_IE : REINTERRUPT) :
+ return Thread.interrupted() ?
+ (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
- * Throw InterruptedException, reinterrupt current thread, or
- * do nothing, depending on mode.
+ * Throws InterruptedException, reinterrupts current thread, or
+ * does nothing, depending on mode.
*/
- private void reportInterruptAfterWait(int interruptMode)
+ private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
- Thread.currentThread().interrupt();
+ selfInterrupt();
}
/**
* Implements interruptible condition wait.
* <ol>
- * <li> If current thread is interrupted, throw InterruptedException
- * <li> Save lock state returned by {@link #getState}
- * <li> Invoke {@link #release} with
- * saved state as argument, throwing
- * IllegalMonitorStateException if it fails.
- * <li> Block until signalled or interrupted
+ * <li> If current thread is interrupted, throw InterruptedException.
+ * <li> Save lock state returned by {@link #getState}.
+ * <li> Invoke {@link #release} with
+ * saved state as argument, throwing
+ * IllegalMonitorStateException if it fails.
+ * <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
- * <li> If interrupted while blocked in step 4, throw exception
+ * <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*
* @throws InterruptedException if the current thread is interrupted (and
* interruption of thread suspension is supported).
*/
public final void await() throws InterruptedException {
- if (Thread.interrupted())
+ if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
@@ -1777,6 +1999,8 @@
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
+ if (node.nextWaiter != null) // clean up if cancelled
+ unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
@@ -1784,15 +2008,15 @@
/**
* Implements timed condition wait.
* <ol>
- * <li> If current thread is interrupted, throw InterruptedException
- * <li> Save lock state returned by {@link #getState}
- * <li> Invoke {@link #release} with
- * saved state as argument, throwing
- * IllegalMonitorStateException if it fails.
- * <li> Block until signalled, interrupted, or timed out
+ * <li> If current thread is interrupted, throw InterruptedException.
+ * <li> Save lock state returned by {@link #getState}.
+ * <li> Invoke {@link #release} with
[... 293 lines stripped ...]