You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/04/11 17:13:48 UTC

svn commit: r1324807 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/ActiveMQConnection.cpp activemq/core/ActiveMQConnection.h decaf/util/concurrent/Executor.h

Author: tabish
Date: Wed Apr 11 15:13:48 2012
New Revision: 1324807

URL: http://svn.apache.org/viewvc?rev=1324807&view=rev
Log:
work on: https://issues.apache.org/jira/browse/AMQCPP-376

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1324807&r1=1324806&r2=1324807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Apr 11 15:13:48 2012
@@ -40,6 +40,8 @@
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/TimeUnit.h>
 #include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/ThreadPoolExecutor.h>
+#include <decaf/util/concurrent/LinkedBlockingQueue.h>
 
 #include <activemq/commands/Command.h>
 #include <activemq/commands/ActiveMQMessage.h>
@@ -107,6 +109,7 @@ namespace core{
         Pointer<transport::Transport> transport;
         Pointer<util::IdGenerator> clientIdGenerator;
         Pointer<Scheduler> scheduler;
+        Pointer<ExecutorService> executor;
 
         util::LongSequenceGenerator sessionIds;
         util::LongSequenceGenerator tempDestinationIds;
@@ -192,6 +195,9 @@ namespace core{
             this->connectionInfo.reset(new ConnectionInfo());
             this->brokerInfoReceived.reset(new CountDownLatch(1));
 
+            this->executor.reset(
+                new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>()));
+
             // Generate a connectionId
             std::string uniqueId = CONNECTION_ID_GENERATOR.generateId();
             decaf::lang::Pointer<ConnectionId> connectionId(new ConnectionId());
@@ -208,6 +214,90 @@ namespace core{
 
     // Static init.
     util::IdGenerator ConnectionConfig::CONNECTION_ID_GENERATOR;
+
+    class ConnectionErrorRunnable : public Runnable {
+    private:
+
+        ActiveMQConnection* connection;
+        Pointer<ConnectionError> error;
+
+    public:
+
+        ConnectionErrorRunnable(ActiveMQConnection* connection, Pointer<ConnectionError> error) :
+            Runnable(), connection(connection), error(error) {}
+        virtual ~ConnectionErrorRunnable() {}
+
+        virtual void run() {
+            try {
+                if (error != NULL && error->getException() != NULL) {
+                    this->connection->onAsyncException(error->getException()->createExceptionObject());
+                }
+            } catch(Exception& ex) {}
+        }
+    };
+
+    class OnAsyncExceptionRunnable : public Runnable {
+    private:
+
+        ActiveMQConnection* connection;
+        Exception ex;
+
+    public:
+
+        OnAsyncExceptionRunnable(ActiveMQConnection* connection, Exception ex) :
+            Runnable(), connection(connection), ex(ex) {}
+        virtual ~OnAsyncExceptionRunnable() {}
+
+        virtual void run() {
+            try {
+                cms::ExceptionListener* listener = this->connection->getExceptionListener();
+                if (listener != NULL) {
+                    ActiveMQException amqEx(ex);
+                    listener->onException(amqEx.convertToCMSException());
+                }
+            } catch(Exception& ex) {}
+        }
+    };
+
+    class OnExceptionRunnable : public Runnable {
+    private:
+
+        ActiveMQConnection* connection;
+        ConnectionConfig* config;
+        Exception* ex;
+
+    public:
+
+        OnExceptionRunnable(ActiveMQConnection* connection, ConnectionConfig* config, Exception* ex) :
+            Runnable(), connection(connection), config(config), ex(ex) {}
+        virtual ~OnExceptionRunnable() {}
+
+        virtual void run() {
+            try {
+                // Mark this Connection as having a Failed transport.
+                this->connection->setFirstFailureError(ex);
+
+                try{
+                    this->config->transport->stop();
+                } catch(...) {
+                }
+
+                this->config->brokerInfoReceived->countDown();
+
+                // Clean up the Connection resources.
+                this->connection->cleanup();
+
+                Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );
+
+                while( iter->hasNext() ) {
+                    try{
+                        iter->next()->onException(ex);
+                    } catch(...) {}
+                }
+            } catch(Exception& ex) {}
+        }
+    };
+
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -253,7 +343,7 @@ ActiveMQConnection::~ActiveMQConnection(
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::addDispatcher(
-    const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher ) {
+    const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) {
 
     try{
         synchronized(&this->config->dispatchers) {
@@ -266,7 +356,7 @@ void ActiveMQConnection::addDispatcher(
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer) {
 
-    try{
+    try {
         synchronized(&this->config->dispatchers) {
             this->config->dispatchers.remove(consumer);
         }
@@ -433,6 +523,13 @@ void ActiveMQConnection::close() {
             }
         }
 
+        try {
+            if (this->config->executor != NULL) {
+                this->config->executor->shutdown();
+            }
+        } catch(Exception& ex) {
+        }
+
         // Now inform the Broker we are shutting down.
         this->disconnect(lastDeliveredSequenceId);
 
@@ -447,7 +544,7 @@ void ActiveMQConnection::close() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::cleanup() {
 
-    try{
+    try {
 
         // Get the complete list of active sessions.
         std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
@@ -483,7 +580,7 @@ void ActiveMQConnection::cleanup() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::start() {
 
-    try{
+    try {
 
         checkClosedOrFailed();
         ensureConnectionInfoSent();
@@ -528,7 +625,7 @@ void ActiveMQConnection::stop() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::disconnect(long long lastDeliveredSequenceId) {
 
-    try{
+    try {
 
         // Clear the listener, we don't care about async errors at this point.
         this->config->transport->setTransportListener(NULL);
@@ -600,7 +697,7 @@ void ActiveMQConnection::disconnect(long
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::sendPullRequest( const ConsumerInfo* consumer, long long timeout ) {
+void ActiveMQConnection::sendPullRequest(const ConsumerInfo* consumer, long long timeout) {
 
     try {
 
@@ -622,7 +719,7 @@ void ActiveMQConnection::sendPullRequest
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::destroyDestination(const ActiveMQDestination* destination) {
 
-    try{
+    try {
 
         if (destination == NULL) {
             throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
@@ -650,7 +747,7 @@ void ActiveMQConnection::destroyDestinat
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::destroyDestination(const cms::Destination* destination) {
 
-    try{
+    try {
 
         if (destination == NULL) {
             throw NullPointerException(__FILE__, __LINE__, "Destination passed was NULL");
@@ -672,9 +769,9 @@ void ActiveMQConnection::destroyDestinat
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onCommand( const Pointer<Command>& command ) {
+void ActiveMQConnection::onCommand(const Pointer<Command>& command) {
 
-    try{
+    try {
 
         if (command->isMessageDispatch()) {
 
@@ -755,40 +852,11 @@ void ActiveMQConnection::onException( co
 
     try {
 
-        // We're disconnected - the asynchronous error is expected.
-        if( this->isClosed() || this->closing.get() ) {
-            return;
-        }
-
         onAsyncException(ex);
 
-        // TODO This part should be done in a separate Thread.
-
-        // Mark this Connection as having a Failed transport.
-        this->transportFailed.set( true );
-        if( this->config->firstFailureError == NULL ) {
-            this->config->firstFailureError.reset( ex.clone() );
-        }
-
-        this->config->brokerInfoReceived->countDown();
-
-        // TODO - Until this fires in another thread we can't dipose of
-        //        the transport here since it could result in this method
-        //        being called again recursively.
-        try{
-            // this->config->transport->stop();
-        } catch(...) {
-        }
-
-        // Clean up the Connection resources.
-        cleanup();
-
-        Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );
-
-        while( iter->hasNext() ) {
-            try{
-                iter->next()->onException( ex );
-            } catch(...) {}
+        // Unless we're disconnected (the asynchronous error is then expected), handle the failure on the executor.
+        if (!this->isClosed() || !this->closing.get()) {
+            this->config->executor->execute(new OnExceptionRunnable(this, config, ex.clone()));
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -796,15 +864,12 @@ void ActiveMQConnection::onException( co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onAsyncException( const decaf::lang::Exception& ex ) {
+void ActiveMQConnection::onAsyncException(const decaf::lang::Exception& ex) {
 
-    if( !this->isClosed() || !this->closing.get() ) {
+    if (!this->isClosed() || !this->closing.get()) {
 
-        if( this->config->exceptionListener != NULL ) {
-
-            // Inform the user of the error.
-            // TODO - This should be run by another Thread, i.e. Executor
-            fire( exceptions::ActiveMQException( ex ) );
+        if (this->config->exceptionListener != NULL) {
+            this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex));
         }
     }
 }
@@ -843,7 +908,7 @@ void ActiveMQConnection::transportResume
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::oneway( Pointer<Command> command ) {
+void ActiveMQConnection::oneway(Pointer<Command> command) {
 
     try {
         checkClosedOrFailed();
@@ -856,7 +921,7 @@ void ActiveMQConnection::oneway( Pointer
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ActiveMQConnection::syncRequest( Pointer<Command> command, unsigned int timeout ) {
+Pointer<Response> ActiveMQConnection::syncRequest(Pointer<Command> command, unsigned int timeout) {
 
     try {
 
@@ -912,7 +977,7 @@ void ActiveMQConnection::checkClosedOrFa
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::ensureConnectionInfoSent() {
 
-    try{
+    try {
 
         // Can we skip sending the ConnectionInfo packet, cheap test
         if (this->config->isConnectionInfoSentToBroker || closed.get()) {
@@ -1033,7 +1098,7 @@ void ActiveMQConnection::signalInterrupt
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setUsername( const std::string& username ) {
+void ActiveMQConnection::setUsername(const std::string& username) {
     this->config->connectionInfo->setUserName( username );
 }
 
@@ -1043,7 +1108,7 @@ const std::string& ActiveMQConnection::g
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setPassword( const std::string& password ){
+void ActiveMQConnection::setPassword(const std::string& password) {
     this->config->connectionInfo->setPassword( password );
 }
 
@@ -1053,7 +1118,7 @@ const std::string& ActiveMQConnection::g
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setBrokerURL( const std::string& brokerURL ){
+void ActiveMQConnection::setBrokerURL(const std::string& brokerURL) {
     this->config->brokerURL = brokerURL;
 }
 
@@ -1063,7 +1128,7 @@ const std::string& ActiveMQConnection::g
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setExceptionListener( cms::ExceptionListener* listener ) {
+void ActiveMQConnection::setExceptionListener(cms::ExceptionListener* listener) {
     this->config->exceptionListener = listener;
 }
 
@@ -1073,8 +1138,8 @@ cms::ExceptionListener* ActiveMQConnecti
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setPrefetchPolicy( PrefetchPolicy* policy ) {
-    this->config->defaultPrefetchPolicy.reset( policy );
+void ActiveMQConnection::setPrefetchPolicy(PrefetchPolicy* policy) {
+    this->config->defaultPrefetchPolicy.reset(policy);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1083,8 +1148,8 @@ PrefetchPolicy* ActiveMQConnection::getP
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
-    this->config->defaultRedeliveryPolicy.reset( policy );
+void ActiveMQConnection::setRedeliveryPolicy(RedeliveryPolicy* policy) {
+    this->config->defaultRedeliveryPolicy.reset(policy);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1108,7 +1173,7 @@ bool ActiveMQConnection::isAlwaysSyncSen
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setAlwaysSyncSend( bool value ) {
+void ActiveMQConnection::setAlwaysSyncSend(bool value) {
     this->config->alwaysSyncSend = value;
 }
 
@@ -1118,7 +1183,7 @@ bool ActiveMQConnection::isUseAsyncSend(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setUseAsyncSend( bool value ) {
+void ActiveMQConnection::setUseAsyncSend(bool value) {
     this->config->useAsyncSend = value;
 }
 
@@ -1128,7 +1193,7 @@ bool ActiveMQConnection::isUseCompressio
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setUseCompression( bool value ) {
+void ActiveMQConnection::setUseCompression(bool value) {
     this->config->useCompression = value;
 }
 
@@ -1138,13 +1203,13 @@ int ActiveMQConnection::getCompressionLe
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setCompressionLevel( int value ) {
+void ActiveMQConnection::setCompressionLevel(int value) {
 
-    if( value < 0 ) {
+    if (value < 0) {
         this->config->compressionLevel = -1;
     }
 
-    this->config->compressionLevel = Math::min( value, 9 );
+    this->config->compressionLevel = Math::min(value, 9);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1153,7 +1218,7 @@ unsigned int ActiveMQConnection::getSend
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setSendTimeout( unsigned int timeout ) {
+void ActiveMQConnection::setSendTimeout(unsigned int timeout) {
     this->config->sendTimeout = timeout;
 }
 
@@ -1163,7 +1228,7 @@ unsigned int ActiveMQConnection::getClos
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setCloseTimeout( unsigned int timeout ) {
+void ActiveMQConnection::setCloseTimeout(unsigned int timeout) {
     this->config->closeTimeout = timeout;
 }
 
@@ -1173,7 +1238,7 @@ unsigned int ActiveMQConnection::getProd
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setProducerWindowSize( unsigned int windowSize ) {
+void ActiveMQConnection::setProducerWindowSize(unsigned int windowSize) {
     this->config->producerWindowSize = windowSize;
 }
 
@@ -1189,7 +1254,7 @@ long long ActiveMQConnection::getNextLoc
 
 ////////////////////////////////////////////////////////////////////////////////
 transport::Transport& ActiveMQConnection::getTransport() const {
-    return *( this->config->transport );
+    return *(this->config->transport);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1203,11 +1268,23 @@ bool ActiveMQConnection::isMessagePriori
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setMessagePrioritySupported( bool value ) {
+void ActiveMQConnection::setMessagePrioritySupported(bool value) {
     this->config->messagePrioritySupported = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setFirstFailureError(decaf::lang::Exception* error) {
+
+    this->transportFailed.set(true);
+
+    if (this->config->firstFailureError == NULL) {
+        this->config->firstFailureError.reset(error);
+    } else {
+        delete error;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 decaf::lang::Exception* ActiveMQConnection::getFirstFailureError() const {
     return this->config->firstFailureError.get();
 }
@@ -1217,7 +1294,7 @@ std::string ActiveMQConnection::getResou
     try {
         this->config->waitForBrokerInfo();
 
-        if( this->config->brokerInfo == NULL ) {
+        if (this->config->brokerInfo == NULL) {
             throw CMSException("Connection failed before Broker info was received.");
         }
 
@@ -1228,5 +1305,10 @@ std::string ActiveMQConnection::getResou
 
 ////////////////////////////////////////////////////////////////////////////////
 const decaf::util::Properties& ActiveMQConnection::getProperties() const {
-    return *( this->config->properties );
+    return *(this->config->properties);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ExecutorService* ActiveMQConnection::getExecutor() const {
+    return this->config->executor.get();
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1324807&r1=1324806&r2=1324807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Apr 11 15:13:48 2012
@@ -35,6 +35,7 @@
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/CopyOnWriteArrayList.h>
+#include <decaf/util/concurrent/ExecutorService.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/IllegalStateException.h>
@@ -653,9 +654,17 @@ namespace core{
         void setTransportInterruptionProcessingComplete();
 
         /**
+         * Sets the pointer to the first exception that caused the Connection to become failed.
+         *
+         * @param error pointer to the exception instance that is to be the first failure error; if
+         *        the first error is already set, this instance is deleted instead.
+         */
+        void setFirstFailureError(decaf::lang::Exception* error);
+
+        /**
          * Gets the pointer to the first exception that caused the Connection to become failed.
          *
-         * @returns pointer to and Exception instance or NULL if none is set.
+         * @returns pointer to an Exception instance or NULL if none is set.
          */
         decaf::lang::Exception* getFirstFailureError() const;
 
@@ -686,6 +695,11 @@ namespace core{
          */
         void ensureConnectionInfoSent();
 
+        /**
+         * @returns the ExecutorService used to run jobs for this Connection
+         */
+        decaf::util::concurrent::ExecutorService* getExecutor() const;
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h?rev=1324807&r1=1324806&r2=1324807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/concurrent/Executor.h Wed Apr 11 15:13:48 2012
@@ -101,7 +101,6 @@ namespace concurrent {
          *
          * @throws NullPointerException if command is null
          */
-
         virtual void execute(decaf::lang::Runnable* command) = 0;
 
         /**