You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/05/21 22:46:50 UTC
svn commit: r1125811 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/decaf/util/concurrent/locks/ test/decaf/util/concurrent/locks/
Author: tabish
Date: Sat May 21 20:46:50 2011
New Revision: 1125811
URL: http://svn.apache.org/viewvc?rev=1125811&view=rev
Log:
Working AbstractQueuedSynchronizer implementation.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1125811&r1=1125810&r2=1125811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Sat May 21 20:46:50 2011
@@ -25,8 +25,9 @@
#include <decaf/util/ArrayList.h>
#include <decaf/util/concurrent/locks/LockSupport.h>
-#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <decaf/util/concurrent/atomic/AtomicReference.h>
+#include <decaf/internal/util/concurrent/Atomics.h>
+#include <decaf/internal/util/concurrent/PlatformThread.h>
using namespace decaf;
using namespace decaf::lang;
@@ -35,6 +36,7 @@ using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
using namespace decaf::util::concurrent::locks;
+using namespace decaf::internal::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
namespace {
@@ -42,6 +44,32 @@ namespace {
class Node {
public:
+ /**
+ * Status field, taking on only the values:
+ * SIGNAL: The successor of this node is (or will soon be) blocked (via park),
+ * so the current node must unpark its successor when it releases or
+ * cancels. To avoid races, acquire methods must first indicate they
+ * need a signal, then retry the atomic acquire, and then, on failure,
+ * block.
+ * CANCELLED: This node is canceled due to timeout or interrupt. Nodes never leave
+ * this state. In particular, a thread with canceled node never again
+ * blocks.
+ * CONDITION: This node is currently on a condition queue. It will not be used as a
+ * sync queue node until transferred, at which time the status will be
+ * set to 0. (Use of this value here has nothing to do with the other
+ * uses of the field, but simplifies mechanics.)
+ * PROPAGATE: A releaseShared should be propagated to other nodes. This is set
+ * (for head node only) in doReleaseShared to ensure propagation
+ * continues, even if other operations have since intervened.
+ * 0: None of the above
+ *
+ * The values are arranged numerically to simplify use. Non-negative values mean that
+ * a node doesn't need to signal. So, most code doesn't need to check for particular
+ * values, just for sign.
+ *
+ * The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes.
+ * It is modified using CAS (or when possible, unconditional volatile writes).
+ */
enum WaitStatus {
CANCELLED = 1,
SIGNAL = -1,
@@ -54,11 +82,46 @@ namespace {
public:
- AtomicInteger waitStatus;
- AtomicReference<Node> prev;
- AtomicReference<Node> next;
+ int waitStatus;
+
+ /**
+ * Link to predecessor node that current node/thread relies on
+ * for checking waitStatus. Assigned during enqueuing, and nulled
+ * out. Also, upon cancellation of a predecessor, we short-circuit
+ * while finding a non-canceled one, which will always exist because
+ * the head node is never canceled: A node becomes head only as a
+ * result of successful acquire. A canceled thread never succeeds
+ * in acquiring, and a thread only cancels itself, not any other node.
+ */
+ Node* prev;
+
+ /**
+ * Link to the successor node that the current node/thread
+ * unparks upon release. Assigned during enqueuing, adjusted
+ * when bypassing canceled predecessors, and nulled out when
+ * dequeued. The enq operation does not assign next field of
+ * a predecessor until after attachment, so seeing a NULL next
+ * field does not necessarily mean that node is at end of queue.
+ * However, if a next field appears to be NULL, we can scan
+ * prev's from the tail to double-check.
+ */
+ Node* next;
+
+ /**
+ * The thread that created this Node and is waiting to acquire the
+ * lock.
+ */
Thread* thread;
- AtomicReference<Node> nextWaiter;
+
+ /**
+ * Link to next node waiting on condition, or the special value SHARED.
+ * Because condition queues are accessed only when holding in exclusive
+ * mode, we just need a simple linked queue to hold nodes while they are
+ * waiting on conditions. They are then transferred to the queue to
+ * re-acquire. And because conditions can only be exclusive, we save a
+ * field by using special value to indicate shared mode.
+ */
+ Node* nextWaiter;
public:
@@ -69,16 +132,14 @@ namespace {
Node(Thread* thread, int waitStatus) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(NULL) {
}
- ~Node() {
- std::cout << "Deleted Node: " << std::hex << this << std::endl;
- }
+ ~Node() {}
bool isShared() const {
- return this->nextWaiter.get() == &SHARED;
+ return this->nextWaiter == &SHARED;
}
Node* predecessor() {
- Node* p = prev.get();
+ Node* p = prev;
if (p == NULL) {
throw NullPointerException();
} else {
@@ -104,65 +165,123 @@ namespace locks {
class SynchronizerState {
public:
+ /**
+ * The object that owns this one, allows this object to call back into its parent
+ * as well as checking for exclusive ownership of Conditions.
+ */
AbstractQueuedSynchronizer* parent;
- AtomicInteger state;
+
+ /**
+ * The Sync state, subclasses can get / set this value to indicate when a Thread
+ * can acquire the lock or must wait.
+ */
+ volatile int state;
+
+ /**
+ * Platform level R/W lock. Because we don't implement a garbage collected Node
+ * scheme we can't just use atomic operations on the Node pointers so in cases where
+ * we operate on the list of Nodes to remove canceled items we must write lock the
+ * list. Likewise in cases where we are iterating through the list to collect
+ * statistics we must ensure that a Node won't suddenly become invalid so we must
+ * hold a read lock.
+ */
+ decaf_rwmutex_t rwLock;
+
+ /**
+ * Head of the wait queue, lazily initialized. Except for initialization,
+ * it is modified only via method setHead. Note: If head exists, its
+ * waitStatus is guaranteed not to be CANCELLED.
+ */
AtomicReference<Node> head;
+
+ /**
+ * Tail of the wait queue, lazily initialized. Modified only via method
+ * enq to add new wait node.
+ */
AtomicReference<Node> tail;
public:
- SynchronizerState(AbstractQueuedSynchronizer* parent) : parent(parent), state(0), head(), tail() {}
+ SynchronizerState(AbstractQueuedSynchronizer* parent) : parent(parent), state(0), rwLock(), head(), tail() {
+ PlatformThread::createRWMutex(&rwLock);
+ }
virtual ~SynchronizerState() {
- Node* oldHead = head.getAndSet(NULL);
- if (oldHead != tail.get()) {
- delete tail.get();
+
+ while (tail.get() != NULL) {
+ delete tail.getAndSet(tail.get()->prev);
}
- delete oldHead;
+
+ PlatformThread::destroyRWMutex(rwLock);
}
bool isHeldExclusively() const {
return this->parent->isHeldExclusively();
}
+ /**
+ * Enqueue of a Node is Atomic with respect to the end of the list, so no
+ * locking needs to occur here. If the head and tail have not been allocated
+ * we just need to account for contention of two or more enqueues on the
+ * addition of the new head.
+ *
+ * @param node
+ * The new node to add.
+ */
Node* enq(Node* node) {
for (;;) {
Node* t = tail.get();
if (t == NULL) { // Must initialize
Node* newHead = new Node();
- std::cout << "Enq first call, allocated head = " << std::hex << newHead << std::endl;
- if (head.compareAndSet(NULL, newHead)) {
+ if (compareAndSetHead(newHead)) {
tail.set(head.get());
+ } else {
+ delete newHead;
}
} else {
- node->prev.set(t);
- if (tail.compareAndSet(t, node)) {
- t->next.set(node);
+ node->prev = t;
+ if (compareAndSetTail(t, node)) {
+ t->next = node;
return t;
}
}
}
}
+ /**
+ * Since we can append the node in one atomic step we don't lock here. If we
+ * can't get the fast append done we will enter into the longer looping
+ * enqueue method.
+ *
+ * @param mode
+ * The mode marker Node passed to the constructor of the new waiter Node.
+ */
Node* addWaiter(Node* mode) {
Node* node = new Node(Thread::currentThread(), mode);
- std::cout << "Add Waiter Allocated Node: " << std::hex << node << std::endl;
- // Try the fast path of enq; backup to full enq on failure
Node* pred = tail.get();
if (pred != NULL) {
- node->prev.set(pred);
- if (tail.compareAndSet(pred, node)) {
- pred->next.set(node);
+ node->prev = pred;
+ if (compareAndSetTail(pred, node)) {
+ pred->next = node;
return node;
}
}
+
enq(node);
return node;
}
+ /**
+ * The only place head is altered, we NULL out prev since that Node will be
+ * Destroyed or pooled after this, but leave next alone since it should still
+ * be valid.
+ *
+ * @param node
+ * The Node that is to become the new Head of the queue.
+ */
void setHead(Node* node) {
head.set(node);
node->thread = NULL;
- node->prev.set(NULL);
+ node->prev = NULL;
}
void unparkSuccessor(Node* node) {
@@ -171,29 +290,35 @@ namespace locks {
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
- int ws = node->waitStatus.get();
+ int ws = node->waitStatus;
if (ws < 0) {
- node->waitStatus.compareAndSet(ws, 0);
+ compareAndSetWaitStatus(node, ws, 0);
}
+ // We need to lock to prevent cancellation of a Node from
+ // altering the list as we iterate and check Node status fields.
+ PlatformThread::readerLockMutex(this->rwLock);
+
/*
* Thread to unpark is held in successor, which is normally
- * just the next node. But if cancelled or apparently NULL,
+ * just the next node. But if canceled or apparently NULL,
* traverse backwards from tail to find the actual
- * non-cancelled successor.
+ * non-canceled successor.
*/
- Node* s = node->next.get();
- if (s == NULL || s->waitStatus.get() > 0) {
+ Node* s = node->next;
+ if (s == NULL || s->waitStatus > 0) {
s = NULL;
- for (Node* t = tail.get(); t != NULL && t != node; t = t->prev.get())
- if (t->waitStatus.get() <= 0) {
+ for (Node* t = tail.get(); t != NULL && t != node; t = t->prev)
+ if (t->waitStatus <= 0) {
s = t;
}
}
if (s != NULL) {
- LockSupport::unpark(s->thread);
+ LockSupport::unpark((Thread*)s->thread);
}
+
+ PlatformThread::unlockRWMutex(this->rwLock);
}
/**
@@ -202,6 +327,11 @@ namespace locks {
* to calling unparkSuccessor of head if it needs signal.)
*/
void doReleaseShared() {
+
+ // Here we have to read lock because head could change when a
+ // different thread does its release shared.
+ PlatformThread::readerLockMutex(this->rwLock);
+
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
@@ -216,13 +346,13 @@ namespace locks {
for (;;) {
Node* h = head.get();
if (h != NULL && h != tail.get()) {
- int ws = h->waitStatus.get();
+ int ws = h->waitStatus;
if (ws == Node::SIGNAL) {
- if (!h->waitStatus.compareAndSet(Node::SIGNAL, 0)) {
+ if (!compareAndSetWaitStatus(h, Node::SIGNAL, 0)) {
continue; // loop to recheck cases
}
unparkSuccessor(h);
- } else if (ws == 0 && !h->waitStatus.compareAndSet(0, Node::PROPAGATE)) {
+ } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node::PROPAGATE)) {
continue; // loop on failed CAS
}
}
@@ -230,6 +360,8 @@ namespace locks {
break;
}
}
+
+ PlatformThread::unlockRWMutex(this->rwLock);
}
/**
@@ -241,29 +373,33 @@ namespace locks {
* @param propagate the return value from a tryAcquireShared
*/
void setHeadAndPropagate(Node* node, int propagate) {
+
+ // Here we have to read lock because head could change when a
+ // different thread does its release shared.
+ PlatformThread::readerLockMutex(this->rwLock);
+
Node* h = head.get(); // Record old head for check below
setHead(node);
+
/*
* Try to signal next queued node if:
- * Propagation was indicated by caller,
- * or was recorded (as h.waitStatus) by a previous operation
- * (note: this uses sign-check of waitStatus because
- * PROPAGATE status may transition to SIGNAL.)
- * and
- * The next node is waiting in shared mode,
- * or we don't know, because it appears NULL
+ * Propagation was indicated by caller, or was recorded (as h.waitStatus)
+ * by a previous operation (note: this uses sign-check of waitStatus because
+ * PROPAGATE status may transition to SIGNAL.) and the next node is waiting
+ * in shared mode, or we don't know, because it appears NULL.
*
- * The conservatism in both of these checks may cause
- * unnecessary wake-ups, but only when there are multiple
- * racing acquires/releases, so most need signals now or soon
- * anyway.
+ * The conservatism in both of these checks may cause unnecessary wake-ups,
+ * but only when there are multiple racing acquires/releases, so most need
+ * signals now or soon anyway.
*/
- if (propagate > 0 || h == NULL || h->waitStatus.get() < 0) {
- Node* s = node->next.get();
+ if (propagate > 0 || h == NULL || h->waitStatus < 0) {
+ Node* s = node->next;
if (s == NULL || s->isShared()) {
doReleaseShared();
}
}
+
+ PlatformThread::unlockRWMutex(this->rwLock);
}
/**
@@ -272,6 +408,7 @@ namespace locks {
* @param node the node
*/
void cancelAcquire(Node* node) {
+
// Ignore if node doesn't exist
if (node == NULL) {
return;
@@ -279,44 +416,47 @@ namespace locks {
node->thread = NULL;
- // Skip cancelled predecessors
- Node* pred = node->prev.get();
- while (pred->waitStatus.get() > 0) {
- pred = pred->prev.get();
- node->prev.set(pred);
- }
-
- // predNext is the apparent node to unsplice. CASes below will
- // fail if not, in which case, we lost race vs another cancel
- // or signal, so no further action is necessary.
- Node* predNext = pred->next.get();
-
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
- node->waitStatus.set(Node::CANCELLED);
+ node->waitStatus = Node::CANCELLED;
// If we are the tail, remove ourselves.
- if (node == tail.get() && tail.compareAndSet(node, pred)) {
- pred->next.compareAndSet(predNext, NULL);
+ if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+ // Attempt to set next on tail, this can fail if another thread came in
+ // and replaced the old tail but that's ok since that means next is up
+ // to date in that case.
+ tail.compareAndSet(node, NULL);
delete node;
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
- if (pred != head.get() &&
- ((ws = pred->waitStatus.get()) == Node::SIGNAL ||
- (ws <= 0 && pred->waitStatus.compareAndSet(ws, Node::SIGNAL))) && pred->thread != NULL) {
- Node* next = node->next.get();
- if (next != NULL && next->waitStatus.get() <= 0) {
- pred->next.compareAndSet(predNext, next);
- }
+
+ PlatformThread::writerLockMutex(this->rwLock);
+
+ // Did we become the tail.
+ if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+ tail.compareAndSet(node, NULL);
} else {
+ node->prev->next = node->next;
+ node->next->prev = node->prev;
+ }
+
+ if (node->prev != head.get() &&
+ ((ws = node->prev->waitStatus) == Node::SIGNAL ||
+ (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
+ node->prev->thread != NULL) {
+
+ PlatformThread::unlockRWMutex(this->rwLock);
+ } else {
+ PlatformThread::unlockRWMutex(this->rwLock);
unparkSuccessor(node);
}
- }
- delete node;
+ delete node;
+
+ }
}
/**
@@ -328,35 +468,38 @@ namespace locks {
* @param node the node
* @return {@code true} if thread should block
*/
- static bool shouldParkAfterFailedAcquire(Node* pred, Node* node) {
- int ws = pred->waitStatus.get();
+ bool shouldParkAfterFailedAcquire(Node* node) {
+
+ bool result = false;
+
+ PlatformThread::readerLockMutex(this->rwLock);
+
+ int ws = node->prev->waitStatus;
if (ws == Node::SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
- return true;
+ result = true;
if (ws > 0) {
/*
- * Predecessor was cancelled. Skip over predecessors and
+ * Predecessor was canceled. Skip over predecessors and
* indicate retry.
*/
- do {
- pred = pred->prev.get();
- node->prev.set(pred);
- } while (pred->waitStatus.get() > 0);
- pred->next.set(node);
+ result = false;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
- pred->waitStatus.compareAndSet(ws, Node::SIGNAL);
+ compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL);
}
- return false;
+ PlatformThread::unlockRWMutex(this->rwLock);
+
+ return result;
}
/**
@@ -401,7 +544,7 @@ namespace locks {
return interrupted;
}
- if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
@@ -413,6 +556,9 @@ namespace locks {
throw ex;
}
+
+ cancelAcquire(node);
+ return true;
}
/**
@@ -427,14 +573,12 @@ namespace locks {
Node* p = node->predecessor();
if (p == head.get() && parent->tryAcquire(arg)) {
setHead(node);
- std::cout << "doAcquireInterruptibly waiting thread acquired the sync" << std::endl;
delete p;
failed = false;
return;
}
- if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
- std::cout << "doAcquireInterruptibly waiting thread interrupted" << std::endl;
+ if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
throw InterruptedException();
}
}
@@ -444,7 +588,7 @@ namespace locks {
cancelAcquire(node);
}
- throw ex;
+ throw InterruptedException(ex);
}
}
@@ -470,10 +614,10 @@ namespace locks {
}
if (nanosTimeout <= 0) {
- return false;
+ break;
}
- if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
+ if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
LockSupport::parkNanos(nanosTimeout);
}
@@ -493,6 +637,9 @@ namespace locks {
throw ex;
}
+
+ cancelAcquire(node);
+ return false;
}
/**
@@ -519,17 +666,19 @@ namespace locks {
}
}
- if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
interrupted = true;
}
}
- } catch(InterruptedException& ex) {
+ } catch(Exception& ex) {
if (failed) {
cancelAcquire(node);
}
throw ex;
}
+
+ cancelAcquire(node);
}
/**
@@ -551,7 +700,7 @@ namespace locks {
return;
}
}
- if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+ if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
throw InterruptedException();
}
}
@@ -589,10 +738,10 @@ namespace locks {
}
}
if (nanosTimeout <= 0) {
- return false;
+ break;
}
- if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
+ if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
LockSupport::parkNanos(nanosTimeout);
}
@@ -611,28 +760,12 @@ namespace locks {
throw ex;
}
+
+ cancelAcquire(node);
+ return false;
}
Thread* fullGetFirstQueuedThread() {
- /*
- * The first node is normally head->next. Try to get its
- * thread field, ensuring consistent reads: If thread
- * field is nulled out or s->prev is no longer head, then
- * some other thread(s) concurrently performed setHead in
- * between some of our reads. We try this twice before
- * resorting to traversal.
- */
- Node* h = NULL;
- Node* s = NULL;
- Thread* st = NULL;
-
- if (((h = head.get()) != NULL && (s = h->next.get()) != NULL &&
- s->prev.get() == head.get() && (st = s->thread) != NULL) ||
- ((h = head.get()) != NULL && (s = h->next.get()) != NULL &&
- s->prev.get() == head.get() && (st = s->thread) != NULL)) {
-
- return st;
- }
/*
* Head's next field might not have been set yet, or may have
@@ -645,10 +778,10 @@ namespace locks {
Node* t = tail.get();
Thread* firstThread = NULL;
while (t != NULL && t != head.get()) {
- Thread* tt = t->thread;
+ Thread* tt = (Thread*)t->thread;
if (tt != NULL)
firstThread = tt;
- t = t->prev.get();
+ t = t->prev;
}
return firstThread;
}
@@ -662,11 +795,11 @@ namespace locks {
* @return true if is reacquiring
*/
bool isOnSyncQueue(Node* node) {
- if (node->waitStatus.get() == Node::CONDITION || node->prev.get() == NULL) {
+ if (node->waitStatus == Node::CONDITION || node->prev == NULL) {
return false;
}
- if (node->next.get() != NULL) { // If has successor, it must be on queue
+ if (node->next != NULL) { // If has successor, it must be on queue
return true;
}
@@ -698,50 +831,64 @@ namespace locks {
return false;
}
- t = t->prev.get();
+ t = t->prev;
}
}
/**
* Transfers a node from a condition queue onto sync queue.
- * Returns true if successful.
- * @param node the node
+ * Returns true if successful. If the node was canceled this
+ * method will delete it before returning false.
+ *
+ * @param node
+ * The node to transfer to the wait Queue
+ *
* @return true if successfully transferred (else the node was
- * cancelled before signal).
+ * canceled before signal and deleted).
*/
bool transferForSignal(Node* node) {
/*
- * If cannot change waitStatus, the node has been cancelled.
+ * If cannot change waitStatus, the node has been canceled.
*/
- if (!node->waitStatus.compareAndSet(Node::CONDITION, 0)) {
+ if (!compareAndSetWaitStatus(node, Node::CONDITION, 0)) {
return false;
}
+ // Since we need to write to our predecessor we must lock so that
+ // it doesn't leave the Queue before we are done.
+ PlatformThread::writerLockMutex(this->rwLock);
+
/*
* Splice onto queue and try to set waitStatus of predecessor to
- * indicate that thread is (probably) waiting. If cancelled or
+ * indicate that thread is (probably) waiting. If canceled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node* p = enq(node);
- int ws = p->waitStatus.get();
- if (ws > 0 || !p->waitStatus.compareAndSet(ws, Node::SIGNAL)) {
- LockSupport::unpark(node->thread);
+ int ws = p->waitStatus;
+ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node::SIGNAL)) {
+ LockSupport::unpark((Thread*)node->thread);
}
+ PlatformThread::unlockRWMutex(this->rwLock);
+
return true;
}
/**
- * Transfers node, if necessary, to sync queue after a cancelled
- * wait. Returns true if thread was cancelled before being
- * signalled.
- * @param current the waiting thread
- * @param node its node
- * @return true if cancelled before the node was signalled
+ * Transfers node, if necessary, to sync queue after a canceled wait. Returns
+ * true if thread was canceled before being signaled. If the Node is a Condition
+ * and has been signaled already then it's not added as the signal will have
+ * already done so, we must wait though until it appears on the sync queue
+ * otherwise the attempt to re-acquire the lock could throw a NullPointerException.
+ *
+ * @param node
+ * The node that is to be transferred onto the sync queue.
+ *
+ * @return true if canceled before the node was signaled
*/
bool transferAfterCancelledWait(Node* node) {
- if (node->waitStatus.compareAndSet(Node::CONDITION, 0)) {
+ if (compareAndSetWaitStatus(node, Node::CONDITION, 0)) {
enq(node);
return true;
}
@@ -760,9 +907,13 @@ namespace locks {
}
/**
- * Invokes release with current state value; returns saved state.
- * Cancels node and throws exception on failure.
- * @param node the condition node for this wait
+ * Invokes release with current state value; returns saved state. Cancels node
+ * and throws exception on failure. When a monitor state exception is thrown
+ * the Node is added to the sync queue so that it can be deleted safely later.
+ *
+ * @param node
+ * The condition node for this wait.
+ *
* @return previous sync state
*/
int fullyRelease(Node* node) {
@@ -777,30 +928,49 @@ namespace locks {
}
} catch(IllegalMonitorStateException& ex) {
if (failed) {
- node->waitStatus.set(Node::CANCELLED);
+ node->waitStatus = Node::CANCELLED;
+ // Enqueue it even though canceled so that it gets deleted
+ enq(node);
}
throw ex;
}
}
+ bool compareAndSetHead(Node* update) {
+ return this->head.compareAndSet(NULL, update);
+ }
+ bool compareAndSetTail(Node* expect, Node* update) {
+ return this->tail.compareAndSet(expect, update);
+ }
+ static bool compareAndSetWaitStatus(Node* node, int expect, int update) {
+ return Atomics::compareAndSet32(&node->waitStatus, expect, update);
+ }
+
};
/**
* Provides a default implementation that most AbstractQueuedSynchronizer derived classes
- * can use without needing to write one from scratch.
+ * can use without needing to write one from scratch. Since the methods of this Object
+ * must always be called from a locked synchronizer we can assume that only one thread
+ * at a time will read or modify the list of waiters so we don't need to lock around
+ * modifications to the list.
+ *
+ * This object creates Node instance but leaves the deletion of those objects up to the
+ * sync queue. In all cases the Node needs to be transferred to the sync queue with its
+ * state value set to zero or CANCELLED.
*/
class DefaultConditionObject : public AbstractQueuedSynchronizer::ConditionObject {
private:
SynchronizerState* impl;
- Node* firstWaiter;
- Node* lastWaiter;
+ Node* head;
+ Node* tail;
- /** Mode meaning to reinterrupt on exit from wait */
- static const int REINTERRUPT;
- /** Mode meaning to throw InterruptedException on exit from wait */
- static const int THROW_IE;
+ enum INTERRUPTION_MODE {
+ REINTERRUPT = 1, // Re-interrupt thread on exit from a wait call.
+ THROW_IE = -1 // Throw a new InterruptedException on wait call exit.
+ };
private:
@@ -810,16 +980,21 @@ namespace locks {
public:
DefaultConditionObject(SynchronizerState* impl) :
- ConditionObject(), impl(impl), firstWaiter(NULL), lastWaiter(NULL) {}
- virtual ~DefaultConditionObject() {};
+ ConditionObject(), impl(impl), head(NULL), tail(NULL) {}
+ virtual ~DefaultConditionObject() {
+ }
virtual void await() {
+
if (Thread::interrupted()) {
throw InterruptedException(__FILE__, __LINE__, "Thread was interrupted");
}
+
Node* node = addConditionWaiter();
+
int savedState = impl->fullyRelease(node);
int interruptMode = 0;
+
while (!impl->isOnSyncQueue(node)) {
LockSupport::park();
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
@@ -830,7 +1005,7 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter.get() != NULL) { // clean up if cancelled
+ if (node->nextWaiter != NULL) { // clean up if canceled
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
@@ -842,6 +1017,7 @@ namespace locks {
Node* node = addConditionWaiter();
int savedState = impl->fullyRelease(node);
bool interrupted = false;
+
while (!impl->isOnSyncQueue(node)) {
LockSupport::park();
if (Thread::interrupted()) {
@@ -862,6 +1038,7 @@ namespace locks {
int savedState = impl->fullyRelease(node);
long long lastTime = System::nanoTime();
int interruptMode = 0;
+
while (!impl->isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
impl->transferAfterCancelledWait(node);
@@ -880,12 +1057,13 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter.get() != NULL) {
+ if (node->nextWaiter != NULL) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
reportInterruptAfterWait(interruptMode);
}
+
return nanosTimeout - (System::nanoTime() - lastTime);
}
@@ -899,6 +1077,7 @@ namespace locks {
long long lastTime = System::nanoTime();
bool timedout = false;
int interruptMode = 0;
+
while (!impl->isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = impl->transferAfterCancelledWait(node);
@@ -918,7 +1097,7 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter.get() != NULL) {
+ if (node->nextWaiter != NULL) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
@@ -937,6 +1116,7 @@ namespace locks {
int savedState = impl->fullyRelease(node);
bool timedout = false;
int interruptMode = 0;
+
while (!impl->isOnSyncQueue(node)) {
if (System::currentTimeMillis() > abstime) {
timedout = impl->transferAfterCancelledWait(node);
@@ -951,7 +1131,7 @@ namespace locks {
if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
interruptMode = REINTERRUPT;
}
- if (node->nextWaiter.get() != NULL) {
+ if (node->nextWaiter != NULL) {
unlinkCancelledWaiters();
}
if (interruptMode != 0) {
@@ -961,11 +1141,15 @@ namespace locks {
return !timedout;
}
+ /**
+ * The Node that's been waiting the longest is moved from this Conditions
+ * wait queue to that of the parent Synchronizer.
+ */
virtual void signal() {
if (!impl->isHeldExclusively()) {
throw IllegalMonitorStateException();
}
- Node* first = firstWaiter;
+ Node* first = head;
if (first != NULL) {
doSignal(first);
}
@@ -975,7 +1159,7 @@ namespace locks {
if (!impl->isHeldExclusively()) {
throw IllegalMonitorStateException();
}
- Node* first = firstWaiter;
+ Node* first = head;
if (first != NULL) {
doSignalAll(first);
}
@@ -989,8 +1173,8 @@ namespace locks {
if (!impl->isHeldExclusively()) {
throw IllegalMonitorStateException();
}
- for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) {
- if (w->waitStatus.get() == Node::CONDITION) {
+ for (Node* w = head; w != NULL; w = w->nextWaiter) {
+ if (w->waitStatus == Node::CONDITION) {
return true;
}
}
@@ -1002,8 +1186,8 @@ namespace locks {
throw IllegalMonitorStateException();
}
int n = 0;
- for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) {
- if (w->waitStatus.get() == Node::CONDITION) {
+ for (Node* w = head; w != NULL; w = w->nextWaiter) {
+ if (w->waitStatus == Node::CONDITION) {
++n;
}
}
@@ -1015,9 +1199,9 @@ namespace locks {
throw IllegalMonitorStateException();
}
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) {
- if (w->waitStatus.get() == Node::CONDITION) {
- Thread* t = w->thread;
+ for (Node* w = head; w != NULL; w = w->nextWaiter) {
+ if (w->waitStatus == Node::CONDITION) {
+ Thread* t = (Thread*)w->thread;
if (t != NULL) {
list->add(t);
}
@@ -1028,82 +1212,96 @@ namespace locks {
private:
+ /**
+ * Adds a new Node to this Condition object to be used by one of the wait methods.
+ * During this addition we scan for canceled Nodes and remove them as they are
+ * now contained on the sync queue and will be removed from there. It's safe to do
+ * this here as this method is called while the sync queue is locked so no other
+ * changes can occur to the list of Nodes.
+ *
+ * @returns the newly added Node instance.
+ */
Node* addConditionWaiter() {
- Node* t = lastWaiter;
- // If lastWaiter is cancelled, clean out.
- if (t != NULL && t->waitStatus.get() != Node::CONDITION) {
+ Node* t = tail;
+ // If last Waiter is canceled, clean out.
+ if (t != NULL && t->waitStatus != Node::CONDITION) {
unlinkCancelledWaiters();
- t = lastWaiter;
+ t = tail;
}
Node* node = new Node(Thread::currentThread(), Node::CONDITION);
if (t == NULL) {
- firstWaiter = node;
+ head = node;
} else {
- t->nextWaiter.set(node);
+ t->nextWaiter = node;
}
- lastWaiter = node;
+ tail = node;
return node;
}
/**
- * Removes and transfers nodes until hit non-cancelled one or
- * NULL. Split out from signal in part to encourage compilers
- * to inline the case of no waiters.
- * @param first (non-NULL) the first node on condition queue
+ * Removes and transfers nodes until it hits a non-canceled one or a NULL one;
+ * or, stated another way, traverses the list of waiters removing canceled
+ * nodes until it finds a non-canceled one to signal.
+ *
+ * @param first (non-NULL)
+ * The first node on condition queue.
*/
void doSignal(Node* first) {
do {
- if ((firstWaiter = first->nextWaiter.get()) == NULL) {
- lastWaiter = NULL;
+ head = first->nextWaiter;
+ first->nextWaiter = NULL;
+
+ if (head == NULL) {
+ tail = NULL;
}
- first->nextWaiter.set(NULL);
- } while (!impl->transferForSignal(first) && (first = firstWaiter) != NULL);
+ } while (!impl->transferForSignal(first) && (first = head) != NULL);
}
/**
- * Removes and transfers all nodes.
- * @param first (non-NULL) the first node on condition queue
+ * Removes and transfers all nodes. Removes all Nodes from this Condition object
+ * starting from the given node pointer; canceled waiter Nodes are moved too, so we
+ * don't need to track them anymore. This is safe to do here since this method must be
+ * called while the sync queue lock is held.
+ *
+ * @param first (non-NULL)
+ * The first node on condition queue to start transferring from.
*/
void doSignalAll(Node* first) {
- lastWaiter = firstWaiter = NULL;
+ head = tail = NULL;
do {
- Node* next = first->nextWaiter.get();
- first->nextWaiter.set(NULL);
+ Node* next = first->nextWaiter;
+ first->nextWaiter = NULL;
impl->transferForSignal(first);
first = next;
} while (first != NULL);
}
/**
- * Unlinks cancelled waiter nodes from condition queue.
- * Called only while holding lock. This is called when
- * cancellation occurred during condition wait, and upon
- * insertion of a new waiter when lastWaiter is seen to have
- * been cancelled. This method is needed to avoid garbage
- * retention in the absence of signals. So even though it may
- * require a full traversal, it comes into play only when
- * timeouts or cancellations occur in the absence of
- * signals. It traverses all nodes rather than stopping at a
- * particular target to unlink all pointers to garbage nodes
- * without requiring many re-traversals during cancellation
- * storms.
+ * Unlinks canceled waiter nodes from condition queue. Called only while holding
+ * lock. This is called when cancellation occurred during condition wait, and upon
+ * insertion of a new waiter when tail is seen to have been canceled. This method
+ * is needed to avoid garbage retention in the absence of signals. So even though
+ * it may require a full traversal, it comes into play only when timeouts or
+ * cancellations occur in the absence of signals. It traverses all nodes rather
+ * than stopping at a particular target to unlink all pointers to garbage nodes
+ * without requiring many re-traversals during cancellation storms.
*/
void unlinkCancelledWaiters() {
- Node* t = firstWaiter;
+ Node* t = head;
Node* trail = NULL;
while (t != NULL) {
- Node* next = t->nextWaiter.get();
- if (t->waitStatus.get() != Node::CONDITION) {
- t->nextWaiter.set(NULL);
+ Node* next = t->nextWaiter;
+ if (t->waitStatus != Node::CONDITION) {
+ t->nextWaiter = NULL;
if (trail == NULL) {
- firstWaiter = next;
+ head = next;
} else {
- trail->nextWaiter.set(next);
+ trail->nextWaiter = next;
}
if (next == NULL) {
- lastWaiter = trail;
+ tail = trail;
}
} else {
@@ -1114,17 +1312,27 @@ namespace locks {
}
/**
- * Checks for interrupt, returning THROW_IE if interrupted
- * before signalled, REINTERRUPT if after signalled, or
- * 0 if not interrupted.
+ * Checks for interrupt, returning THROW_IE if interrupted before signaled,
+ * REINTERRUPT if after signaled, or 0 if not interrupted. The canceled node
+ * is transferred to the sync queue so that its waiter can safely re-acquire the
+ * lock.
+ *
+ * @param node
+ * The node to transfer to the sync queue if interrupted.
+ *
+ * @returns value indicating if an action is needed in response to an interrupt.
*/
int checkInterruptWhileWaiting(Node* node) {
return Thread::interrupted() ? (impl->transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
/**
- * Throws InterruptedException, reinterrupts current thread, or
- * does nothing, depending on mode.
+ * Throws InterruptedException, re-interrupts the current thread, or does nothing, depending
+ * on the mode passed. This abstracts the logic needed for all the interruptible wait methods
+ * out into a single place.
+ *
+ * @param interruptMode
+ * indicates what action is needed for interruption handling (if any).
*/
void reportInterruptAfterWait(int interruptMode) {
if (interruptMode == THROW_IE) {
@@ -1136,9 +1344,6 @@ namespace locks {
};
- const int DefaultConditionObject::REINTERRUPT = 1;
- const int DefaultConditionObject::THROW_IE = -1;
-
}}}}
////////////////////////////////////////////////////////////////////////////////
@@ -1156,17 +1361,17 @@ AbstractQueuedSynchronizer::~AbstractQue
////////////////////////////////////////////////////////////////////////////////
int AbstractQueuedSynchronizer::getState() const {
- return this->impl->state.get();
+ return this->impl->state;
}
////////////////////////////////////////////////////////////////////////////////
void AbstractQueuedSynchronizer::setState(int value) {
- this->impl->state.set(value);
+ this->impl->state = value;
}
////////////////////////////////////////////////////////////////////////////////
bool AbstractQueuedSynchronizer::compareAndSetState(int expect, int update) {
- return this->impl->state.compareAndSet(expect, update);
+ return Atomics::compareAndSet32(&this->impl->state, expect, update);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1237,7 +1442,7 @@ bool AbstractQueuedSynchronizer::release
if (tryRelease(arg)) {
Node* h = this->impl->head.get();
- if (h != NULL && h->waitStatus.get() != 0) {
+ if (h != NULL && h->waitStatus != 0) {
this->impl->unparkSuccessor(h);
}
@@ -1308,82 +1513,92 @@ bool AbstractQueuedSynchronizer::isQueue
throw NullPointerException(__FILE__, __LINE__, "Passed in thread was NULL");
}
- for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
+ for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (p->thread == thread) {
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
return true;
}
}
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return false;
}
-// bool apparentlyFirstQueuedIsExclusive() {
-// Node h, s;
-// return (h = head) != NULL &&
-// (s = h.next) != NULL &&
-// !s.isShared() &&
-// s.thread != NULL;
-// }
-//
-// bool hasQueuedPredecessors() {
-// // The correctness of this depends on head being initialized
-// // before tail and on head.next being accurate if the current
-// // thread is first in queue.
-// Node t = tail; // Read fields in reverse initialization order
-// Node h = head;
-// Node s;
-// return h != t &&
-// ((s = h.next) == NULL || s.thread != Thread.currentThread());
-// }
-
////////////////////////////////////////////////////////////////////////////////
int AbstractQueuedSynchronizer::getQueueLength() const {
int n = 0;
- for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
+ for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (p->thread != NULL) {
++n;
}
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return n;
}
////////////////////////////////////////////////////////////////////////////////
Collection<Thread*>* AbstractQueuedSynchronizer::getQueuedThreads() const {
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
- Thread* t = p->thread;
+
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
+ for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
+ Thread* t = (Thread*)p->thread;
if (t != NULL) {
list->add(t);
}
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return list;
}
////////////////////////////////////////////////////////////////////////////////
Collection<Thread*>* AbstractQueuedSynchronizer::getExclusiveQueuedThreads() const {
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
+ for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (!p->isShared()) {
- Thread* t = p->thread;
+ Thread* t = (Thread*)p->thread;
if (t != NULL) {
list->add(t);
}
}
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return list;
}
////////////////////////////////////////////////////////////////////////////////
Collection<Thread*>* AbstractQueuedSynchronizer::getSharedQueuedThreads() const {
ArrayList<Thread*>* list = new ArrayList<Thread*>();
- for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+
+ PlatformThread::readerLockMutex(this->impl->rwLock);
+
+ for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
if (p->isShared()) {
- Thread* t = p->thread;
+ Thread* t = (Thread*)p->thread;
if (t != NULL) {
list->add(t);
}
}
}
+
+ PlatformThread::unlockRWMutex(this->impl->rwLock);
+
return list;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp?rev=1125811&r1=1125810&r2=1125811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp Sat May 21 20:46:50 2011
@@ -98,11 +98,8 @@ namespace {
virtual void run() {
try {
- std::cout << "InterruptibleSyncRunnable acquireInterruptibly" << std::endl;
mutex->acquireInterruptibly(1);
- std::cout << "InterruptibleSyncRunnable was not interrupted" << std::endl;
} catch(InterruptedException& success) {
- std::cout << "InterruptibleSyncRunnable was interrupted" << std::endl;
} catch(Exception& ex) {
parent->threadUnexpectedException(ex);
} catch(std::exception& stdex) {
@@ -125,11 +122,9 @@ namespace {
void run() {
try {
- std::cout << "InterruptedSyncRunnable acquireInterruptibly" << std::endl;
mutex->acquireInterruptibly(1);
parent->threadFail("Should have been interrupted.");
} catch(InterruptedException& success) {
- std::cout << "InterruptedSyncRunnable was interrupted" << std::endl;
} catch(Exception& ex) {
parent->threadUnexpectedException(ex);
} catch(std::exception& stdex) {
@@ -224,27 +219,26 @@ void AbstractQueuedSynchronizerTest::tes
Thread t1(&iSyncRun1);
Thread t2(&iSyncRun2);
- std::cout << std::endl;
try {
CPPUNIT_ASSERT(!mutex.isQueued(&t1));
CPPUNIT_ASSERT(!mutex.isQueued(&t2));
mutex.acquire(1);
t1.start();
Thread::sleep(SHORT_DELAY_MS);
-// CPPUNIT_ASSERT(mutex.isQueued(&t1));
+ CPPUNIT_ASSERT(mutex.isQueued(&t1));
t2.start();
Thread::sleep(SHORT_DELAY_MS);
-// CPPUNIT_ASSERT(mutex.isQueued(&t1));
-// CPPUNIT_ASSERT(mutex.isQueued(&t2));
+ CPPUNIT_ASSERT(mutex.isQueued(&t1));
+ CPPUNIT_ASSERT(mutex.isQueued(&t2));
t1.interrupt();
-// Thread::sleep(SHORT_DELAY_MS);
-// CPPUNIT_ASSERT(!mutex.isQueued(&t1));
-// CPPUNIT_ASSERT(mutex.isQueued(&t2));
- mutex.release(1);
-// Thread::sleep(SHORT_DELAY_MS);
-// CPPUNIT_ASSERT(!mutex.isQueued(&t1));
-// Thread::sleep(SHORT_DELAY_MS);
-// CPPUNIT_ASSERT(!mutex.isQueued(&t2));
+ Thread::sleep(SHORT_DELAY_MS);
+ CPPUNIT_ASSERT(!mutex.isQueued(&t1));
+ CPPUNIT_ASSERT(mutex.isQueued(&t2));
+ mutex.release(1);
+ Thread::sleep(SHORT_DELAY_MS);
+ CPPUNIT_ASSERT(!mutex.isQueued(&t1));
+ Thread::sleep(SHORT_DELAY_MS);
+ CPPUNIT_ASSERT(!mutex.isQueued(&t2));
t1.join();
t2.join();
} catch(Exception& e){
@@ -705,7 +699,7 @@ void AbstractQueuedSynchronizerTest::tes
try {
mutex.acquire(1);
Date d;
- CPPUNIT_ASSERT(!c->awaitUntil((d.getTime() + 10)));
+ CPPUNIT_ASSERT(!c->awaitUntil((d.getTime() + 15)));
mutex.release(1);
} catch(Exception& ex) {
unexpectedException();
@@ -1521,9 +1515,8 @@ namespace {
try {
parent->threadAssertFalse(latch->isSignalled());
latch->acquireSharedInterruptibly(0);
- parent->threadAssertTrue(latch->isSignalled());
+ parent->threadShouldThrow();
} catch(InterruptedException& e) {
- parent->threadUnexpectedException();
}
}
};
@@ -1566,7 +1559,6 @@ namespace {
latch->tryAcquireSharedNanos(0, AbstractQueuedSynchronizerTest::SMALL_DELAY_MS* 1000 * 1000);
parent->threadShouldThrow();
} catch(InterruptedException& e) {
- parent->threadUnexpectedException();
}
}
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h?rev=1125811&r1=1125810&r2=1125811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h Sat May 21 20:46:50 2011
@@ -30,55 +30,55 @@ namespace locks {
class AbstractQueuedSynchronizerTest : public ExecutorsTestSupport {
CPPUNIT_TEST_SUITE( AbstractQueuedSynchronizerTest );
-// CPPUNIT_TEST( testIsHeldExclusively );
-// CPPUNIT_TEST( testAcquire );
-// CPPUNIT_TEST( testTryAcquire );
-// CPPUNIT_TEST( testhasQueuedThreads );
-// CPPUNIT_TEST( testIsQueuedNPE );
+ CPPUNIT_TEST( testIsHeldExclusively );
+ CPPUNIT_TEST( testAcquire );
+ CPPUNIT_TEST( testTryAcquire );
+ CPPUNIT_TEST( testhasQueuedThreads );
+ CPPUNIT_TEST( testIsQueuedNPE );
CPPUNIT_TEST( testIsQueued );
-// CPPUNIT_TEST( testGetFirstQueuedThread );
-// CPPUNIT_TEST( testHasContended );
-// CPPUNIT_TEST( testGetQueuedThreads );
-// CPPUNIT_TEST( testGetExclusiveQueuedThreads );
-// CPPUNIT_TEST( testGetSharedQueuedThreads );
-// CPPUNIT_TEST( testInterruptedException2 );
-// CPPUNIT_TEST( testTryAcquireWhenSynced );
-// CPPUNIT_TEST( testAcquireNanosTimeout );
-// CPPUNIT_TEST( testGetState );
-// CPPUNIT_TEST( testAcquireInterruptibly1 );
-// CPPUNIT_TEST( testAcquireInterruptibly2 );
-// CPPUNIT_TEST( testOwns );
-// CPPUNIT_TEST( testAwaitIllegalMonitor );
-// CPPUNIT_TEST( testSignalIllegalMonitor );
-// CPPUNIT_TEST( testAwaitNanosTimeout );
-// CPPUNIT_TEST( testAwaitTimeout );
-// CPPUNIT_TEST( testAwaitUntilTimeout );
-// CPPUNIT_TEST( testAwait );
-// CPPUNIT_TEST( testHasWaitersNPE );
-// CPPUNIT_TEST( testGetWaitQueueLengthNPE );
-// CPPUNIT_TEST( testGetWaitingThreadsNPE );
-// CPPUNIT_TEST( testHasWaitersIAE );
-// CPPUNIT_TEST( testHasWaitersIMSE );
-// CPPUNIT_TEST( testGetWaitQueueLengthIAE );
-// CPPUNIT_TEST( testGetWaitQueueLengthIMSE );
-// CPPUNIT_TEST( testGetWaitingThreadsIAE );
-// CPPUNIT_TEST( testGetWaitingThreadsIMSE );
-// CPPUNIT_TEST( testHasWaiters );
-// CPPUNIT_TEST( testGetWaitQueueLength );
-// CPPUNIT_TEST( testGetWaitingThreads );
-// CPPUNIT_TEST( testAwaitUninterruptibly );
-// CPPUNIT_TEST( testAwaitInterrupt );
-// CPPUNIT_TEST( testAwaitNanosInterrupt );
-// CPPUNIT_TEST( testAwaitUntilInterrupt );
-// CPPUNIT_TEST( testSignalAll );
-// CPPUNIT_TEST( testToString );
-// CPPUNIT_TEST( testGetStateWithReleaseShared );
-// CPPUNIT_TEST( testReleaseShared );
-// CPPUNIT_TEST( testAcquireSharedInterruptibly );
-// CPPUNIT_TEST( testAsquireSharedTimed );
-// CPPUNIT_TEST( testAcquireSharedInterruptiblyInterruptedException );
-// CPPUNIT_TEST( testAcquireSharedNanosInterruptedException );
-// CPPUNIT_TEST( testAcquireSharedNanosTimeout );
+ CPPUNIT_TEST( testGetFirstQueuedThread );
+ CPPUNIT_TEST( testHasContended );
+ CPPUNIT_TEST( testGetQueuedThreads );
+ CPPUNIT_TEST( testGetExclusiveQueuedThreads );
+ CPPUNIT_TEST( testGetSharedQueuedThreads );
+ CPPUNIT_TEST( testInterruptedException2 );
+ CPPUNIT_TEST( testTryAcquireWhenSynced );
+ CPPUNIT_TEST( testAcquireNanosTimeout );
+ CPPUNIT_TEST( testGetState );
+ CPPUNIT_TEST( testAcquireInterruptibly1 );
+ CPPUNIT_TEST( testAcquireInterruptibly2 );
+ CPPUNIT_TEST( testOwns );
+ CPPUNIT_TEST( testAwaitIllegalMonitor );
+ CPPUNIT_TEST( testSignalIllegalMonitor );
+ CPPUNIT_TEST( testAwaitNanosTimeout );
+ CPPUNIT_TEST( testAwaitTimeout );
+ CPPUNIT_TEST( testAwaitUntilTimeout );
+ CPPUNIT_TEST( testAwait );
+ CPPUNIT_TEST( testHasWaitersNPE );
+ CPPUNIT_TEST( testGetWaitQueueLengthNPE );
+ CPPUNIT_TEST( testGetWaitingThreadsNPE );
+ CPPUNIT_TEST( testHasWaitersIAE );
+ CPPUNIT_TEST( testHasWaitersIMSE );
+ CPPUNIT_TEST( testGetWaitQueueLengthIAE );
+ CPPUNIT_TEST( testGetWaitQueueLengthIMSE );
+ CPPUNIT_TEST( testGetWaitingThreadsIAE );
+ CPPUNIT_TEST( testGetWaitingThreadsIMSE );
+ CPPUNIT_TEST( testHasWaiters );
+ CPPUNIT_TEST( testGetWaitQueueLength );
+ CPPUNIT_TEST( testGetWaitingThreads );
+ CPPUNIT_TEST( testAwaitUninterruptibly );
+ CPPUNIT_TEST( testAwaitInterrupt );
+ CPPUNIT_TEST( testAwaitNanosInterrupt );
+ CPPUNIT_TEST( testAwaitUntilInterrupt );
+ CPPUNIT_TEST( testSignalAll );
+ CPPUNIT_TEST( testToString );
+ CPPUNIT_TEST( testGetStateWithReleaseShared );
+ CPPUNIT_TEST( testReleaseShared );
+ CPPUNIT_TEST( testAcquireSharedInterruptibly );
+ CPPUNIT_TEST( testAsquireSharedTimed );
+ CPPUNIT_TEST( testAcquireSharedInterruptiblyInterruptedException );
+ CPPUNIT_TEST( testAcquireSharedNanosInterruptedException );
+ CPPUNIT_TEST( testAcquireSharedNanosTimeout );
CPPUNIT_TEST_SUITE_END();
public: