You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/05/20 00:51:50 UTC
svn commit: r1125159 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent:
PlatformThread.h unix/PlatformDefs.h unix/PlatformThread.cpp
windows/PlatformDefs.h windows/PlatformThread.cpp
Author: tabish
Date: Thu May 19 22:51:50 2011
New Revision: 1125159
URL: http://svn.apache.org/viewvc?rev=1125159&view=rev
Log:
Adds a simple RW Lock for use in the internal decaf implementation.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/PlatformThread.h Thu May 19 22:51:50 2011
@@ -38,8 +38,16 @@ namespace concurrent {
PlatformThread(const PlatformThread&);
PlatformThread& operator= (const PlatformThread&);
- public:
+ public: // Mutex processing methods
+ /**
+ * Creates a new Mutex instance at the location given by the mutex pointer
+ * argument. The mutex must be destroyed by calling the destroyMutex
+ * method when it is no longer needed.
+ *
+ * @param mutex
+ * Pointer to a memory location where the new Mutex is to be stored.
+ */
static void createMutex(decaf_mutex_t* mutex);
static void lockMutex(decaf_mutex_t mutex);
@@ -50,6 +58,20 @@ namespace concurrent {
static void destroyMutex(decaf_mutex_t mutex);
+ public: // Reader / Writer Mutex processing methods.
+
+ static void createRWMutex(decaf_rwmutex_t* mutex);
+
+ static void readerLockMutex(decaf_rwmutex_t mutex);
+ static void writerLockMutex(decaf_rwmutex_t mutex);
+
+ static bool tryReaderLockMutex(decaf_rwmutex_t mutex);
+ static bool tryWriterLockMutex(decaf_rwmutex_t mutex);
+
+ static void unlockRWMutex(decaf_rwmutex_t mutex);
+
+ static void destroyRWMutex(decaf_rwmutex_t mutex);
+
public: // Condition processing methods
static void createCondition(decaf_condition_t* condition);
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformDefs.h Thu May 19 22:51:50 2011
@@ -57,7 +57,7 @@ namespace concurrent{
typedef pthread_key_t decaf_tls_key;
typedef pthread_cond_t* decaf_condition_t;
typedef pthread_mutex_t* decaf_mutex_t;
-
+ typedef pthread_rwlock_t* decaf_rwmutex_t;
}}}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/unix/PlatformThread.cpp Thu May 19 22:51:50 2011
@@ -99,6 +99,60 @@ void PlatformThread::destroyMutex(decaf_
}
////////////////////////////////////////////////////////////////////////////////
+// Allocates a new pthread read/write lock, initializes it with the default
+// attributes and stores it at the location given by the mutex pointer.  The
+// lock must later be released with destroyRWMutex.
+//
+// On failure the allocated storage is freed, *mutex is set to NULL and a
+// RuntimeException is thrown, so the caller is never handed a pointer to an
+// uninitialized lock.
+void PlatformThread::createRWMutex(decaf_rwmutex_t* mutex) {
+
+    pthread_rwlock_t* rwlock = new pthread_rwlock_t;
+
+    if( pthread_rwlock_init(rwlock, NULL) != 0 ) {
+        // Initialization failed, free the storage so that it does not leak.
+        delete rwlock;
+        *mutex = NULL;
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to create OS Mutex object." );
+    }
+
+    // Only publish the lock once it is fully initialized.
+    *mutex = rwlock;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Acquires the given RW mutex for reading via pthread_rwlock_rdlock.  Throws
+// a RuntimeException if the pthread call reports any error.
+void PlatformThread::readerLockMutex(decaf_rwmutex_t mutex) {
+
+    const int status = pthread_rwlock_rdlock(mutex);
+    if (status == 0) {
+        return;
+    }
+
+    throw RuntimeException(
+        __FILE__, __LINE__, "Failed to Lock OS RW Mutex" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Acquires the given RW mutex for writing via pthread_rwlock_wrlock.  Throws
+// a RuntimeException if the pthread call reports any error.
+void PlatformThread::writerLockMutex(decaf_rwmutex_t mutex) {
+
+    const bool locked = (pthread_rwlock_wrlock(mutex) == 0);
+
+    if (!locked) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to Lock OS RW Mutex" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Attempts to take the RW mutex for reading without blocking.  Returns true
+// only when pthread_rwlock_tryrdlock reports success; every non-zero result
+// is reported as false.
+bool PlatformThread::tryReaderLockMutex(decaf_rwmutex_t mutex) {
+
+    if (pthread_rwlock_tryrdlock(mutex) != 0) {
+        return false;
+    }
+
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Attempts to take the RW mutex for writing without blocking.  Returns true
+// only when pthread_rwlock_trywrlock reports success; every non-zero result
+// is reported as false.
+bool PlatformThread::tryWriterLockMutex(decaf_rwmutex_t mutex) {
+
+    const int status = pthread_rwlock_trywrlock(mutex);
+
+    return status == 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases a read or write hold on the given RW mutex through
+// pthread_rwlock_unlock.  Throws a RuntimeException if the pthread call
+// reports any error.
+void PlatformThread::unlockRWMutex(decaf_rwmutex_t mutex) {
+
+    const int status = pthread_rwlock_unlock(mutex);
+    if (status == 0) {
+        return;
+    }
+
+    throw RuntimeException(
+        __FILE__, __LINE__, "Failed to Unlock OS RW Mutex" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Tears down the pthread RW lock and then frees the storage that
+// createRWMutex allocated for it.  The result of pthread_rwlock_destroy is
+// not checked.
+void PlatformThread::destroyRWMutex(decaf_rwmutex_t mutex) {
+
+    pthread_rwlock_t* const rwlock = mutex;
+
+    pthread_rwlock_destroy(rwlock);
+
+    delete rwlock;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void PlatformThread::createCondition(decaf_condition_t* condition) {
*condition = new pthread_cond_t;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformDefs.h Thu May 19 22:51:50 2011
@@ -47,7 +47,14 @@ namespace internal{
namespace util{
namespace concurrent{
+ struct RWLOCK { // State backing decaf_rwmutex_t on Windows.
+ HANDLE writeMutex; // Mutex owned by a writer while it holds the lock; readers take it only while registering.
+ HANDLE readEvent; // Event reset when a reader enters and set when the last reader unlocks; writers wait on it.
+ LONG readers; // Count of readers currently holding the lock, updated with the Interlocked functions.
+ };
+
typedef void* PLATFORM_THREAD_ENTRY_ARG;
+
#define PLATFORM_THREAD_RETURN() return 0;
#define PLATFORM_THREAD_CALLBACK_TYPE unsigned
#define PLATFORM_DEFAULT_STACK_SIZE 0x8000
@@ -57,6 +64,7 @@ namespace concurrent{
typedef DWORD decaf_tls_key;
typedef HANDLE decaf_condition_t;
typedef LPCRITICAL_SECTION decaf_mutex_t;
+ typedef RWLOCK* decaf_rwmutex_t;
}}}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp?rev=1125159&r1=1125158&r2=1125159&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/windows/PlatformThread.cpp Thu May 19 22:51:50 2011
@@ -36,7 +36,7 @@ using namespace decaf::internal::util::c
////////////////////////////////////////////////////////////////////////////////
void PlatformThread::createMutex(decaf_mutex_t* mutex) {
- *mutex = new CRITICAL_SECTION;
+ *mutex = new CRITICAL_SECTION;
::InitializeCriticalSection(*mutex);
}
@@ -58,7 +58,168 @@ void PlatformThread::unlockMutex(decaf_m
////////////////////////////////////////////////////////////////////////////////
void PlatformThread::destroyMutex(decaf_mutex_t mutex) {
::DeleteCriticalSection(mutex);
- delete mutex;
+ delete mutex;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Allocates a new RWLOCK with no readers, creates its read event and write
+// mutex, and stores the result at the location given by the mutex pointer.
+// The lock must later be released with destroyRWMutex.
+//
+// On failure every resource acquired so far is released (including the
+// RWLOCK allocation itself), *mutex is set to NULL and a RuntimeException is
+// thrown.
+void PlatformThread::createRWMutex(decaf_rwmutex_t* mutex) {
+
+    RWLOCK* rwlock = new RWLOCK;
+
+    rwlock->readers = 0;
+
+    rwlock->readEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);
+    if (rwlock->readEvent == NULL) {
+        // Free the allocation before reporting the error so it is not leaked.
+        delete rwlock;
+        *mutex = NULL;
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to initialize OS Event object.");
+    }
+
+    rwlock->writeMutex = ::CreateMutex(NULL, FALSE, NULL);
+    if (rwlock->writeMutex == NULL) {
+        ::CloseHandle(rwlock->readEvent);
+        delete rwlock;
+        *mutex = NULL;
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to initialize OS Mutex object.");
+    }
+
+    // Only publish the lock once it is fully constructed.
+    *mutex = rwlock;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Acquires the RW mutex for reading.  The caller blocks on the write mutex
+// (so it waits behind any writer), registers itself in the reader count,
+// resets the read event and then releases the write mutex again so other
+// readers may enter.
+//
+// Throws a RuntimeException if any OS call fails.  On the failure paths that
+// occur after the reader was counted, the count is rolled back and the write
+// mutex is released so that a failed call does not leave the lock held.
+void PlatformThread::readerLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writeMutex, INFINITE);
+
+    if (code == WAIT_FAILED || code == WAIT_TIMEOUT) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    // The write mutex lock prevents writers from entering while we add a
+    // reader, and protects the read counter from races.
+    ::InterlockedIncrement(&mutex->readers);
+
+    if (! ::ResetEvent(mutex->readEvent)) {
+        // Undo the registration and give up the write mutex before failing,
+        // otherwise every later locker would block forever.
+        ::InterlockedDecrement(&mutex->readers);
+        ::ReleaseMutex(mutex->writeMutex);
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to reset RW Event object.");
+    }
+
+    if (! ::ReleaseMutex(mutex->writeMutex)) {
+        // The caller will not hold a read lock, so do not count it as one.
+        ::InterlockedDecrement(&mutex->readers);
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to release RW Mutex object.");
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Acquires the RW mutex for writing.  The caller takes ownership of the
+// write mutex, which keeps new readers and other writers out, and then, if
+// readers are still registered, waits on the read event until they have all
+// left.  The write mutex stays owned by the caller until unlockRWMutex.
+//
+// Throws a RuntimeException if either wait fails; the write mutex is
+// released again when the wait on the read event fails.
+void PlatformThread::writerLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writeMutex, INFINITE);
+
+    if (code == WAIT_FAILED || code == WAIT_TIMEOUT) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    // Once we own the lock we must wait for the readers to all leave.
+    if (mutex->readers) {
+
+        code = ::WaitForSingleObject(mutex->readEvent, INFINITE);
+        if (code == WAIT_FAILED || code == WAIT_TIMEOUT) {
+
+            ::ReleaseMutex(mutex->writeMutex);
+
+            throw RuntimeException(
+                __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Attempts to acquire the RW mutex for reading without blocking.
+//
+// Returns false when the write mutex could not be taken immediately, and
+// true once the caller has been registered as a reader.
+//
+// Throws a RuntimeException if any OS call fails.  On the failure paths that
+// occur after the reader was counted, the count is rolled back and the write
+// mutex is released so that a failed call does not leave the lock held.
+bool PlatformThread::tryReaderLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writeMutex, 0);
+
+    if (code == WAIT_FAILED) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    if (code == WAIT_TIMEOUT) {
+        return false;
+    }
+
+    // The write mutex lock prevents writers from entering while we add a
+    // reader, and protects the read counter from races.
+    ::InterlockedIncrement(&mutex->readers);
+
+    if (! ::ResetEvent(mutex->readEvent)) {
+        // Undo the registration and give up the write mutex before failing,
+        // otherwise every later locker would block forever.
+        ::InterlockedDecrement(&mutex->readers);
+        ::ReleaseMutex(mutex->writeMutex);
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to reset RW Event object.");
+    }
+
+    if (! ::ReleaseMutex(mutex->writeMutex)) {
+        // The caller will not hold a read lock, so do not count it as one.
+        ::InterlockedDecrement(&mutex->readers);
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to release RW Mutex object.");
+    }
+
+    // The read lock is now held by the caller.
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Attempts to acquire the RW mutex for writing without blocking.
+//
+// Returns false when the write mutex could not be taken immediately or when
+// readers are still registered; returns true when the caller now owns the
+// write mutex and no readers are present.
+//
+// Throws a RuntimeException if the wait fails or the write mutex cannot be
+// released again after readers were found.
+bool PlatformThread::tryWriterLockMutex(decaf_rwmutex_t mutex) {
+
+    DWORD code = ::WaitForSingleObject(mutex->writeMutex, 0);
+
+    if (code == WAIT_FAILED) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to wait for lock on RW object.");
+    }
+
+    if (code == WAIT_TIMEOUT) {
+        return false;
+    }
+
+    // Once we own the lock we must check for readers, if there are any then
+    // the write lock cannot be granted without waiting for them, so give the
+    // mutex back and report that the attempt failed.
+    if (mutex->readers > 0) {
+
+        if (! ::ReleaseMutex(mutex->writeMutex)) {
+            throw RuntimeException(
+                __FILE__, __LINE__, "Failed to release RW Mutex object.");
+        }
+        return false;
+    }
+
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases a read or write hold on the RW mutex.
+//
+// A writer owns the write mutex, so releasing it succeeds and nothing more
+// is needed.  A reader does not own it, so the release fails with
+// ERROR_NOT_OWNER; in that case the reader count is decremented and, when
+// the last reader leaves, the read event is signaled to wake a waiting
+// writer.
+//
+// Throws a RuntimeException if releasing the write mutex fails for any other
+// reason, or if the read event cannot be signaled.
+void PlatformThread::unlockRWMutex(decaf_rwmutex_t mutex) {
+
+    // Writer case: we own the write mutex and have now given it up.
+    if (::ReleaseMutex(mutex->writeMutex)) {
+        return;
+    }
+
+    DWORD result = ::GetLastError();
+
+    // Anything other than "not the owner" is a genuine failure.
+    if (result != ERROR_NOT_OWNER) {
+        throw RuntimeException(
+            __FILE__, __LINE__, "Failed to signal unlock OS Mutex object.");
+    }
+
+    // If not the owner then we must be a reader holding the lock.  If there
+    // are readers and this is the last release, signal the event so that any
+    // waiting writers get notified.
+    if (mutex->readers > 0 && ::InterlockedDecrement(&mutex->readers) == 0) {
+
+        if (! ::SetEvent(mutex->readEvent)) {
+            throw RuntimeException(
+                __FILE__, __LINE__, "Failed to signal OS event object.");
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Closes the event and mutex handles owned by the RW mutex and frees the
+// RWLOCK that createRWMutex allocated.  The results of the CloseHandle calls
+// are not checked.
+//
+// A NULL mutex is ignored, since createRWMutex stores NULL in the caller's
+// handle when creation fails.
+void PlatformThread::destroyRWMutex(decaf_rwmutex_t mutex) {
+
+    if (mutex == NULL) {
+        return;
+    }
+
+    ::CloseHandle(mutex->readEvent);
+    ::CloseHandle(mutex->writeMutex);
+
+    delete mutex;
}
////////////////////////////////////////////////////////////////////////////////
@@ -144,7 +305,7 @@ bool PlatformThread::interruptibleWaitOn
if (timedOut == WAIT_TIMEOUT) {
- // interruption events take precedence over timeout.
+ // interruption events take precedence over timeout.
if (complete(true)) {
break;
}