You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/28 17:21:49 UTC
svn commit: r1414788 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent:
Threading.cpp ThreadingTypes.h
Author: tabish
Date: Wed Nov 28 16:21:48 2012
New Revision: 1414788
URL: http://svn.apache.org/viewvc?rev=1414788&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-405
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp?rev=1414788&r1=1414787&r2=1414788&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp Wed Nov 28 16:21:48 2012
@@ -57,8 +57,7 @@ namespace {
public:
- SuspendedCompletionCondition(ThreadHandle* thread) : thread(thread) {
- }
+ SuspendedCompletionCondition(ThreadHandle* thread) : thread(thread) {}
bool operator()() {
return !thread->suspended;
@@ -77,8 +76,7 @@ namespace {
public:
- MonitorWaitCompletionCondition(ThreadHandle* handle) : handle(handle) {
- }
+ MonitorWaitCompletionCondition(ThreadHandle* handle) : handle(handle) {}
virtual bool operator()(bool timedOut) {
@@ -159,6 +157,7 @@ namespace {
void unblockThreads(ThreadHandle* monitor);
void createThreadInstance(ThreadHandle* thread, long long stackSize, int priority,
bool suspended, threadingTask threadMain, void* threadArg);
+ void dereferenceThread(ThreadHandle* thread);
ThreadHandle* initThreadHandle(ThreadHandle* thread);
MonitorHandle* initMonitorHandle(MonitorHandle* monitor);
bool interruptWaitingThread(ThreadHandle* self, ThreadHandle* target);
@@ -190,10 +189,10 @@ namespace {
self->interruptingThread = NULL;
}
- PlatformThread::notifyAll(self->condition);
-
decaf_thread_t handle = self->handle;
+ // Wake up any blocked threads
+ PlatformThread::notifyAll(self->condition);
unblockThreads(self->joiners);
PlatformThread::setTlsValue(library->threadKey, NULL);
@@ -214,6 +213,8 @@ namespace {
PlatformThread::destroyMutex(self->mutex);
PlatformThread::destroyCondition(self->condition);
delete self;
+ } else {
+ dereferenceThread(self);
}
PlatformThread::detachThread(handle);
@@ -258,7 +259,7 @@ namespace {
ThreadHandle* thread = (ThreadHandle*)arg;
// Invoke run on the task.
- try{
+ try {
thread->parent->run();
} catch( decaf::lang::Throwable& error ){
@@ -288,7 +289,6 @@ namespace {
} else if (thread->parent->getDefaultUncaughtExceptionHandler() != NULL) {
thread->parent->getDefaultUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
}
-
}
}
@@ -374,6 +374,7 @@ namespace {
thread->priority = Thread::NORM_PRIORITY;
thread->stackSize = -1;
thread->state = Thread::NEW;
+ thread->references = 2;
thread->unparked = false;
thread->numAttached = 0;
thread->interruptingThread = NULL;
@@ -387,12 +388,12 @@ namespace {
::memset(thread->tls, 0, sizeof(thread->tls));
- try{
+ try {
PlatformThread::createMutex(&thread->mutex);
}
DECAF_CATCH_RETHROW( RuntimeException );
- try{
+ try {
PlatformThread::createCondition(&thread->condition);
} catch(RuntimeException& ex) {
PlatformThread::destroyMutex(thread->mutex);
@@ -402,6 +403,19 @@ namespace {
return thread;
}
+ void dereferenceThread(ThreadHandle* thread) {
+ // Releases one reference on the handle and destroys it once the count is <= 0.
+ // Both the Thread class and the running thread hold a reference to the thread
+ // kernel, so whichever of the two releases its reference last must delete it.
+ if (Atomics::decrementAndGet(&(thread->references)) <= 0) {
+ free(thread->name);
+ PlatformThread::destroyMutex(thread->mutex);
+ PlatformThread::destroyCondition(thread->condition);
+
+ delete thread;
+ }
+ }
+
MonitorHandle* initMonitorHandle(MonitorHandle* monitor) {
monitor->owner = NULL;
monitor->count = 0;
@@ -430,12 +444,12 @@ namespace {
// Spawn the thread so that we don't deadlock on the monitor.
target->interruptingThread = initThreadHandle(new ThreadHandle());
createThreadInstance(target->interruptingThread, 0, Thread::NORM_PRIORITY, false, interruptionThread, target);
- // }
return result;
}
void threadExitTlsCleanup(ThreadHandle* thread) {
+
for (int index = 0; index < MAX_TLS_SLOTS; ++index) {
if (thread->tls[index] != NULL) {
ThreadLocalImpl* handler = NULL;
@@ -454,20 +468,21 @@ namespace {
}
void unblockThreads(ThreadHandle* queueHead) {
+
ThreadHandle* current = NULL;
ThreadHandle* next = NULL;
next = queueHead;
- while(next != NULL) {
+ while (next != NULL) {
current = next;
next = current->next;
PlatformThread::notifyAll(current->condition);
}
}
- void enqueueThread(ThreadHandle** queue, ThreadHandle* thread)
- {
+ void enqueueThread(ThreadHandle** queue, ThreadHandle* thread) {
+
ThreadHandle* qThread = *queue;
if (thread->next != NULL) {
@@ -534,7 +549,7 @@ namespace {
next = pool->head;
- while(next != NULL) {
+ while (next != NULL) {
current = next;
next = current->next;
@@ -591,7 +606,7 @@ namespace {
next = monitor->waiting;
- while(next != NULL) {
+ while (next != NULL) {
current = next;
next = current->next;
@@ -614,7 +629,7 @@ namespace {
void doMonitorEnter(MonitorHandle* monitor, ThreadHandle* thread) {
- while(true) {
+ while (true) {
if (PlatformThread::tryLockMutex(monitor->lock) == true) {
monitor->owner = thread;
@@ -824,7 +839,7 @@ void Threading::shutdown() {
// Destroy any Foreign Thread Facades that were created during runtime.
std::vector<Thread*>::iterator iter = library->osThreads.begin();
- for( ; iter != library->osThreads.end(); ++iter ) {
+ for (; iter != library->osThreads.end(); ++iter) {
delete *iter;
}
library->osThreads.clear();
@@ -853,7 +868,7 @@ void Threading::unlockThreadsLib() {
////////////////////////////////////////////////////////////////////////////////
ThreadHandle* Threading::createNewThread(Thread* parent, const char* name, long long stackSize) {
- if(parent == NULL || name == NULL) {
+ if (parent == NULL || name == NULL) {
throw NullPointerException(__FILE__, __LINE__, "One or more arguments was NULL");
}
@@ -870,17 +885,15 @@ ThreadHandle* Threading::createNewThread
////////////////////////////////////////////////////////////////////////////////
void Threading::destroyThread(ThreadHandle* thread) {
- free(thread->name);
-
if (!thread->osThread) {
- Threading::join(thread, 0, 0);
+ try {
+ Threading::join(thread, 0, 0);
+ } catch (InterruptedException& ex) {}
} else {
PlatformThread::detachOSThread(thread->handle);
}
- PlatformThread::destroyMutex(thread->mutex);
- PlatformThread::destroyCondition(thread->condition);
- delete thread;
+ dereferenceThread(thread);
}
////////////////////////////////////////////////////////////////////////////////
@@ -895,6 +908,9 @@ ThreadHandle* Threading::attachToCurrent
std::string(std::string("OS-Thread") + Integer::toString(library->osThreadId.getAndIncrement())).c_str());
thread->threadId = PlatformThread::getCurrentThreadId();
+ // An OS Thread doesn't have a running thread; this is only a proxy, so it holds only one ref.
+ thread->references = 1;
+
// Now create a Decaf Thread as a proxy to the OS thread.
Pointer<Thread> osThread(new Thread(thread.get()));
thread->parent = osThread.get();
@@ -918,8 +934,7 @@ void Threading::start(ThreadHandle* thre
try {
if (thread->state > Thread::NEW) {
- throw IllegalThreadStateException(
- __FILE__, __LINE__, "Thread already started");
+ throw IllegalThreadStateException(__FILE__, __LINE__, "Thread already started");
}
PlatformThread::lockMutex(thread->mutex);
@@ -932,11 +947,11 @@ void Threading::start(ThreadHandle* thre
}
PlatformThread::unlockMutex(thread->mutex);
- }
- DECAF_CATCH_RETHROW( IllegalThreadStateException )
- DECAF_CATCH_RETHROW( RuntimeException )
- DECAF_CATCH_EXCEPTION_CONVERT( NullPointerException, RuntimeException )
- DECAF_CATCHALL_THROW( RuntimeException )
+ }
+ DECAF_CATCH_RETHROW(IllegalThreadStateException)
+ DECAF_CATCH_RETHROW(RuntimeException)
+ DECAF_CATCH_EXCEPTION_CONVERT(NullPointerException, RuntimeException)
+ DECAF_CATCHALL_THROW(RuntimeException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -1012,8 +1027,7 @@ namespace {
public:
- JoinCompletionCondition(ThreadHandle* self, ThreadHandle* target) : self(self), target(target) {
- }
+ JoinCompletionCondition(ThreadHandle* self, ThreadHandle* target) : self(self), target(target) {}
virtual bool operator()() {
@@ -1142,8 +1156,7 @@ namespace {
public:
- SleepCompletionCondition(ThreadHandle* handle) : handle(handle) {
- }
+ SleepCompletionCondition(ThreadHandle* handle) : handle(handle) {}
bool operator()() {
if (handle->interrupted) {
@@ -1263,9 +1276,8 @@ ThreadHandle* Threading::getCurrentThrea
////////////////////////////////////////////////////////////////////////////////
void Threading::park(Thread* thread) {
- if( thread == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Null Thread Pointer Passed." );
+ if (thread == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
}
Threading::park(thread, 0LL, 0LL);
@@ -1286,8 +1298,7 @@ namespace {
public:
- ParkCompletionCondition(ThreadHandle* handle) : handle(handle) {
- }
+ ParkCompletionCondition(ThreadHandle* handle) : handle(handle) {}
bool operator()() {
if (handle->unparked == true) {
@@ -1305,9 +1316,8 @@ namespace {
////////////////////////////////////////////////////////////////////////////////
bool Threading::park( Thread* thread, long long mills, int nanos) {
- if( thread == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Null Thread Pointer Passed." );
+ if (thread == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
}
bool timedOut = false;
@@ -1355,9 +1365,8 @@ bool Threading::park( Thread* thread, lo
////////////////////////////////////////////////////////////////////////////////
void Threading::unpark(Thread* thread) {
- if( thread == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Null Thread Pointer Passed." );
+ if (thread == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
}
ThreadHandle* handle = thread->getHandle();
@@ -1458,9 +1467,7 @@ void Threading::enterMonitor(MonitorHand
////////////////////////////////////////////////////////////////////////////////
bool Threading::tryEnterMonitor(MonitorHandle* monitor) {
-
ThreadHandle* self = getCurrentThreadHandle();
-
return monitorTryEnterUsingThreadId(monitor, self);
}
@@ -1478,9 +1485,7 @@ void Threading::exitMonitor(MonitorHandl
////////////////////////////////////////////////////////////////////////////////
bool Threading::waitOnMonitor(MonitorHandle* monitor, long long mills, int nanos) {
-
ThreadHandle* self = getCurrentThreadHandle();
-
// Wait but do so in a non-interruptible state.
return doWaitOnMonitor(monitor, self, mills, nanos, true);
}
@@ -1537,8 +1542,7 @@ bool Threading::monitorTryEnterUsingThre
int Threading::createThreadLocalSlot(ThreadLocalImpl* threadLocal) {
if (threadLocal == NULL) {
- throw NullPointerException(
- __FILE__, __LINE__, "Null ThreadLocalImpl Pointer Passed." );
+ throw NullPointerException(__FILE__, __LINE__, "Null ThreadLocalImpl Pointer Passed." );
}
int index;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h?rev=1414788&r1=1414787&r2=1414788&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h Wed Nov 28 16:21:48 2012
@@ -60,6 +60,7 @@ namespace concurrent{
decaf_mutex_t mutex;
decaf_condition_t condition;
volatile int state;
+ volatile int references;
int priority;
bool interrupted;
bool interruptible;