You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/28 17:21:49 UTC

svn commit: r1414788 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent: Threading.cpp ThreadingTypes.h

Author: tabish
Date: Wed Nov 28 16:21:48 2012
New Revision: 1414788

URL: http://svn.apache.org/viewvc?rev=1414788&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-405

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp?rev=1414788&r1=1414787&r2=1414788&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/Threading.cpp Wed Nov 28 16:21:48 2012
@@ -57,8 +57,7 @@ namespace {
 
     public:
 
-        SuspendedCompletionCondition(ThreadHandle* thread) : thread(thread) {
-        }
+        SuspendedCompletionCondition(ThreadHandle* thread) : thread(thread) {}
 
         bool operator()() {
             return !thread->suspended;
@@ -77,8 +76,7 @@ namespace {
 
     public:
 
-        MonitorWaitCompletionCondition(ThreadHandle* handle) : handle(handle) {
-        }
+        MonitorWaitCompletionCondition(ThreadHandle* handle) : handle(handle) {}
 
         virtual bool operator()(bool timedOut) {
 
@@ -159,6 +157,7 @@ namespace {
     void unblockThreads(ThreadHandle* monitor);
     void createThreadInstance(ThreadHandle* thread, long long stackSize, int priority,
                               bool suspended, threadingTask threadMain, void* threadArg);
+    void dereferenceThread(ThreadHandle* thread);
     ThreadHandle* initThreadHandle(ThreadHandle* thread);
     MonitorHandle* initMonitorHandle(MonitorHandle* monitor);
     bool interruptWaitingThread(ThreadHandle* self, ThreadHandle* target);
@@ -190,10 +189,10 @@ namespace {
             self->interruptingThread = NULL;
         }
 
-        PlatformThread::notifyAll(self->condition);
-
         decaf_thread_t handle = self->handle;
 
+        // Wake up any blocked threads
+        PlatformThread::notifyAll(self->condition);
         unblockThreads(self->joiners);
 
         PlatformThread::setTlsValue(library->threadKey, NULL);
@@ -214,6 +213,8 @@ namespace {
             PlatformThread::destroyMutex(self->mutex);
             PlatformThread::destroyCondition(self->condition);
             delete self;
+        } else {
+            dereferenceThread(self);
         }
 
         PlatformThread::detachThread(handle);
@@ -258,7 +259,7 @@ namespace {
         ThreadHandle* thread = (ThreadHandle*)arg;
 
         // Invoke run on the task.
-        try{
+        try {
             thread->parent->run();
         } catch( decaf::lang::Throwable& error ){
 
@@ -288,7 +289,6 @@ namespace {
             } else if (thread->parent->getDefaultUncaughtExceptionHandler() != NULL) {
                 thread->parent->getDefaultUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
             }
-
         }
     }
 
@@ -374,6 +374,7 @@ namespace {
         thread->priority = Thread::NORM_PRIORITY;
         thread->stackSize = -1;
         thread->state = Thread::NEW;
+        thread->references = 2;
         thread->unparked = false;
         thread->numAttached = 0;
         thread->interruptingThread = NULL;
@@ -387,12 +388,12 @@ namespace {
 
         ::memset(thread->tls, 0, sizeof(thread->tls));
 
-        try{
+        try {
             PlatformThread::createMutex(&thread->mutex);
         }
         DECAF_CATCH_RETHROW( RuntimeException );
 
-        try{
+        try {
             PlatformThread::createCondition(&thread->condition);
         } catch(RuntimeException& ex) {
             PlatformThread::destroyMutex(thread->mutex);
@@ -402,6 +403,19 @@ namespace {
         return thread;
     }
 
+    void dereferenceThread(ThreadHandle* thread) {
+
+        // Both the Thread class and the thread hold a reference to the thread
+        // kernel, so one or the other must delete it when both are finished.
+        if (Atomics::decrementAndGet(&(thread->references)) <= 0) {
+            free(thread->name);
+            PlatformThread::destroyMutex(thread->mutex);
+            PlatformThread::destroyCondition(thread->condition);
+
+            delete thread;
+        }
+    }
+
     MonitorHandle* initMonitorHandle(MonitorHandle* monitor) {
         monitor->owner = NULL;
         monitor->count = 0;
@@ -430,12 +444,12 @@ namespace {
         // Spawn the thread so that we don't deadlock on the monitor.
         target->interruptingThread = initThreadHandle(new ThreadHandle());
         createThreadInstance(target->interruptingThread, 0, Thread::NORM_PRIORITY, false, interruptionThread, target);
-        //    }
 
         return result;
     }
 
     void threadExitTlsCleanup(ThreadHandle* thread) {
+
         for (int index = 0; index < MAX_TLS_SLOTS; ++index) {
             if (thread->tls[index] != NULL) {
                 ThreadLocalImpl* handler = NULL;
@@ -454,20 +468,21 @@ namespace {
     }
 
     void unblockThreads(ThreadHandle* queueHead) {
+
         ThreadHandle* current = NULL;
         ThreadHandle* next = NULL;
 
         next = queueHead;
 
-        while(next != NULL) {
+        while (next != NULL) {
             current = next;
             next = current->next;
             PlatformThread::notifyAll(current->condition);
         }
     }
 
-    void enqueueThread(ThreadHandle** queue, ThreadHandle* thread)
-    {
+    void enqueueThread(ThreadHandle** queue, ThreadHandle* thread) {
+
         ThreadHandle* qThread = *queue;
 
         if (thread->next != NULL) {
@@ -534,7 +549,7 @@ namespace {
 
         next = pool->head;
 
-        while(next != NULL) {
+        while (next != NULL) {
             current = next;
             next = current->next;
 
@@ -591,7 +606,7 @@ namespace {
 
         next = monitor->waiting;
 
-        while(next != NULL) {
+        while (next != NULL) {
             current = next;
             next = current->next;
 
@@ -614,7 +629,7 @@ namespace {
 
     void doMonitorEnter(MonitorHandle* monitor, ThreadHandle* thread) {
 
-        while(true) {
+        while (true) {
 
             if (PlatformThread::tryLockMutex(monitor->lock) == true) {
                 monitor->owner = thread;
@@ -824,7 +839,7 @@ void Threading::shutdown() {
 
     // Destroy any Foreign Thread Facades that were created during runtime.
     std::vector<Thread*>::iterator iter = library->osThreads.begin();
-    for( ; iter != library->osThreads.end(); ++iter ) {
+    for (; iter != library->osThreads.end(); ++iter) {
         delete *iter;
     }
     library->osThreads.clear();
@@ -853,7 +868,7 @@ void Threading::unlockThreadsLib() {
 ////////////////////////////////////////////////////////////////////////////////
 ThreadHandle* Threading::createNewThread(Thread* parent, const char* name, long long stackSize) {
 
-    if(parent == NULL || name == NULL) {
+    if (parent == NULL || name == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "One or more arguments was NULL");
     }
 
@@ -870,17 +885,15 @@ ThreadHandle* Threading::createNewThread
 ////////////////////////////////////////////////////////////////////////////////
 void Threading::destroyThread(ThreadHandle* thread) {
 
-    free(thread->name);
-
     if (!thread->osThread) {
-        Threading::join(thread, 0, 0);
+        try {
+            Threading::join(thread, 0, 0);
+        } catch (InterruptedException& ex) {}
     } else {
         PlatformThread::detachOSThread(thread->handle);
     }
-    PlatformThread::destroyMutex(thread->mutex);
-    PlatformThread::destroyCondition(thread->condition);
 
-    delete thread;
+    dereferenceThread(thread);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -895,6 +908,9 @@ ThreadHandle* Threading::attachToCurrent
         std::string(std::string("OS-Thread") + Integer::toString(library->osThreadId.getAndIncrement())).c_str());
     thread->threadId = PlatformThread::getCurrentThreadId();
 
+    // An OS Thread doesn't have a running thread, this is only a proxy to only one ref.
+    thread->references = 1;
+
     // Now create a Decaf Thread as a proxy to the OS thread.
     Pointer<Thread> osThread(new Thread(thread.get()));
     thread->parent = osThread.get();
@@ -918,8 +934,7 @@ void Threading::start(ThreadHandle* thre
     try {
 
         if (thread->state > Thread::NEW) {
-            throw IllegalThreadStateException(
-                __FILE__, __LINE__, "Thread already started");
+            throw IllegalThreadStateException(__FILE__, __LINE__, "Thread already started");
         }
 
         PlatformThread::lockMutex(thread->mutex);
@@ -932,11 +947,11 @@ void Threading::start(ThreadHandle* thre
         }
 
         PlatformThread::unlockMutex(thread->mutex);
-     }
-     DECAF_CATCH_RETHROW( IllegalThreadStateException )
-     DECAF_CATCH_RETHROW( RuntimeException )
-     DECAF_CATCH_EXCEPTION_CONVERT( NullPointerException, RuntimeException )
-     DECAF_CATCHALL_THROW( RuntimeException )
+    }
+    DECAF_CATCH_RETHROW(IllegalThreadStateException)
+    DECAF_CATCH_RETHROW(RuntimeException)
+    DECAF_CATCH_EXCEPTION_CONVERT(NullPointerException, RuntimeException)
+    DECAF_CATCHALL_THROW(RuntimeException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1012,8 +1027,7 @@ namespace {
 
     public:
 
-        JoinCompletionCondition(ThreadHandle* self, ThreadHandle* target) : self(self), target(target) {
-        }
+        JoinCompletionCondition(ThreadHandle* self, ThreadHandle* target) : self(self), target(target) {}
 
         virtual bool operator()() {
 
@@ -1142,8 +1156,7 @@ namespace {
 
     public:
 
-        SleepCompletionCondition(ThreadHandle* handle) : handle(handle) {
-        }
+        SleepCompletionCondition(ThreadHandle* handle) : handle(handle) {}
 
         bool operator()() {
             if (handle->interrupted) {
@@ -1263,9 +1276,8 @@ ThreadHandle* Threading::getCurrentThrea
 ////////////////////////////////////////////////////////////////////////////////
 void Threading::park(Thread* thread) {
 
-    if( thread == NULL ) {
-        throw NullPointerException(
-            __FILE__, __LINE__, "Null Thread Pointer Passed." );
+    if (thread == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
     }
 
     Threading::park(thread, 0LL, 0LL);
@@ -1286,8 +1298,7 @@ namespace {
 
     public:
 
-        ParkCompletionCondition(ThreadHandle* handle) : handle(handle) {
-        }
+        ParkCompletionCondition(ThreadHandle* handle) : handle(handle) {}
 
         bool operator()() {
             if (handle->unparked == true) {
@@ -1305,9 +1316,8 @@ namespace {
 ////////////////////////////////////////////////////////////////////////////////
 bool Threading::park( Thread* thread, long long mills, int nanos) {
 
-    if( thread == NULL ) {
-        throw NullPointerException(
-            __FILE__, __LINE__, "Null Thread Pointer Passed." );
+    if (thread == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
     }
 
     bool timedOut = false;
@@ -1355,9 +1365,8 @@ bool Threading::park( Thread* thread, lo
 ////////////////////////////////////////////////////////////////////////////////
 void Threading::unpark(Thread* thread) {
 
-    if( thread == NULL ) {
-        throw NullPointerException(
-            __FILE__, __LINE__, "Null Thread Pointer Passed." );
+    if (thread == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
     }
 
     ThreadHandle* handle = thread->getHandle();
@@ -1458,9 +1467,7 @@ void Threading::enterMonitor(MonitorHand
 
 ////////////////////////////////////////////////////////////////////////////////
 bool Threading::tryEnterMonitor(MonitorHandle* monitor) {
-
     ThreadHandle* self = getCurrentThreadHandle();
-
     return monitorTryEnterUsingThreadId(monitor, self);
 }
 
@@ -1478,9 +1485,7 @@ void Threading::exitMonitor(MonitorHandl
 
 ////////////////////////////////////////////////////////////////////////////////
 bool Threading::waitOnMonitor(MonitorHandle* monitor, long long mills, int nanos) {
-
     ThreadHandle* self = getCurrentThreadHandle();
-
     // Wait but do so in a non-interruptible state.
     return doWaitOnMonitor(monitor, self, mills, nanos, true);
 }
@@ -1537,8 +1542,7 @@ bool Threading::monitorTryEnterUsingThre
 int Threading::createThreadLocalSlot(ThreadLocalImpl* threadLocal) {
 
     if (threadLocal == NULL) {
-        throw NullPointerException(
-            __FILE__, __LINE__, "Null ThreadLocalImpl Pointer Passed." );
+        throw NullPointerException(__FILE__, __LINE__, "Null ThreadLocalImpl Pointer Passed." );
     }
 
     int index;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h?rev=1414788&r1=1414787&r2=1414788&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/util/concurrent/ThreadingTypes.h Wed Nov 28 16:21:48 2012
@@ -60,6 +60,7 @@ namespace concurrent{
         decaf_mutex_t mutex;
         decaf_condition_t condition;
         volatile int state;
+        volatile int references;
         int priority;
         bool interrupted;
         bool interruptible;