Posted to commits@activemq.apache.org by ta...@apache.org on 2011/05/21 22:46:50 UTC

svn commit: r1125811 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/decaf/util/concurrent/locks/ test/decaf/util/concurrent/locks/

Author: tabish
Date: Sat May 21 20:46:50 2011
New Revision: 1125811

URL: http://svn.apache.org/viewvc?rev=1125811&view=rev
Log:
Working AbstractQueuedSynchronizer implementation.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1125811&r1=1125810&r2=1125811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Sat May 21 20:46:50 2011
@@ -25,8 +25,9 @@
 
 #include <decaf/util/ArrayList.h>
 #include <decaf/util/concurrent/locks/LockSupport.h>
-#include <decaf/util/concurrent/atomic/AtomicInteger.h>
 #include <decaf/util/concurrent/atomic/AtomicReference.h>
+#include <decaf/internal/util/concurrent/Atomics.h>
+#include <decaf/internal/util/concurrent/PlatformThread.h>
 
 using namespace decaf;
 using namespace decaf::lang;
@@ -35,6 +36,7 @@ using namespace decaf::util;
 using namespace decaf::util::concurrent;
 using namespace decaf::util::concurrent::atomic;
 using namespace decaf::util::concurrent::locks;
+using namespace decaf::internal::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
 namespace {
@@ -42,6 +44,32 @@ namespace {
     class Node {
     public:
 
+        /**
+         * Status field, taking on only the values:
+         *   SIGNAL:     The successor of this node is (or will soon be) blocked (via park),
+         *               so the current node must unpark its successor when it releases or
+         *               cancels. To avoid races, acquire methods must first indicate they
+         *               need a signal, then retry the atomic acquire, and then, on failure,
+         *               block.
+         *   CANCELLED:  This node is canceled due to timeout or interrupt.  Nodes never leave
+         *               this state. In particular, a thread with a canceled node never again
+         *               blocks.
+         *   CONDITION:  This node is currently on a condition queue. It will not be used as a
+         *               sync queue node until transferred, at which time the status will be
+         *               set to 0. (Use of this value here has nothing to do with the other
+         *               uses of the field, but simplifies mechanics.)
+         *   PROPAGATE:  A releaseShared should be propagated to other nodes. This is set
+         *               (for head node only) in doReleaseShared to ensure propagation
+         *               continues, even if other operations have since intervened.
+         *   0:          None of the above
+         *
+         * The values are arranged numerically to simplify use. Non-negative values mean that
+         * a node doesn't need to signal. So, most code doesn't need to check for particular
+         * values, just for sign.
+         *
+         * The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes.
+         * It is modified using CAS (or when possible, unconditional volatile writes).
+         */
         enum WaitStatus {
             CANCELLED = 1,
             SIGNAL = -1,
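
The sign convention documented above is what lets most call sites test only the
sign of waitStatus rather than compare against individual values. A minimal
sketch of that convention (illustrative only, not part of this commit):

    // Only CANCELLED is positive; every negative value means the node's
    // successor needs (or may need) a signal.
    bool successorNeedsSignal(int ws) { return ws < 0; } // SIGNAL/CONDITION/PROPAGATE
    bool nodeIsCanceled(int ws)       { return ws > 0; } // CANCELLED only
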
@@ -54,11 +82,46 @@ namespace {
 
     public:
 
-        AtomicInteger waitStatus;
-        AtomicReference<Node> prev;
-        AtomicReference<Node> next;
+        int waitStatus;
+
+        /**
+         * Link to predecessor node that current node/thread relies on
+         * for checking waitStatus. Assigned during enqueuing, and nulled
+         * out.  Also, upon cancellation of a predecessor, we short-circuit
+         * while finding a non-canceled one, which will always exist because
+         * the head node is never canceled: A node becomes head only as a
+         * result of successful acquire. A canceled thread never succeeds
+         * in acquiring, and a thread only cancels itself, not any other node.
+         */
+        Node* prev;
+
+        /**
+         * Link to the successor node that the current node/thread
+         * unparks upon release. Assigned during enqueuing, adjusted
+         * when bypassing canceled predecessors, and nulled out when
+         * dequeued.  The enq operation does not assign next field of
+         * a predecessor until after attachment, so seeing a NULL next
+         * field does not necessarily mean that node is at end of queue.
+         * However, if a next field appears to be NULL, we can scan
+         * prev's from the tail to double-check.
+         */
+        Node* next;
+
+        /**
+         * The thread that created this Node and is waiting to acquire the
+         * lock.
+         */
         Thread* thread;
-        AtomicReference<Node> nextWaiter;
+
+        /**
+         * Link to next node waiting on condition, or the special value SHARED.
+         * Because condition queues are accessed only when holding in exclusive
+         * mode, we just need a simple linked queue to hold nodes while they are
+         * waiting on conditions. They are then transferred to the queue to
+         * re-acquire. And because conditions can only be exclusive, we save a
+         * field by using special value to indicate shared mode.
+         */
+        Node* nextWaiter;
 
     public:
 
@@ -69,16 +132,14 @@ namespace {
         Node(Thread* thread, int waitStatus) : waitStatus(waitStatus), prev(NULL), next(NULL), thread(thread), nextWaiter(NULL) {
         }
 
-        ~Node() {
-            std::cout << "Deleted Node: " << std::hex << this << std::endl;
-        }
+        ~Node() {}
 
         bool isShared() const {
-            return this->nextWaiter.get() == &SHARED;
+            return this->nextWaiter == &SHARED;
         }
 
         Node* predecessor() {
-            Node* p = prev.get();
+            Node* p = prev;
             if (p == NULL) {
                 throw NullPointerException();
             } else {
@@ -104,65 +165,123 @@ namespace locks {
     class SynchronizerState {
     public:
 
+        /**
+         * The object that owns this one; it allows this object to call back into its
+         * parent as well as to check for exclusive ownership of Conditions.
+         */
         AbstractQueuedSynchronizer* parent;
-        AtomicInteger state;
+
+        /**
+         * The sync state; subclasses can get / set this value to indicate when a
+         * Thread can acquire the lock or must wait.
+         */
+        volatile int state;
+
+        /**
+         * Platform level R/W lock.  Because we don't implement a garbage collected
+         * Node scheme we can't just use atomic operations on the Node pointers, so
+         * in cases where we operate on the list of Nodes to remove canceled items
+         * we must write lock the list.  Likewise, in cases where we are iterating
+         * through the list to collect statistics we must ensure that a Node won't
+         * suddenly become invalid, so we must hold a read lock.
+         */
+        decaf_rwmutex_t rwLock;
+
+        /**
+         * Head of the wait queue, lazily initialized.  Except for initialization,
+         * it is modified only via method setHead.  Note: If head exists, its
+         * waitStatus is guaranteed not to be CANCELLED.
+         */
         AtomicReference<Node> head;
+
+        /**
+         * Tail of the wait queue, lazily initialized.  Modified only via method
+         * enq to add new wait node.
+         */
         AtomicReference<Node> tail;
 
     public:
 
-        SynchronizerState(AbstractQueuedSynchronizer* parent) : parent(parent), state(0), head(), tail() {}
+        SynchronizerState(AbstractQueuedSynchronizer* parent) : parent(parent), state(0), rwLock(), head(), tail() {
+            PlatformThread::createRWMutex(&rwLock);
+        }
         virtual ~SynchronizerState() {
-            Node* oldHead = head.getAndSet(NULL);
-            if (oldHead != tail.get()) {
-                delete tail.get();
+
+            while (tail.get() != NULL) {
+                delete tail.getAndSet(tail.get()->prev);
             }
-            delete oldHead;
+
+            PlatformThread::destroyRWMutex(rwLock);
         }
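
The read/write pairing described above for rwLock recurs throughout this file:
traversals take the reader side so Nodes cannot be unlinked or deleted
underneath them, while cancellation takes the writer side to unlink safely. In
miniature, using the PlatformThread calls from this commit:

    // Reader side: safe concurrent traversal of the queue.
    PlatformThread::readerLockMutex(rwLock);
    for (Node* p = tail.get(); p != NULL; p = p->prev) {
        // inspect p; it cannot be deleted while the read lock is held
    }
    PlatformThread::unlockRWMutex(rwLock);

    // Writer side: exclusive access while unlinking a canceled node.
    PlatformThread::writerLockMutex(rwLock);
    node->prev->next = node->next;
    node->next->prev = node->prev;
    PlatformThread::unlockRWMutex(rwLock);
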
 
         bool isHeldExclusively() const {
             return this->parent->isHeldExclusively();
         }
 
+        /**
+         * Enqueueing a Node is atomic with respect to the tail of the list, so no
+         * locking needs to occur here.  If the head and tail have not yet been
+         * allocated we just need to account for contention between two or more
+         * enqueues on the addition of the new head.
+         *
+         * @param node
+         *      The new node to add.
+         */
         Node* enq(Node* node) {
             for (;;) {
                 Node* t = tail.get();
                 if (t == NULL) { // Must initialize
                     Node* newHead = new Node();
-                    std::cout << "Enq first call, allocated head = " << std::hex << newHead << std::endl;
-                    if (head.compareAndSet(NULL, newHead)) {
+                    if (compareAndSetHead(newHead)) {
                         tail.set(head.get());
+                    } else {
+                        delete newHead;
                     }
                 } else {
-                    node->prev.set(t);
-                    if (tail.compareAndSet(t, node)) {
-                        t->next.set(node);
+                    node->prev = t;
+                    if (compareAndSetTail(t, node)) {
+                        t->next = node;
                         return t;
                     }
                 }
             }
         }
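
The loop above is the classic lock-free tail append: link the new node's prev,
CAS the tail, then lazily fix the predecessor's next pointer. A standalone
sketch of the same pattern with std::atomic, omitting the lazy head
initialization that enq also handles (names illustrative, not from this
commit):

    #include <atomic>

    struct QNode {
        QNode* prev;
        QNode* next;
        QNode() : prev(NULL), next(NULL) {}
    };

    std::atomic<QNode*> tailRef(NULL);

    void append(QNode* node) {
        for (;;) {
            QNode* t = tailRef.load();
            node->prev = t;                              // link before publishing
            if (tailRef.compare_exchange_strong(t, node)) {
                if (t != NULL) {
                    t->next = node;                      // lazy: may briefly read NULL
                }
                return;
            }
        }
    }
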
 
+        /**
+         * Since a node can append itself to the tail in one atomic step we don't
+         * lock here.  If the fast append fails we fall back to the full looping
+         * enq method.
+         *
+         * @param mode
+         *      The mode for the new Node, SHARED or NULL for exclusive.
+         */
         Node* addWaiter(Node* mode) {
             Node* node = new Node(Thread::currentThread(), mode);
-            std::cout << "Add Waiter Allocated Node: " << std::hex << node << std::endl;
-            // Try the fast path of enq; backup to full enq on failure
             Node* pred = tail.get();
             if (pred != NULL) {
-                node->prev.set(pred);
-                if (tail.compareAndSet(pred, node)) {
-                    pred->next.set(node);
+                node->prev = pred;
+                if (compareAndSetTail(pred, node)) {
+                    pred->next = node;
                     return node;
                 }
             }
+
             enq(node);
             return node;
         }
 
+        /**
+         * This is the only place head is altered.  We NULL out prev since that
+         * Node will be destroyed or pooled after this, but leave next alone since
+         * it should still be valid.
+         *
+         * @param node
+         *      The Node that is to become the new Head of the queue.
+         */
         void setHead(Node* node) {
             head.set(node);
             node->thread = NULL;
-            node->prev.set(NULL);
+            node->prev = NULL;
         }
 
         void unparkSuccessor(Node* node) {
@@ -171,29 +290,35 @@ namespace locks {
              * to clear in anticipation of signalling.  It is OK if this
              * fails or if status is changed by waiting thread.
              */
-            int ws = node->waitStatus.get();
+            int ws = node->waitStatus;
             if (ws < 0) {
-                node->waitStatus.compareAndSet(ws, 0);
+                compareAndSetWaitStatus(node, ws, 0);
             }
 
+            // We need to lock to prevent cancellation of a Node from
+            // altering the list as we iterate and check Node status fields.
+            PlatformThread::readerLockMutex(this->rwLock);
+
             /*
              * Thread to unpark is held in successor, which is normally
-             * just the next node.  But if cancelled or apparently NULL,
+             * just the next node.  But if canceled or apparently NULL,
              * traverse backwards from tail to find the actual
-             * non-cancelled successor.
+             * non-canceled successor.
              */
-            Node* s = node->next.get();
-            if (s == NULL || s->waitStatus.get() > 0) {
+            Node* s = node->next;
+            if (s == NULL || s->waitStatus > 0) {
                 s = NULL;
-                for (Node* t = tail.get(); t != NULL && t != node; t = t->prev.get())
-                    if (t->waitStatus.get() <= 0) {
+                for (Node* t = tail.get(); t != NULL && t != node; t = t->prev)
+                    if (t->waitStatus <= 0) {
                         s = t;
                     }
             }
 
             if (s != NULL) {
-                LockSupport::unpark(s->thread);
+                LockSupport::unpark((Thread*)s->thread);
             }
+
+            PlatformThread::unlockRWMutex(this->rwLock);
         }
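
The backward scan above works because enq assigns node->prev before the tail
CAS, so prev links are always complete, while a predecessor's next field is
filled in only after the CAS and may transiently be NULL (see the Node::next
comment earlier). Isolated, the scan keeps overwriting its candidate so it
finishes holding the non-canceled successor closest to the head:

    // Walk prev links from the tail; prev links are always intact.
    Node* s = NULL;
    for (Node* t = tail.get(); t != NULL && t != node; t = t->prev) {
        if (t->waitStatus <= 0) {
            s = t;   // last assignment wins: the node nearest the head
        }
    }
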
 
         /**
@@ -202,6 +327,11 @@ namespace locks {
          * to calling unparkSuccessor of head if it needs signal.)
          */
         void doReleaseShared() {
+
+            // Here we have to read lock because head could change when a
+            // different thread does its release shared.
+            PlatformThread::readerLockMutex(this->rwLock);
+
             /*
              * Ensure that a release propagates, even if there are other
              * in-progress acquires/releases.  This proceeds in the usual
@@ -216,13 +346,13 @@ namespace locks {
             for (;;) {
                 Node* h = head.get();
                 if (h != NULL && h != tail.get()) {
-                    int ws = h->waitStatus.get();
+                    int ws = h->waitStatus;
                     if (ws == Node::SIGNAL) {
-                        if (!h->waitStatus.compareAndSet(Node::SIGNAL, 0)) {
+                        if (!compareAndSetWaitStatus(h, Node::SIGNAL, 0)) {
                             continue;            // loop to recheck cases
                         }
                         unparkSuccessor(h);
-                    } else if (ws == 0 && !h->waitStatus.compareAndSet(0, Node::PROPAGATE)) {
+                    } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node::PROPAGATE)) {
                         continue;                // loop on failed CAS
                     }
                 }
@@ -230,6 +360,8 @@ namespace locks {
                     break;
                 }
             }
+
+            PlatformThread::unlockRWMutex(this->rwLock);
         }
 
         /**
@@ -241,29 +373,33 @@ namespace locks {
          * @param propagate the return value from a tryAcquireShared
          */
         void setHeadAndPropagate(Node* node, int propagate) {
+
+            // Here we have to read lock because head could change when a
+            // different thread does its release shared.
+            PlatformThread::readerLockMutex(this->rwLock);
+
             Node* h = head.get(); // Record old head for check below
             setHead(node);
+
             /*
              * Try to signal next queued node if:
-             *   Propagation was indicated by caller,
-             *     or was recorded (as h.waitStatus) by a previous operation
-             *     (note: this uses sign-check of waitStatus because
-             *      PROPAGATE status may transition to SIGNAL.)
-             * and
-             *   The next node is waiting in shared mode,
-             *     or we don't know, because it appears NULL
+             *   Propagation was indicated by caller, or was recorded (as h.waitStatus)
+             *   by a previous operation (note: this uses sign-check of waitStatus because
+             *   PROPAGATE status may transition to SIGNAL.) and the next node is waiting
+             *   in shared mode, or we don't know, because it appears NULL.
              *
-             * The conservatism in both of these checks may cause
-             * unnecessary wake-ups, but only when there are multiple
-             * racing acquires/releases, so most need signals now or soon
-             * anyway.
+             * The conservatism in both of these checks may cause unnecessary wake-ups,
+             * but only when there are multiple racing acquires/releases, so most need
+             * signals now or soon anyway.
              */
-            if (propagate > 0 || h == NULL || h->waitStatus.get() < 0) {
-                Node* s = node->next.get();
+            if (propagate > 0 || h == NULL || h->waitStatus < 0) {
+                Node* s = node->next;
                 if (s == NULL || s->isShared()) {
                     doReleaseShared();
                 }
             }
+
+            PlatformThread::unlockRWMutex(this->rwLock);
         }
 
         /**
@@ -272,6 +408,7 @@ namespace locks {
          * @param node the node
          */
         void cancelAcquire(Node* node) {
+
             // Ignore if node doesn't exist
             if (node == NULL) {
                 return;
@@ -279,44 +416,47 @@ namespace locks {
 
             node->thread = NULL;
 
-            // Skip cancelled predecessors
-            Node* pred = node->prev.get();
-            while (pred->waitStatus.get() > 0) {
-                pred = pred->prev.get();
-                node->prev.set(pred);
-            }
-
-            // predNext is the apparent node to unsplice. CASes below will
-            // fail if not, in which case, we lost race vs another cancel
-            // or signal, so no further action is necessary.
-            Node* predNext = pred->next.get();
-
             // Can use unconditional write instead of CAS here.
             // After this atomic step, other Nodes can skip past us.
             // Before, we are free of interference from other threads.
-            node->waitStatus.set(Node::CANCELLED);
+            node->waitStatus = Node::CANCELLED;
 
             // If we are the tail, remove ourselves.
-            if (node == tail.get() && tail.compareAndSet(node, pred)) {
-                pred->next.compareAndSet(predNext, NULL);
+            if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+                // Attempt to clear the next link on the new tail; this can fail
+                // if another thread came in and replaced the old tail, but that's
+                // OK since next is already up to date in that case.
+                tail.compareAndSet(node, NULL);
                 delete node;
             } else {
                 // If successor needs signal, try to set pred's next-link
                 // so it will get one. Otherwise wake it up to propagate.
                 int ws;
-                if (pred != head.get() &&
-                    ((ws = pred->waitStatus.get()) == Node::SIGNAL ||
-                     (ws <= 0 && pred->waitStatus.compareAndSet(ws, Node::SIGNAL))) && pred->thread != NULL) {
-                    Node* next = node->next.get();
-                    if (next != NULL && next->waitStatus.get() <= 0) {
-                        pred->next.compareAndSet(predNext, next);
-                    }
+
+                PlatformThread::writerLockMutex(this->rwLock);
+
+                // Did we become the tail?
+                if (node == tail.get() && compareAndSetTail(node, node->prev)) {
+                    tail.compareAndSet(node, NULL);
                 } else {
+                    node->prev->next = node->next;
+                    node->next->prev = node->prev;
+                }
+
+                if (node->prev != head.get() &&
+                    ((ws = node->prev->waitStatus) == Node::SIGNAL ||
+                     (ws <= 0 && compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL))) &&
+                     node->prev->thread != NULL) {
+
+                    PlatformThread::unlockRWMutex(this->rwLock);
+                } else {
+                    PlatformThread::unlockRWMutex(this->rwLock);
                     unparkSuccessor(node);
                 }
-            }
 
-            delete node;
+                delete node;
+
+            }
         }
 
         /**
@@ -328,35 +468,38 @@ namespace locks {
          * @param node the node
          * @return {@code true} if thread should block
          */
-        static bool shouldParkAfterFailedAcquire(Node* pred, Node* node) {
-            int ws = pred->waitStatus.get();
+        bool shouldParkAfterFailedAcquire(Node* node) {
+
+            bool result = false;
+
+            PlatformThread::readerLockMutex(this->rwLock);
+
+            int ws = node->prev->waitStatus;
 
             if (ws == Node::SIGNAL)
                 /*
                  * This node has already set status asking a release
                  * to signal it, so it can safely park.
                  */
-                return true;
+                result = true;
             if (ws > 0) {
                 /*
-                 * Predecessor was cancelled. Skip over predecessors and
+                 * Predecessor was canceled. Skip over predecessors and
                  * indicate retry.
                  */
-                do {
-                    pred = pred->prev.get();
-                    node->prev.set(pred);
-                } while (pred->waitStatus.get() > 0);
-                pred->next.set(node);
+                result = false;
             } else {
                 /*
                  * waitStatus must be 0 or PROPAGATE.  Indicate that we
                  * need a signal, but don't park yet.  Caller will need to
                  * retry to make sure it cannot acquire before parking.
                  */
-                pred->waitStatus.compareAndSet(ws, Node::SIGNAL);
+                compareAndSetWaitStatus(node->prev, ws, Node::SIGNAL);
             }
 
-            return false;
+            PlatformThread::unlockRWMutex(this->rwLock);
+
+            return result;
         }
 
         /**
@@ -401,7 +544,7 @@ namespace locks {
                         return interrupted;
                     }
 
-                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
                         interrupted = true;
                     }
                 }
@@ -413,6 +556,9 @@ namespace locks {
 
                 throw ex;
             }
+
+            cancelAcquire(node);
+            return true;
         }
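
This same shape repeats across all of the acquire variants: only the head's
successor may attempt the acquire; on failure the node first publishes SIGNAL
on its predecessor, then parks, and on wakeup loops to retry. Condensed to its
control flow (a sketch, not the commit's exact code):

    bool acquireQueued(Node* node, int arg) {
        bool interrupted = false;
        for (;;) {
            Node* p = node->predecessor();
            if (p == head.get() && parent->tryAcquire(arg)) {
                setHead(node);                        // we own the sync now
                delete p;                             // retire the old head
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(node)    // mark pred SIGNAL first
                    && parkAndCheckInterrupt()) {     // then block until unparked
                interrupted = true;                   // note it, keep acquiring
            }
        }
    }
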
 
         /**
@@ -427,14 +573,12 @@ namespace locks {
                     Node* p = node->predecessor();
                     if (p == head.get() && parent->tryAcquire(arg)) {
                         setHead(node);
-                        std::cout << "doAcquireInterruptibly waiting thread acquired the sync" << std::endl;
                         delete p;
                         failed = false;
                         return;
                     }
 
-                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
-                        std::cout << "doAcquireInterruptibly waiting thread interrupted" << std::endl;
+                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
                         throw InterruptedException();
                     }
                 }
@@ -444,7 +588,7 @@ namespace locks {
                     cancelAcquire(node);
                 }
 
-                throw ex;
+                throw InterruptedException(ex);
             }
         }
 
@@ -470,10 +614,10 @@ namespace locks {
                     }
 
                     if (nanosTimeout <= 0) {
-                        return false;
+                        break;
                     }
 
-                    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
+                    if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
                         LockSupport::parkNanos(nanosTimeout);
                     }
 
@@ -493,6 +637,9 @@ namespace locks {
 
                 throw ex;
             }
+
+            cancelAcquire(node);
+            return false;
         }
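
All of the timed paths share one trade-off: park only when the remaining wait
exceeds spinForTimeoutLimit, since for very short waits a busy retry is cheaper
than a full park/unpark round trip. The bookkeeping in miniature (a sketch;
tryOnce is a hypothetical stand-in for a single acquire attempt):

    long long lastTime = System::nanoTime();
    while (nanosTimeout > 0 && !tryOnce()) {
        if (nanosTimeout > spinForTimeoutLimit) {
            LockSupport::parkNanos(nanosTimeout);    // long wait: block
        }                                            // short wait: spin and retry
        long long now = System::nanoTime();
        nanosTimeout -= (now - lastTime);
        lastTime = now;
    }
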
 
         /**
@@ -519,17 +666,19 @@ namespace locks {
                         }
                     }
 
-                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
                         interrupted = true;
                     }
                 }
-            } catch(InterruptedException& ex) {
+            } catch(Exception& ex) {
                 if (failed) {
                     cancelAcquire(node);
                 }
 
                 throw ex;
             }
+
+            cancelAcquire(node);
         }
 
         /**
@@ -551,7 +700,7 @@ namespace locks {
                             return;
                         }
                     }
-                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
+                    if (shouldParkAfterFailedAcquire(node) && parkAndCheckInterrupt()) {
                         throw InterruptedException();
                     }
                 }
@@ -589,10 +738,10 @@ namespace locks {
                         }
                     }
                     if (nanosTimeout <= 0) {
-                        return false;
+                        break;
                     }
 
-                    if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutLimit) {
+                    if (shouldParkAfterFailedAcquire(node) && nanosTimeout > spinForTimeoutLimit) {
                         LockSupport::parkNanos(nanosTimeout);
                     }
 
@@ -611,28 +760,12 @@ namespace locks {
 
                 throw ex;
             }
+
+            cancelAcquire(node);
+            return false;
         }
 
         Thread* fullGetFirstQueuedThread() {
-            /*
-             * The first node is normally head->next. Try to get its
-             * thread field, ensuring consistent reads: If thread
-             * field is nulled out or s->prev is no longer head, then
-             * some other thread(s) concurrently performed setHead in
-             * between some of our reads. We try this twice before
-             * resorting to traversal.
-             */
-            Node* h = NULL;
-            Node* s = NULL;
-            Thread* st = NULL;
-
-            if (((h = head.get()) != NULL && (s = h->next.get()) != NULL &&
-                 s->prev.get() == head.get() && (st = s->thread) != NULL) ||
-                ((h = head.get()) != NULL && (s = h->next.get()) != NULL &&
-                 s->prev.get() == head.get() && (st = s->thread) != NULL)) {
-
-                return st;
-            }
 
             /*
              * Head's next field might not have been set yet, or may have
@@ -645,10 +778,10 @@ namespace locks {
             Node* t = tail.get();
             Thread* firstThread = NULL;
             while (t != NULL && t != head.get()) {
-                Thread* tt = t->thread;
+                Thread* tt = (Thread*)t->thread;
                 if (tt != NULL)
                     firstThread = tt;
-                t = t->prev.get();
+                t = t->prev;
             }
             return firstThread;
         }
@@ -662,11 +795,11 @@ namespace locks {
          * @return true if is reacquiring
          */
         bool isOnSyncQueue(Node* node) {
-            if (node->waitStatus.get() == Node::CONDITION || node->prev.get() == NULL) {
+            if (node->waitStatus == Node::CONDITION || node->prev == NULL) {
                 return false;
             }
 
-            if (node->next.get() != NULL) { // If has successor, it must be on queue
+            if (node->next != NULL) { // If has successor, it must be on queue
                 return true;
             }
 
@@ -698,50 +831,64 @@ namespace locks {
                     return false;
                 }
 
-                t = t->prev.get();
+                t = t->prev;
             }
         }
 
         /**
          * Transfers a node from a condition queue onto sync queue.
-         * Returns true if successful.
-         * @param node the node
+         * Returns true if successful.  If the node was canceled this
+         * method will delete it before returning false.
+         *
+         * @param node
+         *      The node to transfer to the wait Queue
+         *
          * @return true if successfully transferred (else the node was
-         * cancelled before signal).
+         *         canceled before signal and deleted).
          */
         bool transferForSignal(Node* node) {
             /*
-             * If cannot change waitStatus, the node has been cancelled.
+             * If cannot change waitStatus, the node has been canceled.
              */
-            if (!node->waitStatus.compareAndSet(Node::CONDITION, 0)) {
+            if (!compareAndSetWaitStatus(node, Node::CONDITION, 0)) {
                 return false;
             }
 
+            // Since we need to write to our predecessor we must lock so that
+            // it doesn't leave the Queue before we are done.
+            PlatformThread::writerLockMutex(this->rwLock);
+
             /*
              * Splice onto queue and try to set waitStatus of predecessor to
-             * indicate that thread is (probably) waiting. If cancelled or
+             * indicate that thread is (probably) waiting. If canceled or
              * attempt to set waitStatus fails, wake up to resync (in which
              * case the waitStatus can be transiently and harmlessly wrong).
              */
             Node* p = enq(node);
-            int ws = p->waitStatus.get();
-            if (ws > 0 || !p->waitStatus.compareAndSet(ws, Node::SIGNAL)) {
-                LockSupport::unpark(node->thread);
+            int ws = p->waitStatus;
+            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node::SIGNAL)) {
+                LockSupport::unpark((Thread*)node->thread);
             }
 
+            PlatformThread::unlockRWMutex(this->rwLock);
+
             return true;
         }
 
         /**
-         * Transfers node, if necessary, to sync queue after a cancelled
-         * wait. Returns true if thread was cancelled before being
-         * signalled.
-         * @param current the waiting thread
-         * @param node its node
-         * @return true if cancelled before the node was signalled
+         * Transfers node, if necessary, to sync queue after a canceled wait. Returns
+         * true if thread was canceled before being signaled.  If the Node is a Condition
+         * and has already been signaled then it is not added here, since the signal will
+         * have already done so; we must still wait until it appears on the sync queue,
+         * otherwise the attempt to re-acquire the lock could throw a NullPointerException.
+         *
+         * @param node
+         *      The node that is to be transferred onto the sync queue.
+         *
+         * @return true if canceled before the node was signaled
          */
         bool transferAfterCancelledWait(Node* node) {
-            if (node->waitStatus.compareAndSet(Node::CONDITION, 0)) {
+            if (compareAndSetWaitStatus(node, Node::CONDITION, 0)) {
                 enq(node);
                 return true;
             }
@@ -760,9 +907,13 @@ namespace locks {
         }
 
         /**
-         * Invokes release with current state value; returns saved state.
-         * Cancels node and throws exception on failure.
-         * @param node the condition node for this wait
+         * Invokes release with current state value; returns saved state.  Cancels node
+         * and throws exception on failure.  When a monitor state exception is thrown
+         * the Node is added to the sync queue so that it can be deleted safely later.
+         *
+         * @param node
+         *      The condition node for this wait.
+         *
          * @return previous sync state
          */
         int fullyRelease(Node* node) {
@@ -777,30 +928,49 @@ namespace locks {
                 }
             } catch(IllegalMonitorStateException& ex) {
                 if (failed) {
-                    node->waitStatus.set(Node::CANCELLED);
+                    node->waitStatus = Node::CANCELLED;
+                    // Enqueue it even though canceled so that it gets deleted
+                    enq(node);
                 }
 
                 throw ex;
             }
         }
 
+        bool compareAndSetHead(Node* update) {
+            return this->head.compareAndSet(NULL, update);
+        }
+        bool compareAndSetTail(Node* expect, Node* update) {
+            return this->tail.compareAndSet(expect, update);
+        }
+        static bool compareAndSetWaitStatus(Node* node, int expect, int update) {
+            return Atomics::compareAndSet32(&node->waitStatus, expect, update);
+        }
+
     };
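
compareAndSetWaitStatus above routes through Decaf's Atomics wrapper around the
platform's 32-bit CAS primitive. Functionally it amounts to something like the
following (a sketch using the GCC builtin; the real Atomics class abstracts the
per-platform intrinsic):

    bool compareAndSet32(volatile int* target, int expect, int update) {
        return __sync_bool_compare_and_swap(target, expect, update);
    }
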
 
     /**
      * Provides a default implementation that most AbstractQueuedSynchronizer derived classes
-     * can use without needing to write one from scratch.
+     * can use without needing to write one from scratch.  Since the methods of this
+     * object must always be called from a locked synchronizer we can assume that only
+     * one thread at a time will read or modify the list of waiters, so we don't need
+     * to lock around modifications to the list.
+     *
+     * This object creates Node instances but leaves the deletion of those objects up
+     * to the sync queue.  In all cases the Node needs to be transferred to the sync
+     * queue with its state value set to zero or CANCELLED.
      */
     class DefaultConditionObject : public AbstractQueuedSynchronizer::ConditionObject {
     private:
 
         SynchronizerState* impl;
-        Node* firstWaiter;
-        Node* lastWaiter;
+        Node* head;
+        Node* tail;
 
-        /** Mode meaning to reinterrupt on exit from wait */
-        static const int REINTERRUPT;
-        /** Mode meaning to throw InterruptedException on exit from wait */
-        static const int THROW_IE;
+        enum INTERRUPTION_MODE {
+            REINTERRUPT = 1,   // Re-interrupt thread on exit from a wait call.
+            THROW_IE = -1      // Throw a new InterruptedException on wait call exit.
+        };
 
     private:
 
@@ -810,16 +980,21 @@ namespace locks {
     public:
 
         DefaultConditionObject(SynchronizerState* impl) :
-            ConditionObject(), impl(impl), firstWaiter(NULL), lastWaiter(NULL) {}
-        virtual ~DefaultConditionObject() {};
+            ConditionObject(), impl(impl), head(NULL), tail(NULL) {}
+        virtual ~DefaultConditionObject() {
+        }
 
         virtual void await() {
+
             if (Thread::interrupted()) {
                 throw InterruptedException(__FILE__, __LINE__, "Thread was interrupted");
             }
+
             Node* node = addConditionWaiter();
+
             int savedState = impl->fullyRelease(node);
             int interruptMode = 0;
+
             while (!impl->isOnSyncQueue(node)) {
                 LockSupport::park();
                 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
@@ -830,7 +1005,7 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter.get() != NULL) { // clean up if cancelled
+            if (node->nextWaiter != NULL) { // clean up if canceled
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
@@ -842,6 +1017,7 @@ namespace locks {
             Node* node = addConditionWaiter();
             int savedState = impl->fullyRelease(node);
             bool interrupted = false;
+
             while (!impl->isOnSyncQueue(node)) {
                 LockSupport::park();
                 if (Thread::interrupted()) {
@@ -862,6 +1038,7 @@ namespace locks {
             int savedState = impl->fullyRelease(node);
             long long lastTime = System::nanoTime();
             int interruptMode = 0;
+
             while (!impl->isOnSyncQueue(node)) {
                 if (nanosTimeout <= 0L) {
                     impl->transferAfterCancelledWait(node);
@@ -880,12 +1057,13 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter.get() != NULL) {
+            if (node->nextWaiter != NULL) {
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
                 reportInterruptAfterWait(interruptMode);
             }
+
             return nanosTimeout - (System::nanoTime() - lastTime);
         }
 
@@ -899,6 +1077,7 @@ namespace locks {
             long long lastTime = System::nanoTime();
             bool timedout = false;
             int interruptMode = 0;
+
             while (!impl->isOnSyncQueue(node)) {
                 if (nanosTimeout <= 0L) {
                     timedout = impl->transferAfterCancelledWait(node);
@@ -918,7 +1097,7 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter.get() != NULL) {
+            if (node->nextWaiter != NULL) {
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
@@ -937,6 +1116,7 @@ namespace locks {
             int savedState = impl->fullyRelease(node);
             bool timedout = false;
             int interruptMode = 0;
+
             while (!impl->isOnSyncQueue(node)) {
                 if (System::currentTimeMillis() > abstime) {
                     timedout = impl->transferAfterCancelledWait(node);
@@ -951,7 +1131,7 @@ namespace locks {
             if (impl->acquireQueued(node, savedState) && interruptMode != THROW_IE) {
                 interruptMode = REINTERRUPT;
             }
-            if (node->nextWaiter.get() != NULL) {
+            if (node->nextWaiter != NULL) {
                 unlinkCancelledWaiters();
             }
             if (interruptMode != 0) {
@@ -961,11 +1141,15 @@ namespace locks {
             return !timedout;
         }
 
+        /**
+         * The Node that's been waiting the longest is moved from this Condition's
+         * wait queue to that of the parent Synchronizer.
+         */
         virtual void signal() {
             if (!impl->isHeldExclusively()) {
                 throw IllegalMonitorStateException();
             }
-            Node* first = firstWaiter;
+            Node* first = head;
             if (first != NULL) {
                 doSignal(first);
             }
@@ -975,7 +1159,7 @@ namespace locks {
             if (!impl->isHeldExclusively()) {
                 throw IllegalMonitorStateException();
             }
-            Node* first = firstWaiter;
+            Node* first = head;
             if (first != NULL) {
                 doSignalAll(first);
             }
@@ -989,8 +1173,8 @@ namespace locks {
             if (!impl->isHeldExclusively()) {
                 throw IllegalMonitorStateException();
             }
-            for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) {
-                if (w->waitStatus.get() == Node::CONDITION) {
+            for (Node* w = head; w != NULL; w = w->nextWaiter) {
+                if (w->waitStatus == Node::CONDITION) {
                     return true;
                 }
             }
@@ -1002,8 +1186,8 @@ namespace locks {
                 throw IllegalMonitorStateException();
             }
             int n = 0;
-            for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) {
-                if (w->waitStatus.get() == Node::CONDITION) {
+            for (Node* w = head; w != NULL; w = w->nextWaiter) {
+                if (w->waitStatus == Node::CONDITION) {
                     ++n;
                 }
             }
@@ -1015,9 +1199,9 @@ namespace locks {
                 throw IllegalMonitorStateException();
             }
             ArrayList<Thread*>* list = new ArrayList<Thread*>();
-            for (Node* w = firstWaiter; w != NULL; w = w->nextWaiter.get()) {
-                if (w->waitStatus.get() == Node::CONDITION) {
-                    Thread* t = w->thread;
+            for (Node* w = head; w != NULL; w = w->nextWaiter) {
+                if (w->waitStatus == Node::CONDITION) {
+                    Thread* t = (Thread*)w->thread;
                     if (t != NULL) {
                         list->add(t);
                     }
@@ -1028,82 +1212,96 @@ namespace locks {
 
     private:
 
+        /**
+         * Adds a new Node to this Condition object to be used by one of the wait methods.
+         * During this addition we scan for canceled Nodes and remove them as they are
+         * now contained on the sync queue and will be removed from there.  It's safe to do
+         * this here as this method is called while the sync queue is locked so no other
+         * changes can occur to the list of Nodes.
+         *
+         * @returns the newly added Node instance.
+         */
         Node* addConditionWaiter() {
-            Node* t = lastWaiter;
-            // If lastWaiter is cancelled, clean out.
-            if (t != NULL && t->waitStatus.get() != Node::CONDITION) {
+            Node* t = tail;
+            // If last Waiter is canceled, clean out.
+            if (t != NULL && t->waitStatus != Node::CONDITION) {
                 unlinkCancelledWaiters();
-                t = lastWaiter;
+                t = tail;
             }
             Node* node = new Node(Thread::currentThread(), Node::CONDITION);
             if (t == NULL) {
-                firstWaiter = node;
+                head = node;
             } else {
-                t->nextWaiter.set(node);
+                t->nextWaiter = node;
             }
-            lastWaiter = node;
+            tail = node;
             return node;
         }
 
         /**
-         * Removes and transfers nodes until hit non-cancelled one or
-         * NULL. Split out from signal in part to encourage compilers
-         * to inline the case of no waiters.
-         * @param first (non-NULL) the first node on condition queue
+         * Removes and transfers nodes until it hits a non-canceled one or a NULL
+         * one; stated another way, it traverses the list of waiters removing
+         * canceled nodes until it finds a non-canceled one to signal.
+         *
+         * @param first (non-NULL)
+         *      The first node on condition queue.
          */
         void doSignal(Node* first) {
             do {
-                if ((firstWaiter = first->nextWaiter.get()) == NULL) {
-                    lastWaiter = NULL;
+                head = first->nextWaiter;
+                first->nextWaiter = NULL;
+
+                if (head == NULL) {
+                    tail = NULL;
                 }
-                first->nextWaiter.set(NULL);
-            } while (!impl->transferForSignal(first) && (first = firstWaiter) != NULL);
+            } while (!impl->transferForSignal(first) && (first = head) != NULL);
         }
 
         /**
-         * Removes and transfers all nodes.
-         * @param first (non-NULL) the first node on condition queue
+         * Removes and transfers all nodes.  Removes all Nodes from this Condition object
+         * starting from the given node pointer; canceled waiter Nodes are moved to the
+         * sync queue so we don't need to track them anymore.  This is safe to do here
+         * since this method must be called while the sync queue lock is held.
+         *
+         * @param first (non-NULL)
+         *      The first node on condition queue to start transferring from.
          */
         void doSignalAll(Node* first) {
-            lastWaiter = firstWaiter = NULL;
+            head = tail = NULL;
             do {
-                Node* next = first->nextWaiter.get();
-                first->nextWaiter.set(NULL);
+                Node* next = first->nextWaiter;
+                first->nextWaiter = NULL;
                 impl->transferForSignal(first);
                 first = next;
             } while (first != NULL);
         }
 
         /**
-         * Unlinks cancelled waiter nodes from condition queue.
-         * Called only while holding lock. This is called when
-         * cancellation occurred during condition wait, and upon
-         * insertion of a new waiter when lastWaiter is seen to have
-         * been cancelled. This method is needed to avoid garbage
-         * retention in the absence of signals. So even though it may
-         * require a full traversal, it comes into play only when
-         * timeouts or cancellations occur in the absence of
-         * signals. It traverses all nodes rather than stopping at a
-         * particular target to unlink all pointers to garbage nodes
-         * without requiring many re-traversals during cancellation
-         * storms.
+         * Unlinks canceled waiter nodes from condition queue.  Called only while holding
+         * lock. This is called when cancellation occurred during condition wait, and upon
+         * insertion of a new waiter when tail is seen to have been canceled. This method
+         * is needed to avoid garbage retention in the absence of signals. So even though
+         * it may require a full traversal, it comes into play only when timeouts or
+         * cancellations occur in the absence of signals. It traverses all nodes rather
+         * than stopping at a particular target to unlink all pointers to garbage nodes
+         * without requiring many re-traversals during cancellation storms.
          */
         void unlinkCancelledWaiters() {
-            Node* t = firstWaiter;
+            Node* t = head;
             Node* trail = NULL;
             while (t != NULL) {
-                Node* next = t->nextWaiter.get();
-                if (t->waitStatus.get() != Node::CONDITION) {
-                    t->nextWaiter.set(NULL);
+                Node* next = t->nextWaiter;
+                if (t->waitStatus != Node::CONDITION) {
+                    t->nextWaiter = NULL;
 
                     if (trail == NULL) {
-                        firstWaiter = next;
+                        head = next;
                     } else {
-                        trail->nextWaiter.set(next);
+                        trail->nextWaiter = next;
                     }
 
                     if (next == NULL) {
-                        lastWaiter = trail;
+                        tail = trail;
                     }
 
                 } else {
@@ -1114,17 +1312,27 @@ namespace locks {
         }
 
         /**
-         * Checks for interrupt, returning THROW_IE if interrupted
-         * before signalled, REINTERRUPT if after signalled, or
-         * 0 if not interrupted.
+         * Checks for interrupt, returning THROW_IE if interrupted before signaled,
+         * REINTERRUPT if after signaled, or 0 if not interrupted.  The canceled node
+         * is transferred to the sync queue so that its waiter can safely re-acquire the
+         * lock.
+         *
+         * @param node
+         *      The node to transfer to the sync queue if interrupted.
+         *
+         * @returns value indicating if an action is needed in response to an interrupt.
          */
         int checkInterruptWhileWaiting(Node* node) {
             return Thread::interrupted() ? (impl->transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
         }
 
         /**
-         * Throws InterruptedException, reinterrupts current thread, or
-         * does nothing, depending on mode.
+         * Throws InterruptedException, re-interrupts the current thread, or does
+         * nothing, depending on the mode passed; this abstracts the interrupt
+         * handling needed by all of the interruptible wait methods into a single place.
+         *
+         * @param interruptMode
+         *      indicates what action is needed for interruption handling (if any).
          */
         void reportInterruptAfterWait(int interruptMode) {
             if (interruptMode == THROW_IE) {
@@ -1136,9 +1344,6 @@ namespace locks {
 
     };
 
-    const int DefaultConditionObject::REINTERRUPT = 1;
-    const int DefaultConditionObject::THROW_IE = -1;
-
 }}}}
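
Taken together, the condition implementation follows the standard monitor
protocol: await atomically releases the saved sync state, parks until signaled
(or interrupted / timed out), and re-acquires before returning. Caller-side
that supports the usual predicate loop (a sketch; sync, cond, and dataReady are
illustrative names, not from this commit):

    sync.acquire(1);
    while (!dataReady) {
        cond->await();     // releases sync, parks, re-acquires on return
    }
    // ... use the protected state while still holding the sync ...
    sync.release(1);
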
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1156,17 +1361,17 @@ AbstractQueuedSynchronizer::~AbstractQue
 
 ////////////////////////////////////////////////////////////////////////////////
 int AbstractQueuedSynchronizer::getState() const {
-    return this->impl->state.get();
+    return this->impl->state;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void AbstractQueuedSynchronizer::setState(int value) {
-    this->impl->state.set(value);
+    this->impl->state = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool AbstractQueuedSynchronizer::compareAndSetState(int expect, int update) {
-    return this->impl->state.compareAndSet(expect, update);
+    return Atomics::compareAndSet32(&this->impl->state, expect, update);
 }
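
These three accessors are the entire surface a subclass needs in order to
manage sync state. A minimal exclusive lock built on them might look like this
(a sketch in the spirit of the test suite's mutex, not code from this commit):

    class SimpleMutex : public AbstractQueuedSynchronizer {
    protected:
        virtual bool tryAcquire(int ignored) {
            return compareAndSetState(0, 1);    // 0 = free, 1 = held
        }
        virtual bool tryRelease(int ignored) {
            setState(0);                        // holder publishes the release
            return true;
        }
    public:
        virtual bool isHeldExclusively() const {
            return getState() == 1;
        }
        void lock()   { acquire(1); }           // blocks via the sync queue
        void unlock() { release(1); }           // unparks the next waiter
    };
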
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1237,7 +1442,7 @@ bool AbstractQueuedSynchronizer::release
     if (tryRelease(arg)) {
 
         Node* h = this->impl->head.get();
-        if (h != NULL && h->waitStatus.get() != 0) {
+        if (h != NULL && h->waitStatus != 0) {
             this->impl->unparkSuccessor(h);
         }
 
@@ -1308,82 +1513,92 @@ bool AbstractQueuedSynchronizer::isQueue
         throw NullPointerException(__FILE__, __LINE__, "Passed in thread was NULL");
     }
 
-    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+    PlatformThread::readerLockMutex(this->impl->rwLock);
+
+    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (p->thread == thread) {
+            PlatformThread::unlockRWMutex(this->impl->rwLock);
             return true;
         }
     }
 
+    PlatformThread::unlockRWMutex(this->impl->rwLock);
+
     return false;
 }
 
-//    bool apparentlyFirstQueuedIsExclusive() {
-//        Node h, s;
-//        return (h = head) != NULL &&
-//            (s = h.next)  != NULL &&
-//            !s.isShared()         &&
-//            s.thread != NULL;
-//    }
-//
-//    bool hasQueuedPredecessors() {
-//        // The correctness of this depends on head being initialized
-//        // before tail and on head.next being accurate if the current
-//        // thread is first in queue.
-//        Node t = tail; // Read fields in reverse initialization order
-//        Node h = head;
-//        Node s;
-//        return h != t &&
-//            ((s = h.next) == NULL || s.thread != Thread.currentThread());
-//    }
-
 ////////////////////////////////////////////////////////////////////////////////
 int AbstractQueuedSynchronizer::getQueueLength() const {
     int n = 0;
-    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+
+    PlatformThread::readerLockMutex(this->impl->rwLock);
+
+    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (p->thread != NULL) {
             ++n;
         }
     }
+
+    PlatformThread::unlockRWMutex(this->impl->rwLock);
+
     return n;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Collection<Thread*>* AbstractQueuedSynchronizer::getQueuedThreads() const {
     ArrayList<Thread*>* list = new ArrayList<Thread*>();
-    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
-        Thread* t = p->thread;
+
+    PlatformThread::readerLockMutex(this->impl->rwLock);
+
+    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
+        Thread* t = (Thread*)p->thread;
         if (t != NULL) {
             list->add(t);
         }
     }
+
+    PlatformThread::unlockRWMutex(this->impl->rwLock);
+
     return list;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Collection<Thread*>* AbstractQueuedSynchronizer::getExclusiveQueuedThreads() const {
     ArrayList<Thread*>* list = new ArrayList<Thread*>();
-    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+
+    PlatformThread::readerLockMutex(this->impl->rwLock);
+
+    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (!p->isShared()) {
-            Thread* t = p->thread;
+            Thread* t = (Thread*)p->thread;
             if (t != NULL) {
                 list->add(t);
             }
         }
     }
+
+    PlatformThread::unlockRWMutex(this->impl->rwLock);
+
     return list;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Collection<Thread*>* AbstractQueuedSynchronizer::getSharedQueuedThreads() const {
     ArrayList<Thread*>* list = new ArrayList<Thread*>();
-    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev.get()) {
+
+    PlatformThread::readerLockMutex(this->impl->rwLock);
+
+    for (Node* p = this->impl->tail.get(); p != NULL; p = p->prev) {
         if (p->isShared()) {
-            Thread* t = p->thread;
+            Thread* t = (Thread*)p->thread;
             if (t != NULL) {
                 list->add(t);
             }
         }
     }
+
+    PlatformThread::unlockRWMutex(this->impl->rwLock);
+
     return list;
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp?rev=1125811&r1=1125810&r2=1125811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.cpp Sat May 21 20:46:50 2011
@@ -98,11 +98,8 @@ namespace {
 
         virtual void run() {
             try {
-                std::cout << "InterruptibleSyncRunnable acquireInterruptibly" << std::endl;
                 mutex->acquireInterruptibly(1);
-                std::cout << "InterruptibleSyncRunnable was not interrupted" << std::endl;
             } catch(InterruptedException& success) {
-                std::cout << "InterruptibleSyncRunnable was interrupted" << std::endl;
             } catch(Exception& ex) {
                 parent->threadUnexpectedException(ex);
             } catch(std::exception& stdex) {
@@ -125,11 +122,9 @@ namespace {
 
         void run() {
             try {
-                std::cout << "InterruptedSyncRunnable acquireInterruptibly" << std::endl;
                 mutex->acquireInterruptibly(1);
                 parent->threadFail("Should have been interrupted.");
             } catch(InterruptedException& success) {
-                std::cout << "InterruptedSyncRunnable was interrupted" << std::endl;
             } catch(Exception& ex) {
                 parent->threadUnexpectedException(ex);
             } catch(std::exception& stdex) {
@@ -224,27 +219,26 @@ void AbstractQueuedSynchronizerTest::tes
     Thread t1(&iSyncRun1);
     Thread t2(&iSyncRun2);
 
-    std::cout << std::endl;
     try {
         CPPUNIT_ASSERT(!mutex.isQueued(&t1));
         CPPUNIT_ASSERT(!mutex.isQueued(&t2));
         mutex.acquire(1);
         t1.start();
         Thread::sleep(SHORT_DELAY_MS);
-//        CPPUNIT_ASSERT(mutex.isQueued(&t1));
+        CPPUNIT_ASSERT(mutex.isQueued(&t1));
         t2.start();
         Thread::sleep(SHORT_DELAY_MS);
-//        CPPUNIT_ASSERT(mutex.isQueued(&t1));
-//        CPPUNIT_ASSERT(mutex.isQueued(&t2));
+        CPPUNIT_ASSERT(mutex.isQueued(&t1));
+        CPPUNIT_ASSERT(mutex.isQueued(&t2));
         t1.interrupt();
-//        Thread::sleep(SHORT_DELAY_MS);
-//        CPPUNIT_ASSERT(!mutex.isQueued(&t1));
-//        CPPUNIT_ASSERT(mutex.isQueued(&t2));
-        mutex.release(1);
-//        Thread::sleep(SHORT_DELAY_MS);
-//        CPPUNIT_ASSERT(!mutex.isQueued(&t1));
-//        Thread::sleep(SHORT_DELAY_MS);
-//        CPPUNIT_ASSERT(!mutex.isQueued(&t2));
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(!mutex.isQueued(&t1));
+        CPPUNIT_ASSERT(mutex.isQueued(&t2));
+        mutex.release(1);
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(!mutex.isQueued(&t1));
+        Thread::sleep(SHORT_DELAY_MS);
+        CPPUNIT_ASSERT(!mutex.isQueued(&t2));
         t1.join();
         t2.join();
     } catch(Exception& e){
@@ -705,7 +699,7 @@ void AbstractQueuedSynchronizerTest::tes
     try {
         mutex.acquire(1);
         Date d;
-        CPPUNIT_ASSERT(!c->awaitUntil((d.getTime() + 10)));
+        CPPUNIT_ASSERT(!c->awaitUntil((d.getTime() + 15)));
         mutex.release(1);
     } catch(Exception& ex) {
         unexpectedException();
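
awaitUntil() takes an absolute wall-clock deadline, so the test builds one
from "now" plus a small pad; widening the pad from 10 ms to 15 ms
presumably leaves more slack for coarse clock granularity before asserting
that the wait timed out rather than being signalled. The same
absolute-deadline idiom in standard C++ (illustrative only, not the decaf
API):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <cassert>

    int main() {
        std::mutex m;
        std::condition_variable cv;
        std::unique_lock<std::mutex> lk(m);

        // Absolute deadline = now + 15ms, mirroring d.getTime() + 15.
        auto deadline = std::chrono::steady_clock::now()
                      + std::chrono::milliseconds(15);

        // Nobody signals and the predicate never turns true, so the wait
        // must end at the deadline and report "not signalled" (false),
        // even across spurious wakeups.
        bool signalled = cv.wait_until(lk, deadline, [] { return false; });
        assert(!signalled);
        return 0;
    }
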
@@ -1521,9 +1515,8 @@ namespace {
             try {
                 parent->threadAssertFalse(latch->isSignalled());
                 latch->acquireSharedInterruptibly(0);
-                parent->threadAssertTrue(latch->isSignalled());
+                parent->threadShouldThrow();
             } catch(InterruptedException& e) {
-                parent->threadUnexpectedException();
             }
         }
     };
@@ -1566,7 +1559,6 @@ namespace {
                latch->tryAcquireSharedNanos(0, AbstractQueuedSynchronizerTest::SMALL_DELAY_MS * 1000 * 1000);
                 parent->threadShouldThrow();
             } catch(InterruptedException& e) {
-                parent->threadUnexpectedException();
             }
         }
     };
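
Both hunks above flip the interruption tests to the correct expectation: a
blocked interruptible acquire must exit by throwing InterruptedException,
so a normal return is now the failure (threadShouldThrow) and the catch
block is the silent success path. A self-contained sketch of that
contract, using toy stand-in types rather than the decaf classes:

    #include <condition_variable>
    #include <mutex>
    #include <stdexcept>
    #include <thread>
    #include <iostream>

    // Toy stand-ins for the contract the tests verify: an interrupted,
    // blocked acquire must throw, never return normally.
    struct InterruptedException : std::runtime_error {
        InterruptedException() : std::runtime_error("interrupted") {}
    };

    class ToySync {
    public:
        void acquireSharedInterruptibly() {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [this] { return available || interrupted; });
            if (interrupted) throw InterruptedException();
        }
        void interrupt() {
            { std::lock_guard<std::mutex> lk(m); interrupted = true; }
            cv.notify_all();
        }
    private:
        std::mutex m;
        std::condition_variable cv;
        bool available = false;
        bool interrupted = false;
    };

    int main() {
        ToySync sync;
        std::thread t([&sync] {
            try {
                sync.acquireSharedInterruptibly();
                std::cout << "FAIL: returned without being signalled\n";
            } catch (InterruptedException&) {
                std::cout << "OK: interrupt surfaced as an exception\n";
            }
        });
        sync.interrupt();   // like t1.interrupt() in the tests
        t.join();
        return 0;
    }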

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h?rev=1125811&r1=1125810&r2=1125811&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/decaf/util/concurrent/locks/AbstractQueuedSynchronizerTest.h Sat May 21 20:46:50 2011
@@ -30,55 +30,55 @@ namespace locks {
     class AbstractQueuedSynchronizerTest : public ExecutorsTestSupport {
 
         CPPUNIT_TEST_SUITE( AbstractQueuedSynchronizerTest );
-//        CPPUNIT_TEST( testIsHeldExclusively );
-//        CPPUNIT_TEST( testAcquire );
-//        CPPUNIT_TEST( testTryAcquire );
-//        CPPUNIT_TEST( testhasQueuedThreads );
-//        CPPUNIT_TEST( testIsQueuedNPE );
+        CPPUNIT_TEST( testIsHeldExclusively );
+        CPPUNIT_TEST( testAcquire );
+        CPPUNIT_TEST( testTryAcquire );
+        CPPUNIT_TEST( testhasQueuedThreads );
+        CPPUNIT_TEST( testIsQueuedNPE );
         CPPUNIT_TEST( testIsQueued );
-//        CPPUNIT_TEST( testGetFirstQueuedThread );
-//        CPPUNIT_TEST( testHasContended );
-//        CPPUNIT_TEST( testGetQueuedThreads );
-//        CPPUNIT_TEST( testGetExclusiveQueuedThreads );
-//        CPPUNIT_TEST( testGetSharedQueuedThreads );
-//        CPPUNIT_TEST( testInterruptedException2 );
-//        CPPUNIT_TEST( testTryAcquireWhenSynced );
-//        CPPUNIT_TEST( testAcquireNanosTimeout );
-//        CPPUNIT_TEST( testGetState );
-//        CPPUNIT_TEST( testAcquireInterruptibly1 );
-//        CPPUNIT_TEST( testAcquireInterruptibly2 );
-//        CPPUNIT_TEST( testOwns );
-//        CPPUNIT_TEST( testAwaitIllegalMonitor );
-//        CPPUNIT_TEST( testSignalIllegalMonitor );
-//        CPPUNIT_TEST( testAwaitNanosTimeout );
-//        CPPUNIT_TEST( testAwaitTimeout );
-//        CPPUNIT_TEST( testAwaitUntilTimeout );
-//        CPPUNIT_TEST( testAwait );
-//        CPPUNIT_TEST( testHasWaitersNPE );
-//        CPPUNIT_TEST( testGetWaitQueueLengthNPE );
-//        CPPUNIT_TEST( testGetWaitingThreadsNPE );
-//        CPPUNIT_TEST( testHasWaitersIAE );
-//        CPPUNIT_TEST( testHasWaitersIMSE );
-//        CPPUNIT_TEST( testGetWaitQueueLengthIAE );
-//        CPPUNIT_TEST( testGetWaitQueueLengthIMSE );
-//        CPPUNIT_TEST( testGetWaitingThreadsIAE );
-//        CPPUNIT_TEST( testGetWaitingThreadsIMSE );
-//        CPPUNIT_TEST( testHasWaiters );
-//        CPPUNIT_TEST( testGetWaitQueueLength );
-//        CPPUNIT_TEST( testGetWaitingThreads );
-//        CPPUNIT_TEST( testAwaitUninterruptibly );
-//        CPPUNIT_TEST( testAwaitInterrupt );
-//        CPPUNIT_TEST( testAwaitNanosInterrupt );
-//        CPPUNIT_TEST( testAwaitUntilInterrupt );
-//        CPPUNIT_TEST( testSignalAll );
-//        CPPUNIT_TEST( testToString );
-//        CPPUNIT_TEST( testGetStateWithReleaseShared );
-//        CPPUNIT_TEST( testReleaseShared );
-//        CPPUNIT_TEST( testAcquireSharedInterruptibly );
-//        CPPUNIT_TEST( testAsquireSharedTimed );
-//        CPPUNIT_TEST( testAcquireSharedInterruptiblyInterruptedException );
-//        CPPUNIT_TEST( testAcquireSharedNanosInterruptedException );
-//        CPPUNIT_TEST( testAcquireSharedNanosTimeout );
+        CPPUNIT_TEST( testGetFirstQueuedThread );
+        CPPUNIT_TEST( testHasContended );
+        CPPUNIT_TEST( testGetQueuedThreads );
+        CPPUNIT_TEST( testGetExclusiveQueuedThreads );
+        CPPUNIT_TEST( testGetSharedQueuedThreads );
+        CPPUNIT_TEST( testInterruptedException2 );
+        CPPUNIT_TEST( testTryAcquireWhenSynced );
+        CPPUNIT_TEST( testAcquireNanosTimeout );
+        CPPUNIT_TEST( testGetState );
+        CPPUNIT_TEST( testAcquireInterruptibly1 );
+        CPPUNIT_TEST( testAcquireInterruptibly2 );
+        CPPUNIT_TEST( testOwns );
+        CPPUNIT_TEST( testAwaitIllegalMonitor );
+        CPPUNIT_TEST( testSignalIllegalMonitor );
+        CPPUNIT_TEST( testAwaitNanosTimeout );
+        CPPUNIT_TEST( testAwaitTimeout );
+        CPPUNIT_TEST( testAwaitUntilTimeout );
+        CPPUNIT_TEST( testAwait );
+        CPPUNIT_TEST( testHasWaitersNPE );
+        CPPUNIT_TEST( testGetWaitQueueLengthNPE );
+        CPPUNIT_TEST( testGetWaitingThreadsNPE );
+        CPPUNIT_TEST( testHasWaitersIAE );
+        CPPUNIT_TEST( testHasWaitersIMSE );
+        CPPUNIT_TEST( testGetWaitQueueLengthIAE );
+        CPPUNIT_TEST( testGetWaitQueueLengthIMSE );
+        CPPUNIT_TEST( testGetWaitingThreadsIAE );
+        CPPUNIT_TEST( testGetWaitingThreadsIMSE );
+        CPPUNIT_TEST( testHasWaiters );
+        CPPUNIT_TEST( testGetWaitQueueLength );
+        CPPUNIT_TEST( testGetWaitingThreads );
+        CPPUNIT_TEST( testAwaitUninterruptibly );
+        CPPUNIT_TEST( testAwaitInterrupt );
+        CPPUNIT_TEST( testAwaitNanosInterrupt );
+        CPPUNIT_TEST( testAwaitUntilInterrupt );
+        CPPUNIT_TEST( testSignalAll );
+        CPPUNIT_TEST( testToString );
+        CPPUNIT_TEST( testGetStateWithReleaseShared );
+        CPPUNIT_TEST( testReleaseShared );
+        CPPUNIT_TEST( testAcquireSharedInterruptibly );
+        CPPUNIT_TEST( testAsquireSharedTimed );
+        CPPUNIT_TEST( testAcquireSharedInterruptiblyInterruptedException );
+        CPPUNIT_TEST( testAcquireSharedNanosInterruptedException );
+        CPPUNIT_TEST( testAcquireSharedNanosTimeout );
         CPPUNIT_TEST_SUITE_END();
 
     public: