You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/08 01:10:37 UTC
svn commit: r1443802 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: threads/
transport/failover/
Author: tabish
Date: Fri Feb 8 00:10:36 2013
New Revision: 1443802
URL: http://svn.apache.org/r1443802
Log:
https://issues.apache.org/jira/browse/AMQCPP-457
Better control over Transport start and stop.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.cpp Fri Feb 8 00:10:36 2013
@@ -33,9 +33,6 @@ using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
CompositeTaskRunner::CompositeTaskRunner() :
tasks(), mutex(), thread(), threadTerminated(false), pending(false), shutDown(false) {
-
- this->thread.reset(new Thread(this, "CompositeTaskRunner Thread"));
- this->thread->start();
}
////////////////////////////////////////////////////////////////////////////////
@@ -49,6 +46,31 @@ CompositeTaskRunner::~CompositeTaskRunne
}
////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunner::start() {
+
+ synchronized(&mutex) {
+ if (this->thread == NULL) {
+ this->thread.reset(new Thread(this, "ActiveMQ CompositeTaskRunner Thread"));
+ this->thread->start();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool CompositeTaskRunner::isStarted() const {
+
+ bool result = false;
+
+ synchronized(&mutex) {
+ if (this->thread != NULL) {
+ result = true;
+ }
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void CompositeTaskRunner::shutdown(unsigned int timeout) {
synchronized(&mutex) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/CompositeTaskRunner.h Fri Feb 8 00:10:36 2013
@@ -31,8 +31,6 @@
namespace activemq {
namespace threads {
- using decaf::lang::Pointer;
-
/**
* A Task Runner that can contain one or more CompositeTasks that are each checked
* for pending work and run if any is present in the order that the tasks were added.
@@ -45,9 +43,9 @@ namespace threads {
private:
decaf::util::LinkedList<CompositeTask*> tasks;
- decaf::util::concurrent::Mutex mutex;
+ mutable decaf::util::concurrent::Mutex mutex;
- Pointer<decaf::lang::Thread> thread;
+ decaf::lang::Pointer<decaf::lang::Thread> thread;
bool threadTerminated;
bool pending;
@@ -64,6 +62,10 @@ namespace threads {
virtual ~CompositeTaskRunner();
+ virtual void start();
+
+ virtual bool isStarted() const;
+
/**
* Adds a new CompositeTask to the Set of Tasks that this class manages.
* @param task - Pointer to a CompositeTask instance.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.cpp Fri Feb 8 00:10:36 2013
@@ -34,8 +34,6 @@ DedicatedTaskRunner::DedicatedTaskRunner
throw NullPointerException(__FILE__, __LINE__, "Task passed was null");
}
- this->thread.reset(new Thread(this, "ActiveMQ Dedicated Task Runner"));
- this->thread->start();
}
////////////////////////////////////////////////////////////////////////////////
@@ -47,9 +45,39 @@ DedicatedTaskRunner::~DedicatedTaskRunne
}
////////////////////////////////////////////////////////////////////////////////
+void DedicatedTaskRunner::start() {
+
+ synchronized(&mutex) {
+ if (this->thread == NULL) {
+ this->thread.reset(new Thread(this, "ActiveMQ Dedicated Task Runner"));
+ this->thread->start();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool DedicatedTaskRunner::isStarted() const {
+
+ bool result = false;
+
+ synchronized(&mutex) {
+ if (this->thread != NULL) {
+ result = true;
+ }
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void DedicatedTaskRunner::shutdown(unsigned int timeout) {
synchronized(&mutex) {
+
+ if (this->thread == NULL) {
+ return;
+ }
+
shutDown = true;
pending = true;
mutex.notifyAll();
@@ -66,6 +94,11 @@ void DedicatedTaskRunner::shutdown(unsig
void DedicatedTaskRunner::shutdown() {
synchronized(&mutex) {
+
+ if (this->thread == NULL) {
+ return;
+ }
+
shutDown = true;
pending = true;
mutex.notifyAll();
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/DedicatedTaskRunner.h Fri Feb 8 00:10:36 2013
@@ -30,20 +30,16 @@
namespace activemq {
namespace threads {
- using decaf::lang::Pointer;
-
class AMQCPP_API DedicatedTaskRunner : public TaskRunner,
public decaf::lang::Runnable {
private:
- decaf::util::concurrent::Mutex mutex;
-
- Pointer<decaf::lang::Thread> thread;
+ mutable decaf::util::concurrent::Mutex mutex;
+ decaf::lang::Pointer<decaf::lang::Thread> thread;
bool threadTerminated;
bool pending;
bool shutDown;
-
Task* task;
private:
@@ -56,6 +52,10 @@ namespace threads {
DedicatedTaskRunner(Task* task);
virtual ~DedicatedTaskRunner();
+ virtual void start();
+
+ virtual bool isStarted() const;
+
/**
* Shutdown after a timeout, does not guarantee that the task's iterate
* method has completed and the thread halted.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/threads/TaskRunner.h Fri Feb 8 00:10:36 2013
@@ -30,6 +30,18 @@ namespace threads {
virtual ~TaskRunner();
/**
+ * Starts the task runner. Prior to calling this method, tasks can be added to a
+ * Runner, but no executions will occur. The start method will create the
+ * background Thread(s) which do the work for this task runner.
+ */
+ virtual void start() = 0;
+
+ /**
+ * @return true if the start method has been called.
+ */
+ virtual bool isStarted() const = 0;
+
+ /**
* Shutdown after a timeout, does not guarantee that the task's iterate
* method has completed and the thread halted.
*
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1443802&r1=1443801&r2=1443802&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Fri Feb 8 00:10:36 2013
@@ -329,7 +329,7 @@ void FailoverTransport::start() {
try {
- synchronized( &reconnectMutex ) {
+ synchronized(&reconnectMutex) {
if (this->started) {
return;
@@ -337,6 +337,8 @@ void FailoverTransport::start() {
started = true;
+ taskRunner->start();
+
stateTracker.setMaxCacheSize(this->getMaxCacheSize());
stateTracker.setTrackMessages(this->isTrackMessages());
stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
@@ -348,9 +350,9 @@ void FailoverTransport::start() {
}
}
}
- AMQ_CATCH_RETHROW( IOException)
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
- AMQ_CATCHALL_THROW( IOException)
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////