You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/02 21:48:47 UTC

svn commit: r1393142 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/ActiveMQConnection.cpp activemq/core/ActiveMQConnection.h decaf/lang/Thread.cpp decaf/util/concurrent/ThreadFactory.h

Author: tabish
Date: Tue Oct  2 19:48:46 2012
New Revision: 1393142

URL: http://svn.apache.org/viewvc?rev=1393142&view=rev
Log:
Update Connection to ensure its executor threads get a meaningful name. 

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Tue Oct  2 19:48:46 2012
@@ -88,6 +88,29 @@ using namespace decaf::lang::exceptions;
 namespace activemq{
 namespace core{
 
+    class ConnectionThreadFactory : public ThreadFactory {
+    private:
+
+        Pointer<Transport> transport;
+
+    public:
+
+        ConnectionThreadFactory(Pointer<Transport> transport) : transport(transport) {
+            if (transport == NULL) {
+                throw NullPointerException(__FILE__, __LINE__, "Transport cannot be null");
+            }
+        }
+
+        virtual ~ConnectionThreadFactory() {}
+
+        virtual Thread* newThread(decaf::lang::Runnable* runnable) {
+            Thread* thread = new Thread(runnable,
+                    std::string("ActiveMQ Connection Executor: ") + transport->getRemoteAddress());
+            return thread;
+        }
+
+    };
+
     class ConnectionConfig {
     private:
 
@@ -167,8 +190,10 @@ namespace core{
 
         TempDestinationMap activeTempDestinations;
 
-        ConnectionConfig() : properties(),
-                             transport(),
+        ConnectionConfig(const Pointer<transport::Transport> transport,
+                         const Pointer<decaf::util::Properties> properties) :
+                             properties(properties),
+                             transport(transport),
                              clientIdGenerator(),
                              scheduler(),
                              sessionIds(),
@@ -216,7 +241,8 @@ namespace core{
             this->brokerInfoReceived.reset(new CountDownLatch(1));
 
             this->executor.reset(
-                new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>()));
+                new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS,
+                    new LinkedBlockingQueue<Runnable*>(), new ConnectionThreadFactory(transport)));
 
             // Generate a connectionId
             std::string uniqueId = CONNECTION_ID_GENERATOR.generateId();
@@ -326,8 +352,8 @@ namespace core{
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport>& transport,
-                                       const Pointer<decaf::util::Properties>& properties) :
+ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport> transport,
+                                       const Pointer<decaf::util::Properties> properties) :
     config(NULL),
     connectionMetaData(new ActiveMQConnectionMetaData()),
     started(false),
@@ -335,7 +361,8 @@ ActiveMQConnection::ActiveMQConnection(c
     closing(false),
     transportFailed(false) {
 
-    Pointer<ConnectionConfig> configuration(new ConnectionConfig);
+    Pointer<ConnectionConfig> configuration(
+            new ConnectionConfig(transport, properties));
 
     // Register for messages and exceptions from the connector.
     transport->setTransportListener(this);
@@ -344,10 +371,6 @@ ActiveMQConnection::ActiveMQConnection(c
     configuration->connectionInfo->setManageable(true);
     configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
 
-    // Store of the transport and properties, the Connection now owns them.
-    configuration->properties = properties;
-    configuration->transport = transport;
-
     this->config = configuration.release();
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Tue Oct  2 19:48:46 2012
@@ -106,8 +106,8 @@ namespace core{
          * @param properties
          *        The Properties that were defined for this connection
          */
-        ActiveMQConnection(const Pointer<transport::Transport>& transport,
-                           const Pointer<decaf::util::Properties>& properties);
+        ActiveMQConnection(const Pointer<transport::Transport> transport,
+                           const Pointer<decaf::util::Properties> properties);
 
         virtual ~ActiveMQConnection();
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/lang/Thread.cpp Tue Oct  2 19:48:46 2012
@@ -59,9 +59,7 @@ namespace lang{
 
         Runnable* task;
         ThreadHandle* handle;
-
         Thread::UncaughtExceptionHandler* exHandler;
-
         static unsigned int id;
         static Thread::UncaughtExceptionHandler* defaultHandler;
 
@@ -86,31 +84,31 @@ Thread::Thread(ThreadHandle* handle) : R
 
 ////////////////////////////////////////////////////////////////////////////////
 Thread::Thread() : Runnable(), properties(NULL) {
-    this->initializeSelf( NULL, "", -1 );
+    this->initializeSelf(NULL, "", -1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( Runnable* task ) : Runnable(), properties(NULL) {
-    this->initializeSelf( task, "", -1 );
+Thread::Thread(Runnable* task) : Runnable(), properties(NULL) {
+    this->initializeSelf(task, "", -1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( const std::string& name ) : Runnable(), properties(NULL) {
-    this->initializeSelf( NULL, name, -1 );
+Thread::Thread(const std::string& name) : Runnable(), properties(NULL) {
+    this->initializeSelf(NULL, name, -1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( Runnable* task, const std::string& name ) : Runnable(), properties( NULL ) {
-    this->initializeSelf( task, name, -1 );
+Thread::Thread(Runnable* task, const std::string& name) : Runnable(), properties( NULL ) {
+    this->initializeSelf(task, name, -1);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Thread::Thread( Runnable* task, const std::string& name, long long stackSize ) : Runnable(), properties( NULL ) {
-    this->initializeSelf( task, name, stackSize );
+Thread::Thread(Runnable* task, const std::string& name, long long stackSize) : Runnable(), properties( NULL ) {
+    this->initializeSelf(task, name, stackSize);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Thread::initializeSelf( Runnable* task, const std::string& name, long long stackSize ) {
+void Thread::initializeSelf(Runnable* task, const std::string& name, long long stackSize) {
 
     std::string threadName = name;
 
@@ -128,7 +126,7 @@ void Thread::initializeSelf( Runnable* t
 
 ////////////////////////////////////////////////////////////////////////////////
 Thread::~Thread() {
-    try{
+    try {
         Threading::destroyThread(this->properties->handle);
         delete this->properties;
     }
@@ -233,7 +231,7 @@ long long Thread::getId() const {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Thread::setName( const std::string& name ) {
+void Thread::setName(const std::string& name) {
     Threading::setThreadName(this->properties->handle, name.c_str());
 }
 
@@ -243,9 +241,9 @@ std::string Thread::getName() const {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Thread::setPriority( int value ) {
+void Thread::setPriority(int value) {
 
-    if( value < Thread::MIN_PRIORITY || value > Thread::MAX_PRIORITY ) {
+    if (value < Thread::MIN_PRIORITY || value > Thread::MAX_PRIORITY) {
         throw IllegalArgumentException(
             __FILE__, __LINE__,
             "Thread::setPriority - Specified value {%d} is out of range", value );
@@ -260,7 +258,7 @@ int Thread::getPriority() const {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Thread::setUncaughtExceptionHandler( UncaughtExceptionHandler* handler ) {
+void Thread::setUncaughtExceptionHandler(UncaughtExceptionHandler* handler) {
     this->properties->exHandler = handler;
 }
 
@@ -275,7 +273,7 @@ Thread::UncaughtExceptionHandler* Thread
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Thread::setDefaultUncaughtExceptionHandler( Thread::UncaughtExceptionHandler* handler ) {
+void Thread::setDefaultUncaughtExceptionHandler(Thread::UncaughtExceptionHandler* handler) {
     ThreadProperties::defaultHandler = handler;
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h?rev=1393142&r1=1393141&r2=1393142&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/ThreadFactory.h Tue Oct  2 19:48:46 2012
@@ -66,7 +66,7 @@ namespace concurrent {
          * @returns constructed thread, or NULL if the request to create a thread is rejected
          *          the caller owns the returned pointer.
          */
-        virtual decaf::lang::Thread* newThread( decaf::lang::Runnable* r ) = 0;
+        virtual decaf::lang::Thread* newThread(decaf::lang::Runnable* r) = 0;
 
     };