You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/05/22 00:43:35 UTC

svn commit: r1125840 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks: AbstractQueuedSynchronizer.cpp AbstractQueuedSynchronizer.h ReentrantLock.cpp ReentrantLock.h

Author: tabish
Date: Sat May 21 22:43:34 2011
New Revision: 1125840

URL: http://svn.apache.org/viewvc?rev=1125840&view=rev
Log:
Implements a platform-independent Reentrant Lock that's interruptible, has try lock methods and allows timed versions of lock and tryLock.  Uses the AbstractQueuedSynchronizer underneath so it's fairly trivial to implement.  Allows for both fair and unfair locking policies.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp?rev=1125840&r1=1125839&r2=1125840&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.cpp Sat May 21 22:43:34 2011
@@ -1603,7 +1603,7 @@ Collection<Thread*>* AbstractQueuedSynch
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool AbstractQueuedSynchronizer::owns(ConditionObject* condition) const {
+bool AbstractQueuedSynchronizer::owns(const ConditionObject* condition) const {
     if (condition == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "Condition Pointer arg was NULL");
     }
@@ -1611,7 +1611,7 @@ bool AbstractQueuedSynchronizer::owns(Co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool AbstractQueuedSynchronizer::hasWaiters(ConditionObject* condition) const {
+bool AbstractQueuedSynchronizer::hasWaiters(const ConditionObject* condition) const {
     if (!owns(condition)) {
         throw IllegalArgumentException(__FILE__, __LINE__, "Not owner");
     }
@@ -1619,7 +1619,7 @@ bool AbstractQueuedSynchronizer::hasWait
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int AbstractQueuedSynchronizer::getWaitQueueLength(ConditionObject* condition) const {
+int AbstractQueuedSynchronizer::getWaitQueueLength(const ConditionObject* condition) const {
     if (!owns(condition)) {
         throw IllegalArgumentException(__FILE__, __LINE__, "Not owner");
     }
@@ -1627,7 +1627,7 @@ int AbstractQueuedSynchronizer::getWaitQ
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Collection<Thread*>* AbstractQueuedSynchronizer::getWaitingThreads(ConditionObject* condition) const {
+Collection<Thread*>* AbstractQueuedSynchronizer::getWaitingThreads(const ConditionObject* condition) const {
     if (!owns(condition)) {
         throw IllegalArgumentException(__FILE__, __LINE__, "Not owner");
     }
@@ -1638,3 +1638,24 @@ Collection<Thread*>* AbstractQueuedSynch
 AbstractQueuedSynchronizer::ConditionObject* AbstractQueuedSynchronizer::createDefaultConditionObject() {
     return new DefaultConditionObject(this->impl);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool AbstractQueuedSynchronizer::hasQueuedPredecessors() const {
+    // Reports whether the wait queue holds a node ahead of the calling thread.
+    bool result = false;
+
+    PlatformThread::readerLockMutex(this->impl->rwLock);
+
+    // The correctness of this depends on head being initialized
+    // before tail and on head->next being accurate if the current
+    // thread is first in queue.
+    Node* t = this->impl->tail.get(); // Read fields in reverse initialization order
+    Node* h = this->impl->head.get();
+    Node* s = NULL;
+    // head == tail yields false; a NULL successor or one owned by another thread yields true.
+    result = h != t && ((s = h->next) == NULL || s->thread != Thread::currentThread());
+
+    PlatformThread::unlockRWMutex(this->impl->rwLock);
+
+    return result;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h?rev=1125840&r1=1125839&r2=1125840&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h Sat May 21 22:43:34 2011
@@ -205,7 +205,7 @@ namespace locks {
          * @throws IllegalArgumentException if the ConditionObject is not associated with this Synchronizer.
          * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
          */
-        Collection<decaf::lang::Thread*>* getWaitingThreads(AbstractQueuedSynchronizer::ConditionObject* condition) const;
+        Collection<decaf::lang::Thread*>* getWaitingThreads(const AbstractQueuedSynchronizer::ConditionObject* condition) const;
 
         /**
          * Gets an estimated count of the number of threads that are currently waiting on the given
@@ -219,7 +219,7 @@ namespace locks {
          * @throws IllegalArgumentException if the ConditionObject is not associated with this Synchronizer.
          * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
          */
-        int getWaitQueueLength(AbstractQueuedSynchronizer::ConditionObject* condition) const;
+        int getWaitQueueLength(const AbstractQueuedSynchronizer::ConditionObject* condition) const;
 
         /**
          * @returns true if there has ever been the need for the acquire method to block.
@@ -241,7 +241,7 @@ namespace locks {
          * @throws IllegalArgumentException if the ConditionObject is not associated with this Synchronizer.
          * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
          */
-        bool hasWaiters(AbstractQueuedSynchronizer::ConditionObject* condition) const;
+        bool hasWaiters(const AbstractQueuedSynchronizer::ConditionObject* condition) const;
 
         /**
          * Traverse the Queue if waiting threads to see if the given thread is present.
@@ -259,7 +259,7 @@ namespace locks {
          *
          * @throws NullPointerException if the condition pointer is NULL.
          */
-        bool owns(AbstractQueuedSynchronizer::ConditionObject* condition) const;
+        bool owns(const AbstractQueuedSynchronizer::ConditionObject* condition) const;
 
         /**
          * When held in exclusive mode this method releases the Synchronizer.  This method calls
@@ -455,6 +455,40 @@ namespace locks {
          */
         virtual ConditionObject* createDefaultConditionObject();
 
+        /**
+         * Queries whether any threads have been waiting to acquire longer than the
+         * current thread.
+         *
+         * Note that because cancellations due to interrupts and timeouts may occur at any
+         * time, a true return does not guarantee that some other thread will acquire before
+         * the current thread.  Likewise, it is possible for another thread to win a race to
+         * enqueue after this method has returned false, due to the queue being empty.
+         *
+         * This method is designed to be used by a fair synchronizer to avoid barging. Such a
+         * synchronizer's tryAcquire method should return false, and its tryAcquireShared method
+         * should return a negative value, if this method returns true (unless this is a
+         * reentrant acquire).  For example, the tryAcquire method for a fair, reentrant,
+         * exclusive mode synchronizer might look like this:
+         *
+         * <pre> {@code
+         * virtual bool tryAcquire(int arg) {
+         *   if (isHeldExclusively()) {
+         *     // A reentrant acquire; increment hold count
+         *     return true;
+         *   } else if (hasQueuedPredecessors()) {
+         *     return false;
+         *   } else {
+         *     // try to acquire normally
+         *   }
+         * }}
+         * </pre>
+         *
+         * @return true if there is a queued thread preceding the current thread,
+         *         and false if the current thread is at the head of the queue
+         *         or the queue is empty
+         */
+        bool hasQueuedPredecessors() const;
+
         friend class SynchronizerState;
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.cpp?rev=1125840&r1=1125839&r2=1125840&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.cpp Sat May 21 22:43:34 2011
@@ -18,37 +18,12 @@
 #include <decaf/util/concurrent/locks/ReentrantLock.h>
 
 #include <decaf/lang/Thread.h>
+#include <decaf/lang/exceptions/RuntimeException.h>
+#include <decaf/lang/exceptions/IllegalMonitorStateException.h>
+#include <decaf/util/concurrent/locks/AbstractQueuedSynchronizer.h>
 
 #include <sstream>
 
-#if HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
-#if HAVE_PTHREAD_H
-#include <pthread.h>
-#endif
-#if HAVE_SIGNAL_H
-#include <signal.h>
-#endif
-#if HAVE_STRING_H
-#include <string.h>
-#endif
-#if HAVE_SCHED_H
-#include <sched.h>
-#endif
-#if HAVE_SYS_TIME_H
-#include <sys/time.h>
-#endif
-#if HAVE_UNISTD_H
-#include <unistd.h>
-#endif
-#if HAVE_TIME_H
-#include <time.h>
-#endif
-#if HAVE_ERRNO_H
-#include <errno.h>
-#endif
-
 using namespace decaf;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
@@ -63,323 +38,246 @@ namespace concurrent{
 namespace locks{
 
     /**
-     * Lock Handle provides the data needed to keep track of the Lock on
-     * the supported platforms.
+     * Fairness Policy base class that uses the AbstractQueuedSynchronizer to do
+     * most of the heavy lifting for the ReentrantLock object.
      */
-    class LockHandle {
-    private:
+    class Sync : public AbstractQueuedSynchronizer {
+    public:
 
-        LockHandle( const LockHandle& );
-        LockHandle& operator= ( const LockHandle& );
+        Sync() {}
+        virtual ~Sync() {}
 
-    public:
+        /**
+         * Performs the actual lock.  The locking policy subclasses can
+         * optimize the locking process based on their fairness.
+         */
+        virtual void lock() = 0;
 
-        // OS specific Lock Object.
-#ifdef HAVE_PTHREAD_H
-        pthread_mutex_t handle;
-#else
-        CRITICAL_SECTION handle;
-#endif
-
-        // Lock Status Members
-        Thread* lock_owner;
-        volatile long long lock_owner_tid;
-        volatile long long lock_count;
+        virtual bool isFair() const = 0;
 
-    public:
+        /**
+         * Performs non-fair tryLock.  tryAcquire is implemented by each of the policy subclasses
+         * but both can make use of a non-fair try.  Throws RuntimeException on hold count overflow.
+         */
+        bool nonfairTryAcquire(int acquires) {
 
-        LockHandle() : handle(), lock_owner(NULL), lock_owner_tid(0), lock_count(0) {
+            Thread* current = Thread::currentThread();
+            int c = this->getState();
 
-#ifdef HAVE_PTHREAD_H
+            if (c == 0) {
 
-            if( pthread_mutex_init( &handle, NULL ) != 0 ) {
-                throw RuntimeException(
-                    __FILE__, __LINE__, "Failed to create OS Mutex object." );
-            }
+                if (this->compareAndSetState(0, acquires)) {
+                    this->setExclusiveOwnerThread(current);
+                    return true;
+                }
 
-#else
+            } else if (current == this->getExclusiveOwnerThread()) {
 
-            try{
-                InitializeCriticalSection( &handle );
-            } catch(...) {
-                throw RuntimeException(
-                    __FILE__, __LINE__, "Failed to create OS Mutex object." );
+                int nextc = c + acquires;
+
+                // Check for overflow of the state counter; throw by value so by-reference handlers catch it.
+                if (nextc < 0) {
+                    throw RuntimeException(__FILE__, __LINE__, "Maximum lock count exceeded");
+                }
+
+                this->setState(nextc);
+                return true;
             }
 
-#endif
+            return false;
         }
 
-        ~LockHandle() {
-#ifdef HAVE_PTHREAD_H
-            pthread_mutex_destroy( &handle );
-#else
-            DeleteCriticalSection( &handle );
-#endif
+        Condition* getNewCondition() {
+            return AbstractQueuedSynchronizer::createDefaultConditionObject();
         }
 
-    };
+        Thread* getOwner() const {
+            return getState() == 0 ? NULL : getExclusiveOwnerThread();
+        }
 
-    /**
-     * Internally defined Condition Object.
-     *
-     * This Condition Object implements a Condition object that is associated with
-     * a single ReentrantLock object and has access to its internal LockHandle.
-     */
-    class ConditionObject : public Condition {
-    private:
+        int getHoldCount() const {
+            return isHeldExclusively() ? getState() : 0;
+        }
 
-        ConditionObject( const ConditionObject& );
-        ConditionObject& operator= ( const ConditionObject& );
+        bool isLocked() const {
+            return getState() != 0;
+        }
+
+        bool isHeldExclusively() const {
+            return getExclusiveOwnerThread() == Thread::currentThread();
+        }
 
-    private:
+    protected:
 
-        LockHandle* lock;
-
-#ifdef HAVE_PTHREAD_H
-        pthread_cond_t condition;
-#else
-        HANDLE semaphore;
-        CRITICAL_SECTION criticalSection;
-        unsigned int numWaiting;
-        unsigned int numWake;
-        unsigned int generation;
-#endif
+        bool tryRelease(int releases) {
 
-    public:
+            int c = getState() - releases;
 
-        ConditionObject( LockHandle* lock ) : lock(lock),
-#ifdef HAVE_PTHREAD_H
-            condition() {
-#else
-            semaphore(), criticalSection(), numWaiting(0), numWake(0), generation(0) {
-#endif
-
-#ifdef HAVE_PTHREAD_H
-            if( pthread_cond_init( &condition, NULL ) != 0 ) {
-                throw RuntimeException(
-                    __FILE__, __LINE__, "Failed to initialize OS Condition object." );
+            if (Thread::currentThread() != getExclusiveOwnerThread()) {
+                throw IllegalMonitorStateException();
             }
-#else
-            semaphore = CreateSemaphore( NULL, 0, LONG_MAX, NULL );
-            if( semaphore == NULL ) {
-                throw RuntimeException(
-                    __FILE__, __LINE__, "Failed to initialize OS Condition object." );
+
+            bool free = false;
+
+            if (c == 0) {
+                free = true;
+                setExclusiveOwnerThread(NULL);
             }
 
-            try{
-                InitializeCriticalSection( &criticalSection );
-            } catch(...) {
-                throw RuntimeException(
-                    __FILE__, __LINE__, "Failed to initialize OS Condition object." );
+            setState(c);
+
+            return free;
+        }
+
+    };
+
+    class NonFairSync : public Sync {
+    public:
+
+        NonFairSync() : Sync() {}
+        virtual ~NonFairSync() {}
+
+    public:
+
+        virtual bool isFair() const {
+            return false;
+        }
+
+        virtual void lock() {
+
+            if (this->compareAndSetState(0, 1)) {
+                this->setExclusiveOwnerThread(Thread::currentThread());
+            } else {
+                acquire(1);
             }
-#endif
         }
 
-        virtual ~ConditionObject() {
-#ifdef HAVE_PTHREAD_H
-            pthread_cond_destroy( &condition );
-#else
-            CloseHandle( semaphore );
-            ::DeleteCriticalSection( &criticalSection );
-#endif
+    protected:
+
+        virtual bool tryAcquire(int acquires) {
+            return this->nonfairTryAcquire(acquires);
+        }
+    };
+
+    /**
+     * The fair policy only lets a Thread acquire if the lock is not held, the
+     * call amounts to a recursive lock, or the Thread is the first one in line.
+     */
+    class FairSync : public Sync {
+    public:
+
+        FairSync() : Sync() {}
+        virtual ~FairSync() {}
+
+    public:
+
+        virtual void lock() {
+            this->acquire(1);
+        }
+
+        virtual bool isFair() const {
+            return true;
         }
 
-        virtual void await();
+    protected:
+
+        virtual bool tryAcquire(int acquires) {
+            // Fair acquire: a free lock is only taken when no thread is queued ahead of the caller.
+            Thread* current = Thread::currentThread();
+
+            int c = this->getState();
 
-        virtual void awaitUninterruptibly();
+            if (c == 0) {
 
-        virtual long long awaitNanos( long long nanosTimeout );
+                if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
+                    this->setExclusiveOwnerThread(current);
+                    return true;
+                }
 
-        virtual bool await( long long time, const TimeUnit& unit );
+            } else if (current == this->getExclusiveOwnerThread()) {
 
-        virtual bool awaitUntil( const Date& deadline );
+                int nextc = c + acquires;
 
-        virtual void signal();
+                // Check for overflow of the lock state variable; throw by value, not a heap pointer.
+                if (nextc < 0) {
+                    throw RuntimeException(__FILE__, __LINE__, "Maximum lock count exceeded");
+                }
 
-        virtual void signalAll();
+                setState(nextc);
+                return true;
+            }
 
+            return false;
+        }
     };
 
 }}}}
 
 ////////////////////////////////////////////////////////////////////////////////
-ReentrantLock::ReentrantLock() : handle(new LockHandle) {
+ReentrantLock::ReentrantLock() : Lock(), sync(new NonFairSync) {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ReentrantLock::ReentrantLock(bool fair) :  // The fairness policy object is selected once, here.
+    Lock(), sync(fair ? static_cast<Sync*>(new FairSync()) : static_cast<Sync*>(new NonFairSync())) {
+}
 
 ////////////////////////////////////////////////////////////////////////////////
 ReentrantLock::~ReentrantLock() {
     try{
-        delete this->handle;
+        delete this->sync;
     }
     DECAF_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ReentrantLock::lock() {
-
-    long long threadId = Thread::currentThread()->getId();
-
-    if( threadId == handle->lock_owner_tid ) {
-        handle->lock_count++;
-    } else {
-
-#ifdef HAVE_PTHREAD_H
-
-        if( pthread_mutex_lock( &( handle->handle ) ) != 0 ) {
-            throw RuntimeException(
-                __FILE__, __LINE__, "Failed to Lock OS Mutex" );
-        }
-
-#else
-
-        try{
-            EnterCriticalSection( &handle->handle );
-        } catch(...) {
-            throw RuntimeException(
-                __FILE__, __LINE__, "Failed to Lock OS Mutex" );
-        }
-
-#endif
-
-        handle->lock_owner_tid = threadId;
-        handle->lock_owner = Thread::currentThread();
-        handle->lock_count = 1;
-    }
+    this->sync->lock();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ReentrantLock::lockInterruptibly() {
-
-    this->lock();
+    this->sync->acquireInterruptibly(1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ReentrantLock::tryLock() {
-
-    long long threadId = Thread::currentThread()->getId();
-
-    if( threadId == handle->lock_owner_tid ) {
-        handle->lock_count++;
-    } else {
-
-        unsigned int result = 0;
-
-#ifdef HAVE_PTHREAD_H
-        result = pthread_mutex_trylock( &( handle->handle ) );
-#else
-        try{
-            result = TryEnterCriticalSection( &handle->handle );
-        } catch(...) {
-            throw RuntimeException(
-                __FILE__, __LINE__, "Failed to Lock OS Mutex" );
-        }
-#endif
-
-        if( result == 0 ) {
-            handle->lock_owner_tid = threadId;
-            handle->lock_count = 1;
-            handle->lock_owner = Thread::currentThread();
-        } else {
-            return false;
-        }
-    }
-
-    return true;
+    return this->sync->nonfairTryAcquire(1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool ReentrantLock::tryLock( long long time DECAF_UNUSED, const TimeUnit& unit DECAF_UNUSED ) {
-
-//    long long threadId = Thread::getId();
-//
-//    if( threadId == handle->lock_owner_tid ) {
-//        handle->lock_count++;
-//    } else {
-//
-//        if( pthread_mutex_timedlock( &( handle->handle ) ) == 0 ) {
-//            handle->lock_owner_tid = threadId;
-//            handle->lock_count = 1;
-//            handle->lock_owner = Thread::currentThread();
-//        } else {
-//            return false;
-//        }
-//    }
-//
-//    return true;
-
-    return false;
+bool ReentrantLock::tryLock(long long time, const TimeUnit& unit) {
+    return this->sync->tryAcquireNanos(1, unit.toNanos(time));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ReentrantLock::unlock() {
-
-    if( handle->lock_owner_tid == 0 ) {
-        return;
-    }
-
-    if( handle->lock_owner_tid != Thread::currentThread()->getId() ) {
-        throw IllegalMonitorStateException(
-            __FILE__, __LINE__,
-            "Unlock Failed, this thread is not the Lock Owner!" );
-    }
-
-    handle->lock_count--;
-
-    if( handle->lock_count == 0 ) {
-        handle->lock_owner_tid = 0;
-        handle->lock_owner = NULL;
-
-#ifdef HAVE_PTHREAD_H
-        pthread_mutex_unlock( &( handle->handle ) );
-#else
-        try{
-            LeaveCriticalSection( &handle->handle );
-        } catch(...) {
-            throw RuntimeException(
-                __FILE__, __LINE__, "Failed to Unlock OS Mutex" );
-        }
-#endif
-    }
+    this->sync->release(1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Condition* ReentrantLock::newCondition() {
-
-    return new ConditionObject( this->handle );
+    return this->sync->getNewCondition();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int ReentrantLock::getHoldCount() const {
-
-    long long threadId = Thread::currentThread()->getId();
-
-    if( threadId == handle->lock_owner_tid ) {
-        return (int)handle->lock_count;
-    }
-
-    return 0;
+    return this->sync->getHoldCount();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ReentrantLock::isHeldByCurrentThread() const {
-
-    long long threadId = Thread::currentThread()->getId();
-
-    if( threadId == handle->lock_owner_tid ) {
-        return true;
-    }
-
-    return false;
+    return this->sync->isHeldExclusively();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ReentrantLock::isLocked() const {
-    return this->handle->lock_count > 0;
+    return this->sync->isLocked();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ReentrantLock::isFair() const {
-    return false;
+    return this->sync->isFair();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -388,7 +286,7 @@ std::string ReentrantLock::toString() co
 
     result << "ReentrantLock: ";
 
-    Thread* current = handle->lock_owner;
+    Thread* current = this->sync->getOwner();
 
     if( current != NULL ) {
         result << "[Locked by Thread: " << current->getName() << "]";
@@ -400,169 +298,82 @@ std::string ReentrantLock::toString() co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConditionObject::await() {
-
-    // Save the current owner as we are going to unlock and release for
-    // someone else to lock on potentially.  When we come back and
-    // re-lock we want to restore to the state we were in before.
-    long long lock_owner_tid = lock->lock_owner_tid;
-    long long lock_count = lock->lock_count;
-    Thread* lock_owner = lock->lock_owner;
-
-    // Clear the owner for now.
-    lock->lock_owner = NULL;
-    lock->lock_count = 0;
-    lock->lock_owner_tid = 0;
-
-#ifdef HAVE_PTHREAD_H
-    if( pthread_cond_wait( &condition, &lock->handle ) != 0 ) {
-        throw RuntimeException(
-            __FILE__, __LINE__, "Failed to wait on OS Condition object." );
-    }
-#else
-
-#endif
-
-    // restore the owner
-    lock->lock_owner = lock_owner;
-    lock->lock_count = lock_count;
-    lock->lock_owner_tid = lock_owner_tid;
+decaf::lang::Thread* ReentrantLock::getOwner() const {
+    return this->sync->getOwner();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConditionObject::awaitUninterruptibly() {
-    this->await();
+int ReentrantLock::getQueueLength() const {
+    return this->sync->getQueueLength();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-long long ConditionObject::awaitNanos( long long nanosTimeout ) {
-
-    // Save the current owner as we are going to unlock and release for
-    // someone else to lock on potentially.  When we come back and
-    // re-lock we want to restore to the state we were in before.
-    long long lock_owner_tid = lock->lock_owner_tid;
-    long long lock_count = lock->lock_count;
-    Thread* lock_owner = lock->lock_owner;
-
-    // Clear the owner for now.
-    lock->lock_owner = NULL;
-    lock->lock_count = 0;
-    lock->lock_owner_tid = 0;
-
-#ifdef HAVE_PTHREAD_H
-
-    // Get time now as nanoseconds.
-    struct timeval tv;
-    gettimeofday( &tv, NULL );
-    long long timeNow = TimeUnit::SECONDS.toNanos( tv.tv_sec ) +
-                        TimeUnit::MICROSECONDS.toNanos( tv.tv_usec );
-
-    // Convert delay to nanoseconds and add it to now.
-    long long delay = nanosTimeout + timeNow;
-
-    struct timespec abstime;
-    abstime.tv_sec = TimeUnit::NANOSECONDS.toSeconds( delay );
-    abstime.tv_nsec = delay % 1000000000;
-
-    unsigned int result = pthread_cond_timedwait( &condition, &lock->handle, &abstime );
-
-    if( result != 0 && result != ETIMEDOUT ) {
-        throw RuntimeException(
-            __FILE__, __LINE__, "Failed to wait on OS Condition object." );
-    }
-
-#else
-
-#endif
-
-    // restore the owner
-    lock->lock_owner = lock_owner;
-    lock->lock_count = lock_count;
-    lock->lock_owner_tid = lock_owner_tid;
-
-    return 0;
+Collection<decaf::lang::Thread*>* ReentrantLock::getQueuedThreads() const {
+    return this->sync->getQueuedThreads();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool ConditionObject::await( long long time DECAF_UNUSED, const TimeUnit& unit DECAF_UNUSED ) {
-
-    return false;
+bool ReentrantLock::hasQueuedThreads() const {
+    return this->sync->hasQueuedThreads();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool ConditionObject::awaitUntil( const Date& deadline DECAF_UNUSED ) {
+bool ReentrantLock::hasQueuedThread(decaf::lang::Thread* thread) const {
+
+    if (thread == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "The thread to check was NULL");
+    }
 
-    return false;
+    return this->sync->isQueued(thread);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConditionObject::signal() {
+int ReentrantLock::getWaitQueueLength(Condition* condition) const {
 
-#ifdef HAVE_PTHREAD_H
-    if( pthread_cond_signal( &condition ) ) {
-        throw RuntimeException(
-            __FILE__, __LINE__, "Failed to signal OS Condition object." );
+    if (condition == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "The Condition to check was NULL");
     }
-#else
-
-    try {
-
-        bool doWake = false;
-
-        EnterCriticalSection( &criticalSection );
-
-        if( numWaiting > numWake ) {
-            doWake = true;
-            numWake++;
-            generation++;
-        }
 
-        LeaveCriticalSection( &criticalSection );
+    const AbstractQueuedSynchronizer::ConditionObject* cond =
+        dynamic_cast<const AbstractQueuedSynchronizer::ConditionObject*>(condition);
 
-        if( doWake ) {
-            ReleaseSemaphore( semaphore, 1, NULL );
-        }
-
-    } catch(...) {
-        throw RuntimeException(
-            __FILE__, __LINE__, "Failed to signal OS Condition object." );
+    if (cond == NULL) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not associated with this Lock");
     }
 
-#endif
+    return this->sync->getWaitQueueLength(cond);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConditionObject::signalAll() {
+bool ReentrantLock::hasWaiters(Condition* condition) const {
 
-#ifdef HAVE_PTHREAD_H
-    if( pthread_cond_broadcast( &condition ) ) {
-        throw RuntimeException(
-            __FILE__, __LINE__, "Failed to broadcast signal OS Condition object." );
+    if (condition == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "The Condition to check was NULL");
     }
-#else
 
-    try {
+    const AbstractQueuedSynchronizer::ConditionObject* cond =
+        dynamic_cast<const AbstractQueuedSynchronizer::ConditionObject*>(condition);
 
-        unsigned int numWakeTemp = 0;
+    if (cond == NULL) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not associated with this Lock");
+    }
 
-        EnterCriticalSection( &criticalSection );
+    return this->sync->hasWaiters(cond);
+}
 
-        if( numWaiting > numWake ) {
-            numWakeTemp = numWaiting - numWake;
-            numWake = numWaiting;
-            generation++;
-        }
+////////////////////////////////////////////////////////////////////////////////
+Collection<decaf::lang::Thread*>* ReentrantLock::getWaitingThreads(Condition* condition) const {
 
-        LeaveCriticalSection( &criticalSection );
+    if (condition == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "The Condition to check was NULL");
+    }
 
-        if( numWakeTemp ) {
-            ReleaseSemaphore( semaphore, numWake, NULL );
-        }
+    const AbstractQueuedSynchronizer::ConditionObject* cond =
+        dynamic_cast<const AbstractQueuedSynchronizer::ConditionObject*>(condition);
 
-    } catch(...) {
-        throw RuntimeException(
-            __FILE__, __LINE__, "Failed to broadcast signal OS Condition object." );
+    if (cond == NULL) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Condition is not associated with this Lock");
     }
 
-#endif
+    return this->sync->getWaitingThreads(cond);
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.h?rev=1125840&r1=1125839&r2=1125840&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/locks/ReentrantLock.h Sat May 21 22:43:34 2011
@@ -21,13 +21,14 @@
 #include <decaf/util/Config.h>
 
 #include <decaf/util/concurrent/locks/Lock.h>
+#include <decaf/util/Collection.h>
 
 namespace decaf {
 namespace util {
 namespace concurrent {
 namespace locks {
 
-    class LockHandle;
+    class Sync;
 
     /**
      * A reentrant mutual exclusion Lock with extended capabilities.
@@ -79,7 +80,9 @@ namespace locks {
     class DECAF_API ReentrantLock : public Lock {
     private:
 
-        LockHandle* handle;
+        // Instance of an AbstractQueuedSynchronizer specific to this type of Lock.
+        // Will vary depending on whether fair or unfair lock semantics are requested.
+        Sync* sync;
 
     private:
 
@@ -88,8 +91,19 @@ namespace locks {
 
     public:
 
+        /**
+         * Create a new ReentrantLock instance with unfair locking semantics.
+         */
         ReentrantLock();
 
+        /**
+         * Create a new ReentrantLock instance with the specified locking semantics.
+         *
+         * @param fair
+         *      Boolean value indicating if the lock should be fair or not.
+         */
+        ReentrantLock(bool fair);
+
         virtual ~ReentrantLock();
 
         /**
@@ -254,6 +268,8 @@ namespace locks {
          */
         virtual Condition* newCondition();
 
+    public:  // Diagnostics methods.
+
         /**
          * Queries the number of holds on this lock by the current thread.
          *
@@ -353,6 +369,90 @@ namespace locks {
          */
         std::string toString() const;
 
+        /**
+         * Gets an estimated count of the number of threads that are currently waiting to acquire this
+         * Lock.  The value changes dynamically, so the result of this method can be invalid immediately
+         * after it is called.
+         *
+         * @returns an estimate of the number of waiting threads.
+         */
+        int getQueueLength() const;
+
+        /**
+         * Gets an estimated count of the number of threads that are currently waiting on the given
+         * Condition object.  This value changes dynamically, so the result of this method can be invalid
+         * immediately after it is called.  The Condition object must be associated with this Lock
+         * or an exception will be thrown.
+         *
+         * @returns an estimate of the number of waiting threads.
+         *
+         * @throws NullPointerException if the Condition pointer is NULL.
+         * @throws IllegalArgumentException if the Condition is not associated with this Lock.
+         * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+         */
+        int getWaitQueueLength(Condition* condition) const;
+
+        /**
+         * Returns true if there are any threads that are currently waiting on the given Condition object.
+         * The condition must be associated with this Lock instance.
+         *
+         * @returns true if the condition object has waiting threads.
+         *
+         * @throws NullPointerException if the Condition pointer is NULL.
+         * @throws IllegalArgumentException if the Condition is not associated with this Lock.
+         * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+         */
+        bool hasWaiters(Condition* condition) const;
+
+        /**
+         * @returns true if there are threads that are currently waiting to acquire this Lock.
+         */
+        bool hasQueuedThreads() const;
+
+        /**
+         * @returns true if the given Thread is waiting to acquire this Lock object.  Because of cancellations,
+         *          this method can return true even though the given Thread is no longer queued afterwards.
+         *
+         * @throws NullPointerException if the given thread is NULL.
+         */
+        bool hasQueuedThread(decaf::lang::Thread* thread) const;
+
+    protected:
+
+        /**
+         * Creates and returns a new Collection object that contains all the threads that may be waiting
+         * on the given Condition object instance at the time this method is called.
+         *
+         * @returns a Collection pointer that contains waiting threads on given Condition object.
+         *          The caller owns the returned pointer.
+         *
+         * @throws NullPointerException if the Condition pointer is NULL.
+         * @throws IllegalArgumentException if the Condition is not associated with this Lock.
+         * @throws IllegalMonitorStateException if the caller does not hold exclusive synchronization.
+         */
+        decaf::util::Collection<decaf::lang::Thread*>* getWaitingThreads(Condition* condition) const;
+
+        /**
+         * Returns the thread that currently owns this lock, or NULL if not owned. When this method
+         * is called by a thread that is not the owner, the return value reflects a best-effort
+         * approximation of current lock status. For example, the owner may be momentarily NULL even
+         * if there are threads trying to acquire the lock but have not yet done so. This method is
+         * designed to facilitate construction of subclasses that provide more extensive lock
+         * monitoring facilities.
+         *
+         * @return pointer to the Thread that owns this lock, or NULL if not owned.
+         */
+        decaf::lang::Thread* getOwner() const;
+
+        /**
+         * Creates and returns a new Collection object that contains a best effort snapshot of the
+         * threads that are currently waiting to acquire.
+         *
+         * @returns a Collection pointer that contains waiting threads for lock acquisition.
+         *          The caller owns the returned pointer.
+         */
+        decaf::util::Collection<decaf::lang::Thread*>* getQueuedThreads() const;
+
     };
 
 }}}}