You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/07 21:15:36 UTC
svn commit: r1347758 - in
/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq:
core/ActiveMQConnection.cpp transport/IOTransport.cpp
transport/TransportFilter.cpp
Author: tabish
Date: Thu Jun 7 19:15:36 2012
New Revision: 1347758
URL: http://svn.apache.org/viewvc?rev=1347758&view=rev
Log:
fixes for: https://issues.apache.org/jira/browse/AMQCPP-407
Modified:
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1347758&r1=1347757&r2=1347758&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Jun 7 19:15:36 2012
@@ -406,54 +406,78 @@ void ActiveMQConnection::close() {
try {
- if( this->isClosed() ) {
+ if (this->isClosed()) {
return;
}
+ Exception ex;
+ bool hasException = false;
+
// If we are running lets stop first.
- if( !this->transportFailed.get() ) {
- this->stop();
+ if (!this->transportFailed.get()) {
+ try {
+ this->stop();
+ } catch (cms::CMSException& error) {
+ if (!hasException) {
+ ex = Exception(&error);
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
+ }
}
// Indicates we are on the way out to suppress any exceptions getting
// passed on from the transport as it goes down.
this->closing.set( true );
- if(this->config->scheduler != NULL) {
+ if (this->config->scheduler != NULL) {
try {
this->config->scheduler->stop();
- }
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+ } catch (Exception& error) {
+ if (!hasException) {
+ ex = error;
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
+ }
}
long long lastDeliveredSequenceId = 0;
synchronized( &this->config->activeSessions ) {
-
// Get the complete list of active sessions.
- std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->config->activeSessions.iterator() );
+ std::auto_ptr<Iterator<ActiveMQSession*> > iter(this->config->activeSessions.iterator());
// Dispose of all the Session resources we know are still open.
- while( iter->hasNext() ) {
+ while (iter->hasNext()) {
ActiveMQSession* session = iter->next();
- try{
+ try {
session->dispose();
-
- lastDeliveredSequenceId =
- Math::max( lastDeliveredSequenceId, session->getLastDeliveredSequenceId() );
- } catch( cms::CMSException& ex ){
- /* Absorb */
+ lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
+ } catch(cms::CMSException& ex){
}
}
}
// Now inform the Broker we are shutting down.
- this->disconnect( lastDeliveredSequenceId );
+ try {
+ this->disconnect(lastDeliveredSequenceId);
+ } catch (Exception& error) {
+ if (!hasException) {
+ ex = error;
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
+ }
// Once current deliveries are done this stops the delivery
// of any new messages.
- this->started.set( false );
- this->closed.set( true );
+ this->started.set(false);
+ this->closed.set(true);
+
+ if (hasException) {
+ throw ex;
+ }
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -550,50 +574,58 @@ void ActiveMQConnection::disconnect( lon
// Clear the listener, we don't care about async errors at this point.
this->config->transport->setTransportListener( NULL );
- if( this->config->isConnectionInfoSentToBroker ) {
+ // Allow the Support class to shutdown its resources, including the Transport.
+ bool hasException = false;
+ Exception e;
+
+ if (this->config->isConnectionInfoSentToBroker) {
// Remove our ConnectionId from the Broker
- Pointer<RemoveInfo> command( this->config->connectionInfo->createRemoveCommand() );
- command->setLastDeliveredSequenceId( lastDeliveredSequenceId );
- this->syncRequest( command, this->config->closeTimeout );
+ try {
+ Pointer<RemoveInfo> command(this->config->connectionInfo->createRemoveCommand());
+ command->setLastDeliveredSequenceId(lastDeliveredSequenceId);
+ this->syncRequest(command, this->config->closeTimeout);
+ } catch (Exception& ex) {
+ if (!hasException) {
+ hasException = true;
+ ex.setMark(__FILE__, __LINE__);
+ e = ex;
+ }
+ }
// Send the disconnect command to the broker.
- Pointer<ShutdownInfo> shutdown( new ShutdownInfo() );
- oneway( shutdown );
+ try {
+ Pointer<ShutdownInfo> shutdown(new ShutdownInfo());
+ oneway(shutdown);
+ } catch (Exception& ex) {
+ if (!hasException) {
+ hasException = true;
+ ex.setMark(__FILE__, __LINE__);
+ e = ex;
+ }
+ }
}
- // Allow the Support class to shutdown its resources, including the Transport.
- bool hasException = false;
- exceptions::ActiveMQException e;
+ if (this->config->transport != NULL) {
- if( this->config->transport != NULL ){
-
- try{
- this->config->transport->close();
- }catch( exceptions::ActiveMQException& ex ){
- if( !hasException ){
- hasException = true;
- ex.setMark(__FILE__, __LINE__ );
- e = ex;
- }
- }
+ try {
+ this->config->transport->close();
+ } catch (Exception& ex) {
+ if (!hasException) {
+ hasException = true;
+ ex.setMark(__FILE__, __LINE__);
+ e = ex;
+ }
+ }
- try{
- this->config->transport.reset( NULL );
- }catch( exceptions::ActiveMQException& ex ){
- if( !hasException ){
- hasException = true;
- ex.setMark(__FILE__, __LINE__ );
- e = ex;
- }
- }
- }
+ this->config->transport.reset(NULL);
+ }
// If we encountered an exception - throw the first one we encountered.
// This will preserve the stack trace for logging purposes.
- if( hasException ){
- throw e;
- }
+ if (hasException) {
+ throw e;
+ }
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1347758&r1=1347757&r2=1347758&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Thu Jun 7 19:15:36 2012
@@ -181,40 +181,55 @@ void IOTransport::stop() {
////////////////////////////////////////////////////////////////////////////////
void IOTransport::close() {
- try{
+ class Finalizer {
+ private:
- if( closed ){
- return;
- }
+ Pointer<Thread> target;
- // Mark this transport as closed.
- closed = true;
+ public:
- // We have to close the input stream before
- // we stop the thread. this will force us to
- // wake up the thread if it's stuck in a read
- // (which is likely). Otherwise, the join that
- // follows will block forever.
- if( inputStream != NULL ){
- inputStream->close();
- inputStream = NULL;
- }
-
- // Wait for the thread to die.
- if( thread != NULL ){
- thread->join();
- thread.reset( NULL );
- }
-
- // Close the output stream.
- if( outputStream != NULL ){
- outputStream->close();
- outputStream = NULL;
- }
-
- // Clear the WireFormat so we can't use it anymore
- this->wireFormat.reset( NULL );
- }
+ Finalizer(Pointer<Thread> target) : target(target) {}
+ ~Finalizer() {
+ try {
+ target->join();
+ target.reset(NULL);
+ }
+ DECAF_CATCHALL_NOTHROW()
+ }
+ };
+
+ try {
+
+ if (closed) {
+ return;
+ }
+
+
+ // Mark this transport as closed.
+ closed = true;
+
+ Finalizer finalize(thread);
+
+ // No need to fire anymore async events now.
+ this->listener = NULL;
+
+ // We have to close the input stream before we stop the thread. this will
+ // force us to wake up the thread if it's stuck in a read (which is likely).
+ // Otherwise, the join that follows will block forever.
+ if (inputStream != NULL) {
+ inputStream->close();
+ inputStream = NULL;
+ }
+
+ // Close the output stream.
+ if (outputStream != NULL) {
+ outputStream->close();
+ outputStream = NULL;
+ }
+
+ // Clear the WireFormat so we can't use it anymore
+ this->wireFormat.reset(NULL);
+ }
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
AMQ_CATCHALL_THROW( IOException )
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp?rev=1347758&r1=1347757&r2=1347758&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp Thu Jun 7 19:15:36 2012
@@ -106,6 +106,9 @@ void TransportFilter::close() {
next->close();
next.reset( NULL );
}
+
+ // No need to fire any more async events.
+ this->listener = NULL;
}
////////////////////////////////////////////////////////////////////////////////