You are viewing a plain-text version of this content. The canonical link for it is here [hyperlink target not preserved in this plain-text copy].
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/07 21:15:36 UTC

svn commit: r1347758 - in /activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq: core/ActiveMQConnection.cpp transport/IOTransport.cpp transport/TransportFilter.cpp

Author: tabish
Date: Thu Jun  7 19:15:36 2012
New Revision: 1347758

URL: http://svn.apache.org/viewvc?rev=1347758&view=rev
Log:
fixes for: https://issues.apache.org/jira/browse/AMQCPP-407

Modified:
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1347758&r1=1347757&r2=1347758&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Jun  7 19:15:36 2012
@@ -406,54 +406,78 @@ void ActiveMQConnection::close() {
 
     try {
 
-        if( this->isClosed() ) {
+        if (this->isClosed()) {
             return;
         }
 
+        Exception ex;
+        bool hasException = false;
+
         // If we are running lets stop first.
-        if( !this->transportFailed.get() ) {
-            this->stop();
+        if (!this->transportFailed.get()) {
+        	try {
+            	this->stop();
+        	} catch (cms::CMSException& error) {
+        		if (!hasException) {
+					ex = Exception(&error);
+					ex.setMark(__FILE__, __LINE__);
+					hasException = true;
+        		}
+        	}
         }
 
         // Indicates we are on the way out to suppress any exceptions getting
         // passed on from the transport as it goes down.
         this->closing.set( true );
 
-        if(this->config->scheduler != NULL) {
+        if (this->config->scheduler != NULL) {
             try {
                 this->config->scheduler->stop();
-            }
-            AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+        	} catch (Exception& error) {
+        		if (!hasException) {
+					ex = error;
+					ex.setMark(__FILE__, __LINE__);
+					hasException = true;
+        		}
+        	}
         }
 
 		long long lastDeliveredSequenceId = 0;
 
         synchronized( &this->config->activeSessions ) {
-
 			// Get the complete list of active sessions.
-			std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->config->activeSessions.iterator() );
+			std::auto_ptr<Iterator<ActiveMQSession*> > iter(this->config->activeSessions.iterator());
 
 			// Dispose of all the Session resources we know are still open.
-			while( iter->hasNext() ) {
+			while (iter->hasNext()) {
 				ActiveMQSession* session = iter->next();
-				try{
+				try {
 					session->dispose();
-
-					lastDeliveredSequenceId =
-						Math::max( lastDeliveredSequenceId, session->getLastDeliveredSequenceId() );
-				} catch( cms::CMSException& ex ){
-					/* Absorb */
+					lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
+				} catch(cms::CMSException& ex){
 				}
 			}
         }
 
         // Now inform the Broker we are shutting down.
-        this->disconnect( lastDeliveredSequenceId );
+        try {
+            this->disconnect(lastDeliveredSequenceId);
+    	} catch (Exception& error) {
+    		if (!hasException) {
+				ex = error;
+				ex.setMark(__FILE__, __LINE__);
+				hasException = true;
+    		}
+    	}
 
         // Once current deliveries are done this stops the delivery
         // of any new messages.
-        this->started.set( false );
-        this->closed.set( true );
+        this->started.set(false);
+		this->closed.set(true);
+
+		if (hasException) {
+			throw ex;
+		}
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -550,50 +574,58 @@ void ActiveMQConnection::disconnect( lon
         // Clear the listener, we don't care about async errors at this point.
         this->config->transport->setTransportListener( NULL );
 
-        if( this->config->isConnectionInfoSentToBroker ) {
+        // Allow the Support class to shutdown its resources, including the Transport.
+        bool hasException = false;
+        Exception e;
+
+        if (this->config->isConnectionInfoSentToBroker) {
 
             // Remove our ConnectionId from the Broker
-            Pointer<RemoveInfo> command( this->config->connectionInfo->createRemoveCommand() );
-            command->setLastDeliveredSequenceId( lastDeliveredSequenceId );
-            this->syncRequest( command, this->config->closeTimeout );
+            try {
+				Pointer<RemoveInfo> command(this->config->connectionInfo->createRemoveCommand());
+				command->setLastDeliveredSequenceId(lastDeliveredSequenceId);
+				this->syncRequest(command, this->config->closeTimeout);
+			} catch (Exception& ex) {
+				if (!hasException) {
+					hasException = true;
+					ex.setMark(__FILE__, __LINE__);
+					e = ex;
+				}
+			}
 
             // Send the disconnect command to the broker.
-            Pointer<ShutdownInfo> shutdown( new ShutdownInfo() );
-            oneway( shutdown );
+            try {
+				Pointer<ShutdownInfo> shutdown(new ShutdownInfo());
+				oneway(shutdown);
+			} catch (Exception& ex) {
+				if (!hasException) {
+					hasException = true;
+					ex.setMark(__FILE__, __LINE__);
+					e = ex;
+				}
+			}
         }
 
-        // Allow the Support class to shutdown its resources, including the Transport.
-        bool hasException = false;
-        exceptions::ActiveMQException e;
+        if (this->config->transport != NULL) {
 
-        if( this->config->transport != NULL ){
-
-            try{
-                this->config->transport->close();
-            }catch( exceptions::ActiveMQException& ex ){
-                if( !hasException ){
-                    hasException = true;
-                    ex.setMark(__FILE__, __LINE__ );
-                    e = ex;
-                }
-            }
+			try {
+				this->config->transport->close();
+			} catch (Exception& ex) {
+				if (!hasException) {
+					hasException = true;
+					ex.setMark(__FILE__, __LINE__);
+					e = ex;
+				}
+			}
 
-            try{
-                this->config->transport.reset( NULL );
-            }catch( exceptions::ActiveMQException& ex ){
-                if( !hasException ){
-                    hasException = true;
-                    ex.setMark(__FILE__, __LINE__ );
-                    e = ex;
-                }
-            }
-        }
+			this->config->transport.reset(NULL);
+		}
 
         // If we encountered an exception - throw the first one we encountered.
         // This will preserve the stack trace for logging purposes.
-        if( hasException ){
-            throw e;
-        }
+        if (hasException) {
+			throw e;
+		}
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1347758&r1=1347757&r2=1347758&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Thu Jun  7 19:15:36 2012
@@ -181,40 +181,55 @@ void IOTransport::stop() {
 ////////////////////////////////////////////////////////////////////////////////
 void IOTransport::close() {
 
-    try{
+	class Finalizer {
+	private:
 
-        if( closed ){
-            return;
-        }
+		Pointer<Thread> target;
 
-        // Mark this transport as closed.
-        closed = true;
+	public:
 
-        // We have to close the input stream before
-        // we stop the thread.  this will force us to
-        // wake up the thread if it's stuck in a read
-        // (which is likely).  Otherwise, the join that
-        // follows will block forever.
-        if( inputStream != NULL ){
-            inputStream->close();
-            inputStream = NULL;
-        }
-
-        // Wait for the thread to die.
-        if( thread != NULL ){
-            thread->join();
-            thread.reset( NULL );
-        }
-
-        // Close the output stream.
-        if( outputStream != NULL ){
-            outputStream->close();
-            outputStream = NULL;
-        }
-
-        // Clear the WireFormat so we can't use it anymore
-        this->wireFormat.reset( NULL );
-    }
+		Finalizer(Pointer<Thread> target) : target(target) {}
+		~Finalizer() {
+			try {
+				target->join();
+				target.reset(NULL);
+			}
+			DECAF_CATCHALL_NOTHROW()
+		}
+	};
+
+    try {
+
+		if (closed) {
+			return;
+		}
+
+
+		// Mark this transport as closed.
+		closed = true;
+
+		Finalizer finalize(thread);
+
+		// No need to fire anymore async events now.
+		this->listener = NULL;
+
+		// We have to close the input stream before we stop the thread.  this will
+		// force us to wake up the thread if it's stuck in a read (which is likely).
+		// Otherwise, the join that follows will block forever.
+		if (inputStream != NULL) {
+			inputStream->close();
+			inputStream = NULL;
+		}
+
+		// Close the output stream.
+		if (outputStream != NULL) {
+			outputStream->close();
+			outputStream = NULL;
+		}
+
+		// Clear the WireFormat so we can't use it anymore
+		this->wireFormat.reset(NULL);
+	}
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
     AMQ_CATCHALL_THROW( IOException )

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp?rev=1347758&r1=1347757&r2=1347758&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp Thu Jun  7 19:15:36 2012
@@ -106,6 +106,9 @@ void TransportFilter::close() {
         next->close();
         next.reset( NULL );
     }
+
+    // No need to fire any more async events.
+    this->listener = NULL;
 }
 
 ////////////////////////////////////////////////////////////////////////////////