You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/07 23:34:19 UTC
svn commit: r1347807 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq:
core/ActiveMQConnection.cpp transport/IOTransport.cpp
transport/TransportFilter.cpp wireformat/openwire/OpenWireFormat.cpp
Author: tabish
Date: Thu Jun 7 21:34:18 2012
New Revision: 1347807
URL: http://svn.apache.org/viewvc?rev=1347807&view=rev
Log:
Merge fixes for better connection shutdown logic.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Jun 7 21:34:18 2012
@@ -516,20 +516,36 @@ void ActiveMQConnection::close() {
return;
}
+ Exception ex;
+ bool hasException = false;
+
// If we are running lets stop first.
if (!this->transportFailed.get()) {
- this->stop();
+ try {
+ this->stop();
+ } catch (cms::CMSException& error) {
+ if (!hasException) {
+ ex = Exception(&error);
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
+ }
}
// Indicates we are on the way out to suppress any exceptions getting
// passed on from the transport as it goes down.
- this->closing.set( true );
+ this->closing.set(true);
if (this->config->scheduler != NULL) {
try {
this->config->scheduler->stop();
+ } catch (Exception& error) {
+ if (!hasException) {
+ ex = error;
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
}
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
// Get the complete list of active sessions.
@@ -540,39 +556,67 @@ void ActiveMQConnection::close() {
// Dispose of all the Session resources we know are still open.
while (iter->hasNext()) {
Pointer<ActiveMQSessionKernel> session = iter->next();
- try{
+ try {
session->dispose();
- lastDeliveredSequenceId =
- Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
+ lastDeliveredSequenceId = Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
} catch( cms::CMSException& ex ){
- /* Absorb */
}
}
- // As TemporaryQueue and TemporaryTopic instances are bound
- // to a connection we should just delete them after the connection
- // is closed to free up memory
+ // As TemporaryQueue and TemporaryTopic instances are bound to a connection
+ // we should just delete them after the connection is closed to free up memory
std::vector< Pointer<ActiveMQTempDestination> > values = this->config->activeTempDestinations.values();
std::vector< Pointer<ActiveMQTempDestination> >::iterator iterator = values.begin();
- for(; iterator != values.end(); ++iterator) {
- Pointer<ActiveMQTempDestination> dest = *iterator;
- dest->close();
+ try {
+ for(; iterator != values.end(); ++iterator) {
+ Pointer<ActiveMQTempDestination> dest = *iterator;
+ dest->close();
+ }
+ } catch (cms::CMSException& error) {
+ if (!hasException) {
+ ex = Exception(&error);
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
+ } catch (std::exception& stdex) {
+ if (!hasException) {
+ ex = Exception(&stdex);
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
}
try {
if (this->config->executor != NULL) {
this->config->executor->shutdown();
}
- } catch(Exception& ex) {
+ } catch (Exception& error) {
+ if (!hasException) {
+ ex = error;
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
}
// Now inform the Broker we are shutting down.
- this->disconnect(lastDeliveredSequenceId);
+ try {
+ this->disconnect(lastDeliveredSequenceId);
+ } catch (Exception& error) {
+ if (!hasException) {
+ ex = error;
+ ex.setMark(__FILE__, __LINE__);
+ hasException = true;
+ }
+ }
// Once current deliveries are done this stops the delivery
// of any new messages.
this->started.set(false);
this->closed.set(true);
+
+ if (hasException) {
+ throw ex;
+ }
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -713,15 +757,7 @@ void ActiveMQConnection::disconnect(long
}
}
- try {
- this->config->transport.reset(NULL);
- } catch (exceptions::ActiveMQException& ex) {
- if (!hasException) {
- hasException = true;
- ex.setMark(__FILE__, __LINE__);
- e = ex;
- }
- }
+ this->config->transport.reset(NULL);
}
// If we encountered an exception - throw the first one we encountered.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Thu Jun 7 21:34:18 2012
@@ -181,39 +181,54 @@ void IOTransport::stop() {
////////////////////////////////////////////////////////////////////////////////
void IOTransport::close() {
- try{
+ class Finalizer {
+ private:
+
+ Pointer<Thread> target;
+
+ public:
+
+ Finalizer(Pointer<Thread> target) : target(target) {}
+ ~Finalizer() {
+ try {
+ target->join();
+ target.reset(NULL);
+ }
+ DECAF_CATCHALL_NOTHROW()
+ }
+ };
+
+ try {
- if( closed ){
+ if (closed) {
return;
}
+
// Mark this transport as closed.
closed = true;
- // We have to close the input stream before
- // we stop the thread. this will force us to
- // wake up the thread if it's stuck in a read
- // (which is likely). Otherwise, the join that
- // follows will block forever.
- if( inputStream != NULL ){
+ Finalizer finalize(thread);
+
+ // No need to fire anymore async events now.
+ this->listener = NULL;
+
+ // We have to close the input stream before we stop the thread. this will
+ // force us to wake up the thread if it's stuck in a read (which is likely).
+ // Otherwise, the join that follows will block forever.
+ if (inputStream != NULL) {
inputStream->close();
inputStream = NULL;
}
- // Wait for the thread to die.
- if( thread != NULL ){
- thread->join();
- thread.reset( NULL );
- }
-
// Close the output stream.
- if( outputStream != NULL ){
+ if (outputStream != NULL) {
outputStream->close();
outputStream = NULL;
}
// Clear the WireFormat so we can't use it anymore
- this->wireFormat.reset( NULL );
+ this->wireFormat.reset(NULL);
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp Thu Jun 7 21:34:18 2012
@@ -108,6 +108,8 @@ void TransportFilter::close() {
next->close();
next.reset( NULL );
}
+
+ listener = NULL;
}
////////////////////////////////////////////////////////////////////////////////
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp?rev=1347807&r1=1347806&r2=1347807&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp Thu Jun 7 21:34:18 2012
@@ -50,7 +50,7 @@ using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
const unsigned char OpenWireFormat::NULL_TYPE = 0;
const int OpenWireFormat::DEFAULT_VERSION = 1;
-const int OpenWireFormat::MAX_SUPPORTED_VERSION = 6;
+const int OpenWireFormat::MAX_SUPPORTED_VERSION = 9;
////////////////////////////////////////////////////////////////////////////////
OpenWireFormat::OpenWireFormat( const decaf::util::Properties& properties ) :