You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/07 23:34:19 UTC

svn commit: r1347807 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: core/ActiveMQConnection.cpp transport/IOTransport.cpp transport/TransportFilter.cpp wireformat/openwire/OpenWireFormat.cpp

Author: tabish
Date: Thu Jun  7 21:34:18 2012
New Revision: 1347807

URL: http://svn.apache.org/viewvc?rev=1347807&view=rev
Log:
Merge fixes for better connection shutdown logic.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Jun  7 21:34:18 2012
@@ -516,20 +516,36 @@ void ActiveMQConnection::close() {
             return;
         }
 
+        Exception ex;
+        bool hasException = false;
+
         // If we are running lets stop first.
         if (!this->transportFailed.get()) {
-            this->stop();
+            try {
+                this->stop();
+            } catch (cms::CMSException& error) {
+                if (!hasException) {
+                    ex = Exception(&error);
+                    ex.setMark(__FILE__, __LINE__);
+                    hasException = true;
+                }
+            }
         }
 
         // Indicates we are on the way out to suppress any exceptions getting
         // passed on from the transport as it goes down.
-        this->closing.set( true );
+        this->closing.set(true);
 
         if (this->config->scheduler != NULL) {
             try {
                 this->config->scheduler->stop();
+            } catch (Exception& error) {
+                if (!hasException) {
+                    ex = error;
+                    ex.setMark(__FILE__, __LINE__);
+                    hasException = true;
+                }
             }
-            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
         }
 
         // Get the complete list of active sessions.
@@ -540,39 +556,67 @@ void ActiveMQConnection::close() {
         // Dispose of all the Session resources we know are still open.
         while (iter->hasNext()) {
             Pointer<ActiveMQSessionKernel> session = iter->next();
-            try{
+            try {
                 session->dispose();
-                lastDeliveredSequenceId =
-                    Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
+                lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
             } catch( cms::CMSException& ex ){
-                /* Absorb */
             }
         }
 
-        // As TemporaryQueue and TemporaryTopic instances are bound
-        // to a connection we should just delete them after the connection
-        // is closed to free up memory
+        // As TemporaryQueue and TemporaryTopic instances are bound to a connection
+        // we should just delete them after the connection is closed to free up memory
         std::vector< Pointer<ActiveMQTempDestination> > values = this->config->activeTempDestinations.values();
         std::vector< Pointer<ActiveMQTempDestination> >::iterator iterator = values.begin();
-        for(; iterator != values.end(); ++iterator) {
-            Pointer<ActiveMQTempDestination> dest = *iterator;
-            dest->close();
+        try {
+            for(; iterator != values.end(); ++iterator) {
+                Pointer<ActiveMQTempDestination> dest = *iterator;
+                dest->close();
+            }
+        } catch (cms::CMSException& error) {
+            if (!hasException) {
+                ex = Exception(&error);
+                ex.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
+        } catch (std::exception& stdex) {
+            if (!hasException) {
+                ex = Exception(&stdex);
+                ex.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
         }
 
         try {
             if (this->config->executor != NULL) {
                 this->config->executor->shutdown();
             }
-        } catch(Exception& ex) {
+        } catch (Exception& error) {
+            if (!hasException) {
+                ex = error;
+                ex.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
         }
 
         // Now inform the Broker we are shutting down.
-        this->disconnect(lastDeliveredSequenceId);
+        try {
+            this->disconnect(lastDeliveredSequenceId);
+        } catch (Exception& error) {
+            if (!hasException) {
+                ex = error;
+                ex.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
+        }
 
         // Once current deliveries are done this stops the delivery
         // of any new messages.
         this->started.set(false);
         this->closed.set(true);
+
+        if (hasException) {
+            throw ex;
+        }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -713,15 +757,7 @@ void ActiveMQConnection::disconnect(long
                 }
             }
 
-            try {
-                this->config->transport.reset(NULL);
-            } catch (exceptions::ActiveMQException& ex) {
-                if (!hasException) {
-                    hasException = true;
-                    ex.setMark(__FILE__, __LINE__);
-                    e = ex;
-                }
-            }
+            this->config->transport.reset(NULL);
         }
 
         // If we encountered an exception - throw the first one we encountered.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Thu Jun  7 21:34:18 2012
@@ -181,39 +181,54 @@ void IOTransport::stop() {
 ////////////////////////////////////////////////////////////////////////////////
 void IOTransport::close() {
 
-    try{
+    class Finalizer {
+    private:
+
+        Pointer<Thread> target;
+
+    public:
+
+        Finalizer(Pointer<Thread> target) : target(target) {}
+        ~Finalizer() {
+            try {
+                target->join();
+                target.reset(NULL);
+            }
+            DECAF_CATCHALL_NOTHROW()
+        }
+    };
+
+    try {
 
-        if( closed ){
+        if (closed) {
             return;
         }
 
+
         // Mark this transport as closed.
         closed = true;
 
-        // We have to close the input stream before
-        // we stop the thread.  this will force us to
-        // wake up the thread if it's stuck in a read
-        // (which is likely).  Otherwise, the join that
-        // follows will block forever.
-        if( inputStream != NULL ){
+        Finalizer finalize(thread);
+
+        // No need to fire anymore async events now.
+        this->listener = NULL;
+
+        // We have to close the input stream before we stop the thread.  this will
+        // force us to wake up the thread if it's stuck in a read (which is likely).
+        // Otherwise, the join that follows will block forever.
+        if (inputStream != NULL) {
             inputStream->close();
             inputStream = NULL;
         }
 
-        // Wait for the thread to die.
-        if( thread != NULL ){
-            thread->join();
-            thread.reset( NULL );
-        }
-
         // Close the output stream.
-        if( outputStream != NULL ){
+        if (outputStream != NULL) {
             outputStream->close();
             outputStream = NULL;
         }
 
         // Clear the WireFormat so we can't use it anymore
-        this->wireFormat.reset( NULL );
+        this->wireFormat.reset(NULL);
     }
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp Thu Jun  7 21:34:18 2012
@@ -108,6 +108,8 @@ void TransportFilter::close() {
         next->close();
         next.reset( NULL );
     }
+
+    listener = NULL;
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp Thu Jun  7 21:34:18 2012
@@ -50,7 +50,7 @@ using namespace decaf::lang::exceptions;
 ////////////////////////////////////////////////////////////////////////////////
 const unsigned char OpenWireFormat::NULL_TYPE = 0;
 const int OpenWireFormat::DEFAULT_VERSION = 1;
-const int OpenWireFormat::MAX_SUPPORTED_VERSION = 6;
+const int OpenWireFormat::MAX_SUPPORTED_VERSION = 9;
 
 ////////////////////////////////////////////////////////////////////////////////
 OpenWireFormat::OpenWireFormat( const decaf::util::Properties& properties ) :