You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/02 21:48:47 UTC
svn commit: r1393142 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main:
activemq/core/ActiveMQConnection.cpp activemq/core/ActiveMQConnection.h
decaf/lang/Thread.cpp decaf/util/concurrent/ThreadFactory.h
Author: tabish
Date: Tue Oct 2 19:48:46 2012
New Revision: 1393142
URL: http://svn.apache.org/viewvc?rev=1393142&view=rev
Log:
Update Connection to ensure its executor threads get a meaningful name.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Tue Oct 2 19:48:46 2012
@@ -88,6 +88,29 @@ using namespace decaf::lang::exceptions;
namespace activemq{
namespace core{
+ class ConnectionThreadFactory : public ThreadFactory {
+ private:
+
+ Pointer<Transport> transport;
+
+ public:
+
+ ConnectionThreadFactory(Pointer<Transport> transport) : transport(transport) {
+ if (transport == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Transport cannot be null");
+ }
+ }
+
+ virtual ~ConnectionThreadFactory() {}
+
+ virtual Thread* newThread(decaf::lang::Runnable* runnable) {
+ Thread* thread = new Thread(runnable,
+ std::string("ActiveMQ Connection Executor: ") + transport->getRemoteAddress());
+ return thread;
+ }
+
+ };
+
class ConnectionConfig {
private:
@@ -167,8 +190,10 @@ namespace core{
TempDestinationMap activeTempDestinations;
- ConnectionConfig() : properties(),
- transport(),
+ ConnectionConfig(const Pointer<transport::Transport> transport,
+ const Pointer<decaf::util::Properties> properties) :
+ properties(properties),
+ transport(transport),
clientIdGenerator(),
scheduler(),
sessionIds(),
@@ -216,7 +241,8 @@ namespace core{
this->brokerInfoReceived.reset(new CountDownLatch(1));
this->executor.reset(
- new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>()));
+ new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS,
+ new LinkedBlockingQueue<Runnable*>(), new ConnectionThreadFactory(transport)));
// Generate a connectionId
std::string uniqueId = CONNECTION_ID_GENERATOR.generateId();
@@ -326,8 +352,8 @@ namespace core{
}}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport>& transport,
- const Pointer<decaf::util::Properties>& properties) :
+ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport> transport,
+ const Pointer<decaf::util::Properties> properties) :
config(NULL),
connectionMetaData(new ActiveMQConnectionMetaData()),
started(false),
@@ -335,7 +361,8 @@ ActiveMQConnection::ActiveMQConnection(c
closing(false),
transportFailed(false) {
- Pointer<ConnectionConfig> configuration(new ConnectionConfig);
+ Pointer<ConnectionConfig> configuration(
+ new ConnectionConfig(transport, properties));
// Register for messages and exceptions from the connector.
transport->setTransportListener(this);
@@ -344,10 +371,6 @@ ActiveMQConnection::ActiveMQConnection(c
configuration->connectionInfo->setManageable(true);
configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
- // Store of the transport and properties, the Connection now owns them.
- configuration->properties = properties;
- configuration->transport = transport;
-
this->config = configuration.release();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Tue Oct 2 19:48:46 2012
@@ -106,8 +106,8 @@ namespace core{
* @param properties
* The Properties that were defined for this connection
*/
- ActiveMQConnection(const Pointer<transport::Transport>& transport,
- const Pointer<decaf::util::Properties>& properties);
+ ActiveMQConnection(const Pointer<transport::Transport> transport,
+ const Pointer<decaf::util::Properties> properties);
virtual ~ActiveMQConnection();
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp Tue Oct 2 19:48:46 2012
@@ -59,9 +59,7 @@ namespace lang{
Runnable* task;
ThreadHandle* handle;
-
Thread::UncaughtExceptionHandler* exHandler;
-
static unsigned int id;
static Thread::UncaughtExceptionHandler* defaultHandler;
@@ -86,31 +84,31 @@ Thread::Thread(ThreadHandle* handle) : R
////////////////////////////////////////////////////////////////////////////////
Thread::Thread() : Runnable(), properties(NULL) {
- this->initializeSelf( NULL, "", -1 );
+ this->initializeSelf(NULL, "", -1);
}
////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( Runnable* task ) : Runnable(), properties(NULL) {
- this->initializeSelf( task, "", -1 );
+Thread::Thread(Runnable* task) : Runnable(), properties(NULL) {
+ this->initializeSelf(task, "", -1);
}
////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( const std::string& name ) : Runnable(), properties(NULL) {
- this->initializeSelf( NULL, name, -1 );
+Thread::Thread(const std::string& name) : Runnable(), properties(NULL) {
+ this->initializeSelf(NULL, name, -1);
}
////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( Runnable* task, const std::string& name ) : Runnable(), properties( NULL ) {
- this->initializeSelf( task, name, -1 );
+Thread::Thread(Runnable* task, const std::string& name) : Runnable(), properties( NULL ) {
+ this->initializeSelf(task, name, -1);
}
////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( Runnable* task, const std::string& name, long long stackSize ) : Runnable(), properties( NULL ) {
- this->initializeSelf( task, name, stackSize );
+Thread::Thread(Runnable* task, const std::string& name, long long stackSize) : Runnable(), properties( NULL ) {
+ this->initializeSelf(task, name, stackSize);
}
////////////////////////////////////////////////////////////////////////////////
-void Thread::initializeSelf( Runnable* task, const std::string& name, long long stackSize ) {
+void Thread::initializeSelf(Runnable* task, const std::string& name, long long stackSize) {
std::string threadName = name;
@@ -128,7 +126,7 @@ void Thread::initializeSelf( Runnable* t
////////////////////////////////////////////////////////////////////////////////
Thread::~Thread() {
- try{
+ try {
Threading::destroyThread(this->properties->handle);
delete this->properties;
}
@@ -233,7 +231,7 @@ long long Thread::getId() const {
}
////////////////////////////////////////////////////////////////////////////////
-void Thread::setName( const std::string& name ) {
+void Thread::setName(const std::string& name) {
Threading::setThreadName(this->properties->handle, name.c_str());
}
@@ -243,9 +241,9 @@ std::string Thread::getName() const {
}
////////////////////////////////////////////////////////////////////////////////
-void Thread::setPriority( int value ) {
+void Thread::setPriority(int value) {
- if( value < Thread::MIN_PRIORITY || value > Thread::MAX_PRIORITY ) {
+ if (value < Thread::MIN_PRIORITY || value > Thread::MAX_PRIORITY) {
throw IllegalArgumentException(
__FILE__, __LINE__,
"Thread::setPriority - Specified value {%d} is out of range", value );
@@ -260,7 +258,7 @@ int Thread::getPriority() const {
}
////////////////////////////////////////////////////////////////////////////////
-void Thread::setUncaughtExceptionHandler( UncaughtExceptionHandler* handler ) {
+void Thread::setUncaughtExceptionHandler(UncaughtExceptionHandler* handler) {
this->properties->exHandler = handler;
}
@@ -275,7 +273,7 @@ Thread::UncaughtExceptionHandler* Thread
}
////////////////////////////////////////////////////////////////////////////////
-void Thread::setDefaultUncaughtExceptionHandler( Thread::UncaughtExceptionHandler* handler ) {
+void Thread::setDefaultUncaughtExceptionHandler(Thread::UncaughtExceptionHandler* handler) {
ThreadProperties::defaultHandler = handler;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h Tue Oct 2 19:48:46 2012
@@ -66,7 +66,7 @@ namespace concurrent {
* @returns constructed thread, or NULL if the request to create a thread is rejected
* the caller owns the returned pointer.
*/
- virtual decaf::lang::Thread* newThread( decaf::lang::Runnable* r ) = 0;
+ virtual decaf::lang::Thread* newThread(decaf::lang::Runnable* r) = 0;
};