You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/24 22:09:00 UTC
svn commit: r1401852 -
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Author: tabish
Date: Wed Oct 24 20:08:59 2012
New Revision: 1401852
URL: http://svn.apache.org/viewvc?rev=1401852&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-405
Account for advisory consumer in interruption processing.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1401852&r1=1401851&r2=1401852&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Oct 24 20:08:59 2012
@@ -408,12 +408,8 @@ namespace core{
////////////////////////////////////////////////////////////////////////////////
ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport> transport,
const Pointer<decaf::util::Properties> properties) :
- config(NULL),
- connectionMetaData(new ActiveMQConnectionMetaData()),
- started(false),
- closed(false),
- closing(false),
- transportFailed(false) {
+ config(NULL), connectionMetaData(new ActiveMQConnectionMetaData()), started(false),
+ closed(false), closing(false), transportFailed(false) {
Pointer<ConnectionConfig> configuration(
new ConnectionConfig(transport, properties));
@@ -432,28 +428,26 @@ ActiveMQConnection::ActiveMQConnection(c
ActiveMQConnection::~ActiveMQConnection() {
try {
- try{
+ try {
this->close();
- } catch(...) {}
+ }
+ AMQ_CATCHALL_NOTHROW()
// This must happen even if exceptions occur in the Close attempt.
delete this->config;
}
- AMQ_CATCH_NOTHROW( ActiveMQException )
- AMQ_CATCHALL_NOTHROW( )
+ AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addDispatcher(
- const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) {
+void ActiveMQConnection::addDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher) {
- try{
+ try {
synchronized(&this->config->dispatchers) {
this->config->dispatchers.put(consumer, dispatcher);
}
}
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer<ConsumerId>& consumer) {
@@ -463,8 +457,7 @@ void ActiveMQConnection::removeDispatche
this->config->dispatchers.remove(consumer);
}
}
- AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
-}
+ AMQ_CATCH_ALL_THROW_CMSEXCEPTION()}
////////////////////////////////////////////////////////////////////////////////
cms::Session* ActiveMQConnection::createSession() {
@@ -485,8 +478,7 @@ cms::Session* ActiveMQConnection::create
// Create the session instance as a Session Kernel we then create and return a
// ActiveMQSession instance that acts as a proxy to the kernel caller can delete
// that at any time since we only refer to the Pointer to the session kernel.
- Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(
- this, getNextSessionId(), ackMode, *this->config->properties));
+ Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties));
session->setMessageTransformer(this->config->transformer);
@@ -570,7 +562,7 @@ std::string ActiveMQConnection::getClien
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setClientID( const std::string& clientID ) {
+void ActiveMQConnection::setClientID(const std::string& clientID) {
if (this->closed.get()) {
throw cms::IllegalStateException("Connection is already closed", NULL);
@@ -666,7 +658,7 @@ void ActiveMQConnection::close() {
this->config->activeSessions.clear();
this->config->sessionsLock.writeLock().unlock();
- } catch(Exception& error) {
+ } catch (Exception& error) {
this->config->sessionsLock.writeLock().unlock();
if (!hasException) {
ex = error;
@@ -677,8 +669,7 @@ void ActiveMQConnection::close() {
// As TemporaryQueue and TemporaryTopic instances are bound to a connection
// we should just delete them after the connection is closed to free up memory
- Pointer< Iterator< Pointer< ActiveMQTempDestination> > > iterator(
- this->config->activeTempDestinations.values().iterator());
+ Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(this->config->activeTempDestinations.values().iterator());
try {
while (iterator->hasNext()) {
@@ -761,14 +752,14 @@ void ActiveMQConnection::cleanup() {
try {
// We need to use a copy since we aren't able to use CopyOnWriteArrayList
ArrayList<Pointer<ActiveMQSessionKernel> > sessions(this->config->activeSessions);
- std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());
+ std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(sessions.iterator());
// Dispose of all the Session resources we know are still open.
while (iter->hasNext()) {
Pointer<ActiveMQSessionKernel> session = iter->next();
- try{
+ try {
session->dispose();
- } catch( cms::CMSException& ex ){
+ } catch (cms::CMSException& ex) {
/* Absorb */
}
}
@@ -813,7 +804,7 @@ void ActiveMQConnection::start() {
this->config->sessionsLock.readLock().lock();
// Start all the sessions.
- std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+ std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
while (iter->hasNext()) {
iter->next()->start();
}
@@ -840,7 +831,7 @@ void ActiveMQConnection::stop() {
// new messages.
if (this->started.compareAndSet(true, false)) {
this->config->sessionsLock.readLock().lock();
- std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
+ std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
while (iter->hasNext()) {
iter->next()->stop();
@@ -916,9 +907,9 @@ void ActiveMQConnection::disconnect(long
throw e;
}
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -926,7 +917,7 @@ void ActiveMQConnection::sendPullRequest
try {
- if (consumer->getPrefetchSize() == 0) {
+ if (consumer->getPrefetchSize() == 0) {
Pointer<MessagePull> messagePull(new MessagePull());
messagePull->setConsumerId(consumer->getConsumerId());
@@ -936,9 +927,9 @@ void ActiveMQConnection::sendPullRequest
this->oneway(messagePull);
}
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -957,16 +948,16 @@ void ActiveMQConnection::destroyDestinat
command->setConnectionId(this->config->connectionInfo->getConnectionId());
command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
- command->setDestination(Pointer<ActiveMQDestination> (destination->cloneDataStructure()));
+ command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure()));
// Send the message to the broker.
syncRequest(command);
}
- AMQ_CATCH_RETHROW( NullPointerException )
- AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(NullPointerException)
+ AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -981,16 +972,15 @@ void ActiveMQConnection::destroyDestinat
checkClosedOrFailed();
ensureConnectionInfoSent();
- const ActiveMQDestination* amqDestination =
- dynamic_cast<const ActiveMQDestination*> (destination);
+ const ActiveMQDestination* amqDestination = dynamic_cast<const ActiveMQDestination*>(destination);
this->destroyDestination(amqDestination);
}
- AMQ_CATCH_RETHROW( NullPointerException )
- AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(NullPointerException)
+ AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -1031,7 +1021,7 @@ void ActiveMQConnection::onCommand(const
} else if (command->isProducerAck()) {
- ProducerAck* producerAck = dynamic_cast<ProducerAck*>( command.get() );
+ ProducerAck* producerAck = dynamic_cast<ProducerAck*>(command.get());
// Get the consumer info object for this consumer.
Pointer<ActiveMQProducerKernel> producer;
@@ -1061,17 +1051,18 @@ void ActiveMQConnection::onCommand(const
}
synchronized(&this->config->transportListeners) {
- Pointer< Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
+ Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
while (iter->hasNext()) {
- try{
+ try {
iter->next()->onCommand(command);
- } catch(...) {}
+ } catch (...) {
+ }
}
}
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -1092,7 +1083,7 @@ void ActiveMQConnection::onConsumerContr
this->config->sessionsLock.readLock().lock();
try {
// Get the complete list of active sessions.
- std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() );
+ std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator());
while (iter->hasNext()) {
Pointer<ActiveMQSessionKernel> session = iter->next();
@@ -1110,7 +1101,7 @@ void ActiveMQConnection::onConsumerContr
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) {
+void ActiveMQConnection::onException(const decaf::lang::Exception& ex) {
try {
@@ -1121,8 +1112,8 @@ void ActiveMQConnection::onException( co
this->config->executor->execute(new OnExceptionRunnable(this, config, ex.clone()));
}
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -1139,7 +1130,7 @@ void ActiveMQConnection::onAsyncExceptio
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onClientInternalException(const decaf::lang::Exception& ex) {
- if ( !closed.get() && !closing.get() ) {
+ if (!closed.get() && !closing.get()) {
if (this->config->exceptionListener != NULL) {
this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex));
@@ -1152,12 +1143,13 @@ void ActiveMQConnection::onClientInterna
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::transportInterrupted() {
- this->config->transportInterruptionProcessingComplete.reset(
- new CountDownLatch( (int)this->config->dispatchers.size() ) );
+ int consumers = this->config->watchTopicAdvisories ? (int) this->config->dispatchers.size() - 1 : (int) this->config->dispatchers.size();
+
+ this->config->transportInterruptionProcessingComplete.reset(new CountDownLatch(consumers));
this->config->sessionsLock.readLock().lock();
try {
- std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
+ std::auto_ptr<Iterator<Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator());
while (sessions->hasNext()) {
sessions->next()->clearMessagesInProgress();
}
@@ -1168,11 +1160,12 @@ void ActiveMQConnection::transportInterr
}
synchronized(&this->config->transportListeners) {
- Pointer< Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
+ Pointer<Iterator<TransportListener*> > listeners(this->config->transportListeners.iterator());
while (listeners->hasNext()) {
- try{
+ try {
listeners->next()->transportInterrupted();
- } catch(...) {}
+ } catch (...) {
+ }
}
}
}
@@ -1181,11 +1174,12 @@ void ActiveMQConnection::transportInterr
void ActiveMQConnection::transportResumed() {
synchronized(&this->config->transportListeners) {
- Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator() );
- while( iter->hasNext() ) {
- try{
+ Pointer<Iterator<TransportListener*> > iter(this->config->transportListeners.iterator());
+ while (iter->hasNext()) {
+ try {
iter->next()->transportResumed();
- } catch(...) {}
+ } catch (...) {
+ }
}
}
}
@@ -1197,10 +1191,10 @@ void ActiveMQConnection::oneway(Pointer<
checkClosedOrFailed();
this->config->transport->oneway(command);
}
- AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -1218,8 +1212,7 @@ Pointer<Response> ActiveMQConnection::sy
response = this->config->transport->request(command, timeout);
}
- commands::ExceptionResponse* exceptionResponse =
- dynamic_cast<ExceptionResponse*> (response.get());
+ commands::ExceptionResponse* exceptionResponse = dynamic_cast<ExceptionResponse*>(response.get());
if (exceptionResponse != NULL) {
@@ -1267,9 +1260,7 @@ void ActiveMQConnection::asyncRequest(Po
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::checkClosed() const {
if (this->isClosed()) {
- throw ActiveMQException(
- __FILE__, __LINE__,
- "ActiveMQConnection::enforceConnected - Connection has already been closed!" );
+ throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConnection::enforceConnected - Connection has already been closed!");
}
}
@@ -1278,7 +1269,7 @@ void ActiveMQConnection::checkClosedOrFa
checkClosed();
if (this->transportFailed.get() == true) {
- throw ConnectionFailedException( *this->config->firstFailureError );
+ throw ConnectionFailedException(*this->config->firstFailureError);
}
}
@@ -1316,9 +1307,9 @@ void ActiveMQConnection::ensureConnectio
}
}
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -1405,19 +1396,17 @@ void ActiveMQConnection::signalInterrupt
if (cdl->getCount() == 0) {
this->config->transportInterruptionProcessingComplete.reset(NULL);
- FailoverTransport* failoverTransport =
- dynamic_cast<FailoverTransport*>(this->config->transport->narrow(typeid(FailoverTransport)));
+ FailoverTransport* failoverTransport = dynamic_cast<FailoverTransport*>(this->config->transport->narrow(typeid(FailoverTransport)));
if (failoverTransport != NULL) {
- failoverTransport->setConnectionInterruptProcessingComplete(
- this->config->connectionInfo->getConnectionId());
+ failoverTransport->setConnectionInterruptProcessingComplete(this->config->connectionInfo->getConnectionId());
}
}
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setUsername(const std::string& username) {
- this->config->connectionInfo->setUserName( username );
+ this->config->connectionInfo->setUserName(username);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1427,7 +1416,7 @@ const std::string& ActiveMQConnection::g
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::setPassword(const std::string& password) {
- this->config->connectionInfo->setPassword( password );
+ this->config->connectionInfo->setPassword(password);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1491,7 +1480,7 @@ bool ActiveMQConnection::isDispatchAsync
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setDispatchAsync( bool value ) {
+void ActiveMQConnection::setDispatchAsync(bool value) {
this->config->dispatchAsync = value;
}
@@ -1675,7 +1664,7 @@ void ActiveMQConnection::deleteTempDesti
this->config->sessionsLock.readLock().lock();
try {
- Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
+ Pointer<Iterator<Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator());
while (iterator->hasNext()) {
Pointer<ActiveMQSessionKernel> session = iterator->next();
if (session->isInUse(destination)) {
@@ -1695,16 +1684,16 @@ void ActiveMQConnection::deleteTempDesti
command->setConnectionId(this->config->connectionInfo->getConnectionId());
command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
- command->setDestination(Pointer<ActiveMQDestination> (destination->cloneDataStructure()));
+ command->setDestination(Pointer<ActiveMQDestination>(destination->cloneDataStructure()));
// Send the message to the broker.
syncRequest(command);
}
- AMQ_CATCH_RETHROW( NullPointerException )
- AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException )
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW(NullPointerException)
+ AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException)
+ AMQ_CATCH_RETHROW(ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW(ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -1714,8 +1703,7 @@ void ActiveMQConnection::cleanUpTempDest
return;
}
- Pointer< Iterator< Pointer< ActiveMQTempDestination> > > iterator(
- this->config->activeTempDestinations.values().iterator());
+ Pointer<Iterator<Pointer<ActiveMQTempDestination> > > iterator(this->config->activeTempDestinations.values().iterator());
while (iterator->hasNext()) {
Pointer<ActiveMQTempDestination> dest = iterator->next();
@@ -1724,13 +1712,13 @@ void ActiveMQConnection::cleanUpTempDest
// Only delete this temporary destination if it was created from this connection, since the
// advisory consumer tracks all temporary destinations there can be others in our mapping that
// this connection did not create.
- std::string thisConnectionId = this->config->connectionInfo->getConnectionId() != NULL ?
- this->config->connectionInfo->getConnectionId()->toString() : "";
+ std::string thisConnectionId =
+ this->config->connectionInfo->getConnectionId() != NULL ? this->config->connectionInfo->getConnectionId()->toString() : "";
if (dest->getConnectionId() == thisConnectionId) {
this->deleteTempDestination(dest);
}
- } catch(Exception& ex) {
+ } catch (Exception& ex) {
}
}
}