You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/05/20 00:51:50 UTC

svn commit: r1125159 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent: PlatformThread.h unix/PlatformDefs.h unix/PlatformThread.cpp windows/PlatformDefs.h windows/PlatformThread.cpp

Author: tabish
Date: Thu May 19 22:51:50 2011
New Revision: 1125159

URL: http://svn.apache.org/viewvc?rev=1125159&view=rev
Log:
Adds a simple RW Lock for use in the internal decaf implementation.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h Thu May 19 22:51:50 2011
@@ -38,8 +38,16 @@ namespace concurrent {
         PlatformThread(const PlatformThread&);
         PlatformThread& operator= (const PlatformThread&);
 
-    public:
+    public:  // Mutex processing methods
 
+        /**
+         * Creates a new Mutex instance at the location given by the mutex pointer
+         * argument.  The mutex must be destroyed by calling the destoryMutex
+         * method when it is no longer needed.
+         *
+         * @param mutex
+         *      Pointer to a memory location where the new Mutex is to be stored.
+         */
         static void createMutex(decaf_mutex_t* mutex);
 
         static void lockMutex(decaf_mutex_t mutex);
@@ -50,6 +58,20 @@ namespace concurrent {
 
         static void destroyMutex(decaf_mutex_t mutex);
 
+    public: // Reader / Writer Mutex processing methods.
+
+        static void createRWMutex(decaf_rwmutex_t* mutex);
+
+        static void readerLockMutex(decaf_rwmutex_t mutex);
+        static void writerLockMutex(decaf_rwmutex_t mutex);
+
+        static bool tryReaderLockMutex(decaf_rwmutex_t mutex);
+        static bool tryWriterLockMutex(decaf_rwmutex_t mutex);
+
+        static void unlockRWMutex(decaf_rwmutex_t mutex);
+
+        static void destroyRWMutex(decaf_rwmutex_t mutex);
+
     public:  // Condition processing methods
 
         static void createCondition(decaf_condition_t* condition);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h Thu May 19 22:51:50 2011
@@ -57,7 +57,7 @@ namespace concurrent{
     typedef pthread_key_t decaf_tls_key;
     typedef pthread_cond_t* decaf_condition_t;
     typedef pthread_mutex_t* decaf_mutex_t;
-
+    typedef pthread_rwlock_t* decaf_rwmutex_t;
 
 }}}}
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp Thu May 19 22:51:50 2011
@@ -99,6 +99,60 @@ void PlatformThread::destroyMutex(decaf_
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::createRWMutex(decaf_rwmutex_t* mutex) {
+
+    *mutex = new pthread_rwlock_t;
+
+    if( pthread_rwlock_init(*mutex, NULL) != 0 ) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to create OS Mutex object." );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::readerLockMutex(decaf_rwmutex_t mutex) {
+
+    if (pthread_rwlock_rdlock(mutex) != 0) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to Lock OS RW Mutex" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::writerLockMutex(decaf_rwmutex_t mutex) {
+
+    if (pthread_rwlock_wrlock(mutex) != 0) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to Lock OS RW Mutex" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool PlatformThread::tryReaderLockMutex(decaf_rwmutex_t mutex) {
+    return pthread_rwlock_tryrdlock(mutex) == 0 ? true : false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool PlatformThread::tryWriterLockMutex(decaf_rwmutex_t mutex) {
+    return pthread_rwlock_trywrlock(mutex) == 0 ? true : false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::unlockRWMutex(decaf_rwmutex_t mutex) {
+
+    if (pthread_rwlock_unlock(mutex) != 0) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to Unlock OS RW Mutex" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::destroyRWMutex(decaf_rwmutex_t mutex) {
+    pthread_rwlock_destroy(mutex);
+    delete mutex;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void PlatformThread::createCondition(decaf_condition_t* condition) {
 
     *condition = new pthread_cond_t;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h Thu May 19 22:51:50 2011
@@ -47,7 +47,14 @@ namespace internal{
 namespace util{
 namespace concurrent{
 
+    struct RWLOCK {
+        HANDLE writeMutex;
+        HANDLE readEvent;
+        LONG readers;
+    };
+
     typedef void* PLATFORM_THREAD_ENTRY_ARG;
+
     #define PLATFORM_THREAD_RETURN() return 0;
     #define PLATFORM_THREAD_CALLBACK_TYPE unsigned
     #define PLATFORM_DEFAULT_STACK_SIZE 0x8000
@@ -57,6 +64,7 @@ namespace concurrent{
     typedef DWORD decaf_tls_key;
     typedef HANDLE decaf_condition_t;
     typedef LPCRITICAL_SECTION decaf_mutex_t;
+    typedef RWLOCK* decaf_rwmutex_t;
 
 }}}}
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp Thu May 19 22:51:50 2011
@@ -36,7 +36,7 @@ using namespace decaf::internal::util::c
 
 ////////////////////////////////////////////////////////////////////////////////
 void PlatformThread::createMutex(decaf_mutex_t* mutex) {
-	*mutex = new CRITICAL_SECTION;
+    *mutex = new CRITICAL_SECTION;
     ::InitializeCriticalSection(*mutex);
 }
 
@@ -58,7 +58,168 @@ void PlatformThread::unlockMutex(decaf_m
 ////////////////////////////////////////////////////////////////////////////////
 void PlatformThread::destroyMutex(decaf_mutex_t mutex) {
     ::DeleteCriticalSection(mutex);
-	delete mutex;
+    delete mutex;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::createRWMutex(decaf_rwmutex_t* mutex) {
+    *mutex = new RWLOCK;
+
+    (*mutex)->readers = 0;
+
+    if (!((*mutex)->readEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL))) {
+        *mutex = NULL;
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to initialize OS Event object.");
+    }
+
+    if (! ((*mutex)->writeMutex = ::CreateMutex(NULL, FALSE, NULL))) {
+        ::CloseHandle((*mutex)->readEvent);
+        *mutex = NULL;
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to initialize OS Mutex object.");
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::readerLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writeMutex, INFINITE);
+
+    if (code == WAIT_FAILED || code == WAIT_TIMEOUT) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    // The write mutex lock prevents writers from entering while we add a
+    // reader, and protects the read counter from races.
+    ::InterlockedIncrement(&mutex->readers);
+
+    if (! ::ResetEvent(mutex->readEvent)) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to reset RW Event object.");
+    }
+
+    if (! ReleaseMutex(mutex->writeMutex)) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to release RW Mutex object.");
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::writerLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writMmutex, INFINITE);
+
+    if (code == WAIT_FAILED || code == WAIT_TIMEOUT) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    // Once we own the lock we must wait for the readers to all leave.
+    if (mutex->readers) {
+
+        code = ::WaitForSingleObject(mutex->readEvent, INFINITE);
+        if (code == WAIT_FAILED || code == WAIT_TIMEOUT) {
+
+            ::ReleaseMutex(mutex->writeMutex);
+
+            throw RuntimeException(
+                __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool PlatformThread::tryReaderLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writeMutex, 0);
+
+    if (code == WAIT_FAILED) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    if (code == WAIT_TIMEOUT) {
+        return false;
+    }
+
+    // The write mutex lock prevents writers from entering while we add a
+    // reader, and protects the read counter from races.
+    ::InterlockedIncrement(&mutex->readers);
+
+    if (! ::ResetEvent(mutex->readEvent)) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to reset RW Event object.");
+    }
+
+    if (! ::ReleaseMutex(mutex->writeMutex)) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to release RW Mutex object.");
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool PlatformThread::tryWriterLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writMmutex, 0);
+
+    if (code == WAIT_FAILED) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    if (code == WAIT_TIMEOUT) {
+        return false;
+    }
+
+    // Once we own the lock we must check for readers, if there
+    if (mutex->readers > 0) {
+
+        if (! ::ReleaseMutex(mutex->writeMutex)) {
+            throw RuntimeException(
+                __FILE__, __LINE__, "Failed to release RW Mutex object.");
+        }
+        return false;
+    }
+
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::unlockRWMutex(decaf_rwmutex_t mutex) {
+
+    DWORD result = 0;
+
+    if (! ::ReleaseMutex(rwlock->write_mutex)) {
+        result = ::GetLastError();
+    }
+
+    // If not the owner then we must be a reader holding the lock.
+    if (result == ERROR_NOT_OWNER) {
+
+        // If there are readers and this is the last release, signal the event
+        // so that any waiting writers get nofitied.
+        if (mutex->readers > 0 && ::InterlockedDecrement(mutex->readers) == 0) {
+
+           if (! ::SetEvent(mutex->readEvent)) {
+               throw RuntimeException(
+                   __FILE__, __LINE__, "Failed to signal OS event object.");
+           }
+        }
+    } else {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to signal unlock OS Mutex object.");
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PlatformThread::destroyRWMutex(decaf_rwmutex_t mutex) {
+
+    ::CloseHandle(mutex->readEvent);
+    ::CloseHandle(mutex->writeMutex);
+
+    delete mutex;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -144,7 +305,7 @@ bool PlatformThread::interruptibleWaitOn
 
         if (timedOut == WAIT_TIMEOUT) {
 
-			// interruption events take precedence over timeout.
+            // interruption events take precedence over timeout.
             if (complete(true)) {
                break;
             }