You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/08 01:10:37 UTC

svn commit: r1443802 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: threads/ transport/failover/

Author: tabish
Date: Fri Feb  8 00:10:36 2013
New Revision: 1443802

URL: http://svn.apache.org/r1443802
Log:
https://issues.apache.org/jira/browse/AMQCPP-457

Better control over Transport start and stop.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp Fri Feb  8 00:10:36 2013
@@ -33,9 +33,6 @@ using namespace decaf::lang::exceptions;
 ////////////////////////////////////////////////////////////////////////////////
 CompositeTaskRunner::CompositeTaskRunner() :
     tasks(), mutex(), thread(), threadTerminated(false), pending(false), shutDown(false) {
-
-    this->thread.reset(new Thread(this, "CompositeTaskRunner Thread"));
-    this->thread->start();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -49,6 +46,31 @@ CompositeTaskRunner::~CompositeTaskRunne
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::start() {
+
+    synchronized(&mutex) {
+        if (this->thread == NULL) {
+            this->thread.reset(new Thread(this, "ActiveMQ CompositeTaskRunner Thread"));
+            this->thread->start();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool CompositeTaskRunner::isStarted() const {
+
+    bool result = false;
+
+    synchronized(&mutex) {
+        if (this->thread != NULL) {
+            result = true;
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void CompositeTaskRunner::shutdown(unsigned int timeout) {
 
     synchronized(&mutex) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h Fri Feb  8 00:10:36 2013
@@ -31,8 +31,6 @@
 namespace activemq {
 namespace threads {
 
-    using decaf::lang::Pointer;
-
     /**
      * A Task Runner that can contain one or more CompositeTasks that are each checked
      * for pending work and run if any is present in the order that the tasks were added.
@@ -45,9 +43,9 @@ namespace threads {
     private:
 
         decaf::util::LinkedList<CompositeTask*> tasks;
-        decaf::util::concurrent::Mutex mutex;
+        mutable decaf::util::concurrent::Mutex mutex;
 
-        Pointer<decaf::lang::Thread> thread;
+        decaf::lang::Pointer<decaf::lang::Thread> thread;
 
         bool threadTerminated;
         bool pending;
@@ -64,6 +62,10 @@ namespace threads {
 
         virtual ~CompositeTaskRunner();
 
+        virtual void start();
+
+        virtual bool isStarted() const;
+
         /**
          * Adds a new CompositeTask to the Set of Tasks that this class manages.
          * @param task - Pointer to a CompositeTask instance.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp Fri Feb  8 00:10:36 2013
@@ -34,8 +34,6 @@ DedicatedTaskRunner::DedicatedTaskRunner
         throw NullPointerException(__FILE__, __LINE__, "Task passed was null");
     }
 
-    this->thread.reset(new Thread(this, "ActiveMQ Dedicated Task Runner"));
-    this->thread->start();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -47,9 +45,39 @@ DedicatedTaskRunner::~DedicatedTaskRunne
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::start() {
+
+    synchronized(&mutex) {
+        if (this->thread == NULL) {
+            this->thread.reset(new Thread(this, "ActiveMQ Dedicated Task Runner"));
+            this->thread->start();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool DedicatedTaskRunner::isStarted() const {
+
+    bool result = false;
+
+    synchronized(&mutex) {
+        if (this->thread != NULL) {
+            result = true;
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void DedicatedTaskRunner::shutdown(unsigned int timeout) {
 
     synchronized(&mutex) {
+
+        if (this->thread == NULL) {
+            return;
+        }
+
         shutDown = true;
         pending = true;
         mutex.notifyAll();
@@ -66,6 +94,11 @@ void DedicatedTaskRunner::shutdown(unsig
 void DedicatedTaskRunner::shutdown() {
 
     synchronized(&mutex) {
+
+        if (this->thread == NULL) {
+            return;
+        }
+
         shutDown = true;
         pending = true;
         mutex.notifyAll();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h Fri Feb  8 00:10:36 2013
@@ -30,20 +30,16 @@
 namespace activemq {
 namespace threads {
 
-    using decaf::lang::Pointer;
-
     class AMQCPP_API DedicatedTaskRunner : public TaskRunner,
                                            public decaf::lang::Runnable {
     private:
 
-        decaf::util::concurrent::Mutex mutex;
-
-        Pointer<decaf::lang::Thread> thread;
+        mutable decaf::util::concurrent::Mutex mutex;
+        decaf::lang::Pointer<decaf::lang::Thread> thread;
 
         bool threadTerminated;
         bool pending;
         bool shutDown;
-
         Task* task;
 
     private:
@@ -56,6 +52,10 @@ namespace threads {
         DedicatedTaskRunner(Task* task);
         virtual ~DedicatedTaskRunner();
 
+        virtual void start();
+
+        virtual bool isStarted() const;
+
         /**
          * Shutdown after a timeout, does not guarantee that the task's iterate
          * method has completed and the thread halted.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h Fri Feb  8 00:10:36 2013
@@ -30,6 +30,18 @@ namespace threads {
         virtual ~TaskRunner();
 
         /**
+         * Starts the task runner.  Prior to calling this method, tasks can be added to a
+         * Runner, but no executions will occur.  The start method will create the
+         * background Thread(s) which do the work for this task runner.
+         */
+        virtual void start() = 0;
+
+        /**
+         * @returns true if the start method has been called.
+         */
+        virtual bool isStarted() const = 0;
+
+        /**
          * Shutdown after a timeout, does not guarantee that the task's iterate
          * method has completed and the thread halted.
          *

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Fri Feb  8 00:10:36 2013
@@ -329,7 +329,7 @@ void FailoverTransport::start() {
 
     try {
 
-        synchronized( &reconnectMutex ) {
+        synchronized(&reconnectMutex) {
 
             if (this->started) {
                 return;
@@ -337,6 +337,8 @@ void FailoverTransport::start() {
 
             started = true;
 
+            taskRunner->start();
+
             stateTracker.setMaxCacheSize(this->getMaxCacheSize());
             stateTracker.setTrackMessages(this->isTrackMessages());
             stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
@@ -348,9 +350,9 @@ void FailoverTransport::start() {
             }
         }
     }
-    AMQ_CATCH_RETHROW( IOException)
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
-    AMQ_CATCHALL_THROW( IOException)
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////