You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/12 00:39:49 UTC
svn commit: r1397341 [2/4] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/
main/activemq/transport/ main/activemq/transport/correlator/
main/activemq/transport/failover/ main/activemq/transport/inactivity/
main/activemq/tr...
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp Thu Oct 11 22:39:46 2012
@@ -39,81 +39,75 @@ using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
-BackupTransportPool::BackupTransportPool( const Pointer<CompositeTaskRunner>& taskRunner,
- const Pointer<CloseTransportsTask>& closeTask,
- const Pointer<URIPool>& uriPool ) : backups(),
- taskRunner(taskRunner),
- closeTask(closeTask),
- uriPool(uriPool),
- backupPoolSize(1),
- enabled(false),
- pending(false) {
-
- if( taskRunner == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "TaskRunner passed is NULL" );
- }
-
- if( uriPool == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "URIPool passed is NULL" );
- }
-
- if( closeTask == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Close Transport Task passed is NULL" );
+BackupTransportPool::BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
+ const Pointer<CloseTransportsTask> closeTask,
+ const Pointer<URIPool> uriPool) : backups(),
+ taskRunner(taskRunner),
+ closeTask(closeTask),
+ uriPool(uriPool),
+ backupPoolSize(1),
+ enabled(false),
+ pending(false) {
+
+ if (taskRunner == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
+ }
+
+ if (uriPool == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "URIPool passed is NULL");
+ }
+
+ if (closeTask == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
}
// Add this instance as a Task so that we can create backups when nothing else is
// going on.
- this->taskRunner->addTask( this );
+ this->taskRunner->addTask(this);
}
////////////////////////////////////////////////////////////////////////////////
-BackupTransportPool::BackupTransportPool( int backupPoolSize,
- const Pointer<CompositeTaskRunner>& taskRunner,
- const Pointer<CloseTransportsTask>& closeTask,
- const Pointer<URIPool>& uriPool ) : backups(),
- taskRunner(taskRunner),
- closeTask(closeTask),
- uriPool(uriPool),
- backupPoolSize(backupPoolSize),
- enabled(false),
- pending(false) {
+BackupTransportPool::BackupTransportPool(int backupPoolSize,
+ const Pointer<CompositeTaskRunner> taskRunner,
+ const Pointer<CloseTransportsTask> closeTask,
+ const Pointer<URIPool> uriPool ) : backups(),
+ taskRunner(taskRunner),
+ closeTask(closeTask),
+ uriPool(uriPool),
+ backupPoolSize(backupPoolSize),
+ enabled(false),
+ pending(false) {
- if( taskRunner == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "TaskRunner passed is NULL" );
+ if (taskRunner == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
}
- if( uriPool == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "URIPool passed is NULL" );
+ if (uriPool == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "URIPool passed is NULL");
}
- if( closeTask == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Close Transport Task passed is NULL" );
+ if (closeTask == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
}
// Add this instance as a Task so that we can create backups when nothing else is
// going on.
- this->taskRunner->addTask( this );
+ this->taskRunner->addTask(this);
}
////////////////////////////////////////////////////////////////////////////////
BackupTransportPool::~BackupTransportPool() {
- this->taskRunner->removeTask( this );
+ this->taskRunner->removeTask(this);
}
////////////////////////////////////////////////////////////////////////////////
-void BackupTransportPool::setEnabled( bool value ) {
+void BackupTransportPool::setEnabled(bool value) {
this->enabled = value;
- if( enabled == true ) {
+ if (enabled == true) {
this->taskRunner->wakeup();
} else {
- synchronized( &backups ) {
+ synchronized(&backups) {
this->backups.clear();
}
}
@@ -122,16 +116,15 @@ void BackupTransportPool::setEnabled( bo
////////////////////////////////////////////////////////////////////////////////
Pointer<BackupTransport> BackupTransportPool::getBackup() {
- if( !isEnabled() ) {
- throw IllegalStateException(
- __FILE__, __LINE__, "The Backup Pool is not enabled." );
+ if (!isEnabled()) {
+ throw IllegalStateException(__FILE__, __LINE__, "The Backup Pool is not enabled.");
}
Pointer<BackupTransport> result;
- synchronized( &backups ) {
- if( !backups.isEmpty() ) {
- result = backups.removeAt( 0 );
+ synchronized(&backups) {
+ if (!backups.isEmpty()) {
+ result = backups.removeAt(0);
}
}
@@ -145,7 +138,7 @@ Pointer<BackupTransport> BackupTransport
////////////////////////////////////////////////////////////////////////////////
bool BackupTransportPool::isPending() const {
- if( this->isEnabled() ) {
+ if (this->isEnabled()) {
return this->pending;
}
@@ -157,83 +150,80 @@ bool BackupTransportPool::iterate() {
LinkedList<URI> failures;
- synchronized( &backups ) {
+ synchronized(&backups) {
- while( isEnabled() && (int)backups.size() < backupPoolSize ) {
+ while (isEnabled() && (int) backups.size() < backupPoolSize) {
URI connectTo;
// Try for a URI, if one isn't free return and indicate this task
// is done for now, the next time a backup is requested this task
// will become pending again and we will attempt to fill the pool.
- try{
+ try {
connectTo = uriPool->getURI();
- } catch( NoSuchElementException& ex ) {
+ } catch (NoSuchElementException& ex) {
break;
}
- Pointer<BackupTransport> backup( new BackupTransport( this ) );
- backup->setUri( connectTo );
+ Pointer<BackupTransport> backup(new BackupTransport(this));
+ backup->setUri(connectTo);
- try{
- Pointer<Transport> transport = createTransport( connectTo );
- transport->setTransportListener( backup.get() );
+ try {
+ Pointer<Transport> transport = createTransport(connectTo);
+ transport->setTransportListener(backup.get());
transport->start();
- backup->setTransport( transport );
- backups.add( backup );
- } catch(...) {
+ backup->setTransport(transport);
+ backups.add(backup);
+ } catch (...) {
// Store it in the list of URIs that didn't work, once done we
// return those to the pool.
- failures.add( connectTo );
+ failures.add(connectTo);
}
-
}
}
// return all failures to the URI Pool, we can try again later.
- uriPool->addURIs( failures );
+ uriPool->addURIs(failures);
this->pending = false;
return false;
}
////////////////////////////////////////////////////////////////////////////////
-void BackupTransportPool::onBackupTransportFailure( BackupTransport* failedTransport ) {
+void BackupTransportPool::onBackupTransportFailure(BackupTransport* failedTransport) {
- synchronized( &backups ) {
+ synchronized(&backups) {
- std::auto_ptr< Iterator< Pointer<BackupTransport> > > iter( backups.iterator() );
+ std::auto_ptr<Iterator<Pointer<BackupTransport> > > iter(backups.iterator());
- while( iter->hasNext() ) {
- if( iter->next() == failedTransport ) {
+ while (iter->hasNext()) {
+ if (iter->next() == failedTransport) {
iter->remove();
}
- this->uriPool->addURI( failedTransport->getUri() );
- this->closeTask->add( failedTransport->getTransport() );
+ this->uriPool->addURI(failedTransport->getUri());
+ this->closeTask->add(failedTransport->getTransport());
this->taskRunner->wakeup();
}
}
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Transport> BackupTransportPool::createTransport( const URI& location ) const {
+Pointer<Transport> BackupTransportPool::createTransport(const URI& location) const {
- try{
+ try {
- TransportFactory* factory =
- TransportRegistry::getInstance().findFactory( location.getScheme() );
+ TransportFactory* factory = TransportRegistry::getInstance().findFactory(location.getScheme());
- if( factory == NULL ) {
- throw new IOException(
- __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
+ if (factory == NULL) {
+ throw new IOException(__FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
}
- Pointer<Transport> transport( factory->createComposite( location ) );
+ Pointer<Transport> transport(factory->createComposite(location));
return transport;
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h Thu Oct 11 22:39:46 2012
@@ -52,14 +52,14 @@ namespace failover {
public:
- BackupTransportPool( const Pointer<CompositeTaskRunner>& taskRunner,
- const Pointer<CloseTransportsTask>& closeTask,
- const Pointer<URIPool>& uriPool );
-
- BackupTransportPool( int backupPoolSize,
- const Pointer<CompositeTaskRunner>& taskRunner,
- const Pointer<CloseTransportsTask>& closeTask,
- const Pointer<URIPool>& uriPool );
+ BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
+ const Pointer<CloseTransportsTask> closeTask,
+ const Pointer<URIPool> uriPool);
+
+ BackupTransportPool(int backupPoolSize,
+ const Pointer<CompositeTaskRunner> taskRunner,
+ const Pointer<CloseTransportsTask> closeTask,
+ const Pointer<URIPool> uriPool);
virtual ~BackupTransportPool();
@@ -94,7 +94,7 @@ namespace failover {
* Sets the Max number of Backups this Task will create.
* @param size - the max number of active BackupTransports that will be created.
*/
- void setBackupPoolSize( int size ) {
+ void setBackupPoolSize(int size) {
this->backupPoolSize = size;
}
@@ -114,16 +114,16 @@ namespace failover {
*
* @param value - true to enable backup creation, false to disable.
*/
- void setEnabled( bool value );
+ void setEnabled(bool value);
private:
// The backups report their failure to the pool, the pool removes them
// from the list and returns their URIs to the URIPool, and then adds
// the internal transport to the close transport's task for cleanup.
- void onBackupTransportFailure( BackupTransport* failedTransport );
+ void onBackupTransportFailure(BackupTransport* failedTransport);
- Pointer<Transport> createTransport( const URI& location ) const;
+ Pointer<Transport> createTransport(const URI& location) const;
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp Thu Oct 11 22:39:46 2012
@@ -32,7 +32,6 @@ using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
CloseTransportsTask::CloseTransportsTask() :
transports() {
-
}
////////////////////////////////////////////////////////////////////////////////
@@ -41,7 +40,7 @@ CloseTransportsTask::~CloseTransportsTas
}
////////////////////////////////////////////////////////////////////////////////
-void CloseTransportsTask::add(const Pointer<Transport>& transport) {
+void CloseTransportsTask::add(const Pointer<Transport> transport) {
transports.put(transport);
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h Thu Oct 11 22:39:46 2012
@@ -45,7 +45,7 @@ namespace failover {
/**
* Add a new Transport to close.
*/
- void add(const Pointer<Transport>& transport);
+ void add(const Pointer<Transport> transport);
/**
* This Task is pending if there are transports in the Queue that need to be
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Oct 11 22:39:46 2012
@@ -77,52 +77,51 @@ FailoverTransport::FailoverTransport() :
closeTask(new CloseTransportsTask()),
taskRunner(new CompositeTaskRunner()),
disposedListener(),
- myTransportListener(new FailoverTransportListener( this )),
+ myTransportListener(new FailoverTransportListener(this)),
transportListener(NULL) {
- this->stateTracker.setTrackTransactions( true );
- this->backups.reset( new BackupTransportPool( taskRunner, closeTask, uris ) );
+ this->stateTracker.setTrackTransactions(true);
+ this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris));
- this->taskRunner->addTask( this );
- this->taskRunner->addTask( this->closeTask.get() );
+ this->taskRunner->addTask(this);
+ this->taskRunner->addTask(this->closeTask.get());
}
////////////////////////////////////////////////////////////////////////////////
FailoverTransport::~FailoverTransport() {
- try{
+ try {
close();
}
- AMQ_CATCH_NOTHROW( Exception )
+ AMQ_CATCH_NOTHROW( Exception)
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::add( const std::string& uri ) {
+void FailoverTransport::add(const std::string& uri) {
try {
- uris->addURI( URI( uri ) );
-
- reconnect( false );
+ uris->addURI(URI(uri));
+ reconnect(false);
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::addURI( bool rebalance, const List<URI>& uris ) {
+void FailoverTransport::addURI(bool rebalance, const List<URI>& uris) {
- std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+ std::auto_ptr<Iterator<URI> > iter(uris.iterator());
- while( iter->hasNext() ) {
- this->uris->addURI( iter->next() );
+ while (iter->hasNext()) {
+ this->uris->addURI(iter->next());
}
- reconnect( rebalance );
+ reconnect(rebalance);
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::removeURI( bool rebalance, const List<URI>& uris ) {
+void FailoverTransport::removeURI(bool rebalance, const List<URI>& uris) {
- std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+ std::auto_ptr<Iterator<URI> > iter(uris.iterator());
synchronized( &reconnectMutex ) {
@@ -130,30 +129,30 @@ void FailoverTransport::removeURI( bool
// we have a chance to remove the URIs in case one of them was the one
// we had a connection to and it gets reinserted into the URI pool.
- reconnect( rebalance );
+ reconnect(rebalance);
- while( iter->hasNext() ) {
- this->uris->removeURI( iter->next() );
+ while (iter->hasNext()) {
+ this->uris->removeURI(iter->next());
}
}
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::reconnect( const decaf::net::URI& uri ) {
+void FailoverTransport::reconnect(const decaf::net::URI& uri) {
try {
- this->uris->addURI( uri );
+ this->uris->addURI(uri);
- reconnect( true );
+ reconnect(true);
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setTransportListener( TransportListener* listener ) {
+void FailoverTransport::setTransportListener(TransportListener* listener) {
synchronized( &listenerMutex ) {
this->transportListener = listener;
listenerMutex.notifyAll();
@@ -172,7 +171,7 @@ TransportListener* FailoverTransport::ge
////////////////////////////////////////////////////////////////////////////////
std::string FailoverTransport::getRemoteAddress() const {
synchronized( &reconnectMutex ) {
- if( connectedTransport != NULL ) {
+ if (connectedTransport != NULL) {
return connectedTransport->getRemoteAddress();
}
}
@@ -180,29 +179,29 @@ std::string FailoverTransport::getRemote
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::oneway( const Pointer<Command>& command ) {
+void FailoverTransport::oneway(const Pointer<Command> command) {
Pointer<Exception> error;
try {
- synchronized( &reconnectMutex ) {
+ synchronized(&reconnectMutex) {
- if( command != NULL && connectedTransport == NULL ) {
+ if (command != NULL && connectedTransport == NULL) {
- if( command->isShutdownInfo() ) {
+ if (command->isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
return;
}
- if( command->isRemoveInfo() || command->isMessageAck() ) {
+ if (command->isRemoveInfo() || command->isMessageAck()) {
// Simulate response to RemoveInfo command or Ack as they will be stale.
- stateTracker.track( command );
+ stateTracker.track(command);
- if( command->isResponseRequired() ) {
- Pointer<Response> response( new Response() );
- response->setCorrelationId( command->getCommandId() );
- myTransportListener->onCommand( response );
+ if (command->isResponseRequired()) {
+ Pointer<Response> response(new Response());
+ response->setCorrelationId(command->getCommandId());
+ myTransportListener->onCommand(response);
}
return;
@@ -210,7 +209,7 @@ void FailoverTransport::oneway( const Po
}
// Keep trying until the message is sent.
- for( int i = 0; !closed; i++ ) {
+ for (int i = 0; !closed; i++) {
try {
// Wait for transport to be connected.
@@ -218,31 +217,27 @@ void FailoverTransport::oneway( const Po
long long start = System::currentTimeMillis();
bool timedout = false;
- while( transport == NULL && !closed && connectionFailure == NULL ) {
+ while (transport == NULL && !closed && connectionFailure == NULL) {
long long end = System::currentTimeMillis();
- if( timeout > 0 && ( end - start > timeout ) ) {
+ if (timeout > 0 && (end - start > timeout)) {
timedout = true;
break;
}
- reconnectMutex.wait( 100 );
+ reconnectMutex.wait(100);
transport = connectedTransport;
}
- if( transport == NULL ) {
+ if (transport == NULL) {
// Previous loop may have exited due to us being disposed.
- if( closed ) {
- error.reset( new IOException(
- __FILE__, __LINE__, "Transport disposed.") );
- } else if( connectionFailure != NULL ) {
+ if (closed) {
+ error.reset(new IOException(__FILE__, __LINE__, "Transport disposed."));
+ } else if (connectionFailure != NULL) {
error = connectionFailure;
- } else if( timedout == true ) {
- error.reset( new IOException(
- __FILE__, __LINE__,
- "Failover timeout of %d ms reached.", timeout ) );
+ } else if (timedout == true) {
+ error.reset(new IOException(__FILE__, __LINE__, "Failover timeout of %d ms reached.", timeout));
} else {
- error.reset( new IOException(
- __FILE__, __LINE__, "Unexpected failure.") );
+ error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
}
break;
@@ -252,37 +247,37 @@ void FailoverTransport::oneway( const Po
// tracker, then hold it in the requestMap so that we can replay
// it later.
Pointer<Tracked> tracked;
- try{
- tracked = stateTracker.track( command );
+ try {
+ tracked = stateTracker.track(command);
synchronized( &requestMap ) {
- if( tracked != NULL && tracked->isWaitingForResponse() ) {
- requestMap.put( command->getCommandId(), tracked );
- } else if( tracked == NULL && command->isResponseRequired() ) {
- requestMap.put( command->getCommandId(), command );
+ if (tracked != NULL && tracked->isWaitingForResponse()) {
+ requestMap.put(command->getCommandId(), tracked);
+ } else if (tracked == NULL && command->isResponseRequired()) {
+ requestMap.put(command->getCommandId(), command);
}
}
- } catch( Exception& ex ) {
- ex.setMark( __FILE__, __LINE__ );
- error.reset( ex.clone() );
+ } catch (Exception& ex) {
+ ex.setMark(__FILE__, __LINE__);
+ error.reset(ex.clone());
break;
}
// Send the message.
try {
- transport->oneway( command );
- stateTracker.trackBack( command );
- } catch( IOException& e ) {
+ transport->oneway(command);
+ stateTracker.trackBack(command);
+ } catch (IOException& e) {
- e.setMark( __FILE__, __LINE__ );
+ e.setMark(__FILE__, __LINE__);
// If the command was not tracked.. we will retry in
// this method
- if( tracked == NULL ) {
+ if (tracked == NULL) {
// since we will retry in this method.. take it out of the
// request map so that it is not sent 2 times on recovery
- if( command->isResponseRequired() ) {
- requestMap.remove( command->getCommandId() );
+ if (command->isResponseRequired()) {
+ requestMap.remove(command->getCommandId());
}
// Rethrow the exception so it will handled by
@@ -296,81 +291,82 @@ void FailoverTransport::oneway( const Po
}
return;
- } catch( IOException& e ) {
- e.setMark( __FILE__, __LINE__ );
- handleTransportFailure( e );
+ } catch (IOException& e) {
+ e.setMark(__FILE__, __LINE__);
+ handleTransportFailure(e);
}
}
}
}
- AMQ_CATCH_NOTHROW( Exception )
+ AMQ_CATCH_NOTHROW( Exception)
AMQ_CATCHALL_NOTHROW()
- if( !closed ) {
- if( error != NULL ) {
- throw IOException( *error );
+ if (!closed) {
+ if (error != NULL) {
+ throw IOException(*error);
}
}
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> FailoverTransport::request( const Pointer<Command>& command AMQCPP_UNUSED ) {
-
- throw decaf::lang::exceptions::UnsupportedOperationException(
- __FILE__, __LINE__, "FailoverTransport::request - Not Supported" );
+Pointer<FutureResponse> FailoverTransport::asyncRequest(const Pointer<Command> command AMQCPP_UNUSED,
+ const Pointer<ResponseCallback> responseCallback AMQCPP_UNUSED) {
+ throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::asyncRequest - Not Supported");
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> FailoverTransport::request( const Pointer<Command>& command AMQCPP_UNUSED,
- unsigned int timeout AMQCPP_UNUSED ) {
+Pointer<Response> FailoverTransport::request(const Pointer<Command> command AMQCPP_UNUSED) {
+ throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::request - Not Supported");
+}
- throw decaf::lang::exceptions::UnsupportedOperationException(
- __FILE__, __LINE__, "FailoverTransport::request - Not Supported" );
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Response> FailoverTransport::request(const Pointer<Command> command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
+ throw decaf::lang::exceptions::UnsupportedOperationException(__FILE__, __LINE__, "FailoverTransport::request - Not Supported");
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::start() {
- try{
+ try {
synchronized( &reconnectMutex ) {
- if( this->started ) {
+ if (this->started) {
return;
}
started = true;
- stateTracker.setMaxCacheSize( this->getMaxCacheSize() );
- stateTracker.setTrackMessages( this->isTrackMessages() );
- stateTracker.setTrackTransactionProducers( this->isTrackTransactionProducers() );
+ stateTracker.setMaxCacheSize(this->getMaxCacheSize());
+ stateTracker.setTrackMessages(this->isTrackMessages());
+ stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
- if( connectedTransport != NULL ) {
- stateTracker.restore( connectedTransport );
+ if (connectedTransport != NULL) {
+ stateTracker.restore(connectedTransport);
} else {
- reconnect( false );
+ reconnect(false);
}
}
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::stop() {
- try{
+ try {
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::close() {
- try{
+ try {
Pointer<Transport> transportToStop;
@@ -383,11 +379,11 @@ void FailoverTransport::close() {
closed = true;
connected = false;
- backups->setEnabled( false );
+ backups->setEnabled(false);
requestMap.clear();
- if( connectedTransport != NULL ) {
- transportToStop.swap( connectedTransport );
+ if (connectedTransport != NULL) {
+ transportToStop.swap(connectedTransport);
}
reconnectMutex.notifyAll();
@@ -397,41 +393,41 @@ void FailoverTransport::close() {
sleepMutex.notifyAll();
}
- taskRunner->shutdown( 2000 );
+ taskRunner->shutdown(2000);
- if( transportToStop != NULL ) {
+ if (transportToStop != NULL) {
transportToStop->close();
}
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::reconnect( bool rebalance ) {
+void FailoverTransport::reconnect(bool rebalance) {
Pointer<Transport> transport;
- synchronized( &reconnectMutex ) {
- if( started ) {
+ synchronized( &reconnectMutex ) {
+ if (started) {
- if( rebalance ) {
+ if (rebalance) {
- transport.swap( this->connectedTransport );
+ transport.swap(this->connectedTransport);
- if( transport != NULL ) {
+ if (transport != NULL) {
- if( this->disposedListener != NULL ) {
- transport->setTransportListener( disposedListener.get() );
+ if (this->disposedListener != NULL) {
+ transport->setTransportListener(disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread.
- closeTask->add( transport );
+ closeTask->add(transport);
- if( this->connectedTransportURI != NULL ) {
- this->uris->addURI( *this->connectedTransportURI );
- this->connectedTransportURI.reset( NULL );
+ if (this->connectedTransportURI != NULL) {
+ this->uris->addURI(*this->connectedTransportURI);
+ this->connectedTransportURI.reset(NULL);
}
}
}
@@ -442,56 +438,56 @@ void FailoverTransport::reconnect( bool
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::restoreTransport( const Pointer<Transport>& transport ) {
+void FailoverTransport::restoreTransport(const Pointer<Transport> transport) {
- try{
+ try {
transport->start();
//send information to the broker - informing it we are an ft client
- Pointer<ConnectionControl> cc( new ConnectionControl() );
- cc->setFaultTolerant( true );
- transport->oneway( cc );
+ Pointer<ConnectionControl> cc(new ConnectionControl());
+ cc->setFaultTolerant(true);
+ transport->oneway(cc);
- stateTracker.restore( transport );
+ stateTracker.restore(transport);
decaf::util::StlMap<int, Pointer<Command> > commands;
synchronized(&requestMap) {
commands.copy(requestMap);
}
- Pointer< Iterator<Pointer<Command> > > iter(commands.values().iterator());
+ Pointer<Iterator<Pointer<Command> > > iter(commands.values().iterator());
while (iter->hasNext()) {
transport->oneway(iter->next());
}
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::handleTransportFailure( const decaf::lang::Exception& error AMQCPP_UNUSED ) {
+void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error AMQCPP_UNUSED) {
Pointer<Transport> transport;
synchronized( &reconnectMutex ) {
- connectedTransport.swap( transport );
+ connectedTransport.swap(transport);
}
- if( transport != NULL ) {
+ if (transport != NULL) {
- if( this->disposedListener != NULL ) {
- transport->setTransportListener( disposedListener.get() );
+ if (this->disposedListener != NULL) {
+ transport->setTransportListener(disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread.
- closeTask->add( transport );
+ closeTask->add(transport);
synchronized( &reconnectMutex ) {
initialized = false;
- uris->addURI( *connectedTransportURI );
- connectedTransportURI.reset( NULL );
+ uris->addURI(*connectedTransportURI);
+ connectedTransportURI.reset(NULL);
connected = false;
// Place the State Tracker into a reconnection state.
@@ -499,11 +495,11 @@ void FailoverTransport::handleTransportF
// Notify before we attempt to reconnect so that the consumers have a chance
// to cleanup their state.
- if( transportListener != NULL ) {
+ if (transportListener != NULL) {
transportListener->transportInterrupted();
}
- if( started ) {
+ if (started) {
taskRunner->wakeup();
}
}
@@ -511,58 +507,58 @@ void FailoverTransport::handleTransportF
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::handleConnectionControl( const Pointer<Command>& control ) {
+void FailoverTransport::handleConnectionControl(const Pointer<Command> control) {
try {
Pointer<ConnectionControl> ctrlCommand = control.dynamicCast<ConnectionControl>();
std::string reconnectStr = ctrlCommand->getReconnectTo();
- if( !reconnectStr.empty() ) {
+ if (!reconnectStr.empty()) {
- std::remove( reconnectStr.begin(), reconnectStr.end(), ' ' );
+ std::remove(reconnectStr.begin(), reconnectStr.end(), ' ');
- if( reconnectStr.length() > 0 ) {
+ if (reconnectStr.length() > 0) {
try {
- if( isReconnectSupported() ) {
- reconnect( URI( reconnectStr ) );
+ if (isReconnectSupported()) {
+ reconnect(URI(reconnectStr));
}
- } catch( Exception e ) {
+ } catch (Exception e) {
}
}
}
- processNewTransports( ctrlCommand->isRebalanceConnection(), ctrlCommand->getConnectedBrokers() );
+ processNewTransports(ctrlCommand->isRebalanceConnection(), ctrlCommand->getConnectedBrokers());
}
- AMQ_CATCH_RETHROW( Exception )
- AMQ_CATCHALL_THROW( Exception )
+ AMQ_CATCH_RETHROW( Exception)
+ AMQ_CATCHALL_THROW( Exception)
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::processNewTransports( bool rebalance, std::string newTransports ) {
+void FailoverTransport::processNewTransports(bool rebalance, std::string newTransports) {
- if( !newTransports.empty() ) {
+ if (!newTransports.empty()) {
- std::remove( newTransports.begin(), newTransports.end(), ' ' );
+ std::remove(newTransports.begin(), newTransports.end(), ' ');
- if( newTransports.length() > 0 && isUpdateURIsSupported() ) {
+ if (newTransports.length() > 0 && isUpdateURIsSupported()) {
LinkedList<URI> list;
- StringTokenizer tokenizer( newTransports, "," );
+ StringTokenizer tokenizer(newTransports, ",");
- while( tokenizer.hasMoreTokens() ) {
+ while (tokenizer.hasMoreTokens()) {
std::string str = tokenizer.nextToken();
try {
- URI uri( str );
- list.add( uri );
- } catch( Exception& e ) {
+ URI uri(str);
+ list.add(uri);
+ } catch (Exception& e) {
}
}
- if( !list.isEmpty() ) {
+ if (!list.isEmpty()) {
try {
- updateURIs( rebalance, list );
- } catch( IOException& e ) {
+ updateURIs(rebalance, list);
+ } catch (IOException& e) {
}
}
}
@@ -570,43 +566,43 @@ void FailoverTransport::processNewTransp
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& updatedURIs ) {
+void FailoverTransport::updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& updatedURIs) {
- if( isUpdateURIsSupported() ) {
+ if (isUpdateURIsSupported()) {
- LinkedList<URI> copy( this->updated );
+ LinkedList<URI> copy(this->updated);
LinkedList<URI> add;
- if( !updatedURIs.isEmpty() ) {
+ if (!updatedURIs.isEmpty()) {
StlSet<URI> set;
- for( int i = 0; i < updatedURIs.size(); i++ ) {
- set.add( updatedURIs.get(i) );
+ for (int i = 0; i < updatedURIs.size(); i++) {
+ set.add(updatedURIs.get(i));
}
- Pointer< Iterator<URI> > setIter( set.iterator() );
- while( setIter->hasNext() ) {
+ Pointer<Iterator<URI> > setIter(set.iterator());
+ while (setIter->hasNext()) {
URI value = setIter->next();
- if( copy.remove( value ) == false ) {
- add.add( value );
+ if (copy.remove(value) == false) {
+ add.add(value);
}
}
synchronized( &reconnectMutex ) {
this->updated.clear();
- Pointer< Iterator<URI> > listIter1( add.iterator() );
- while( listIter1->hasNext() ) {
- this->updated.add( listIter1->next() );
+ Pointer<Iterator<URI> > listIter1(add.iterator());
+ while (listIter1->hasNext()) {
+ this->updated.add(listIter1->next());
}
- Pointer< Iterator<URI> > listIter2( copy.iterator() );
- while( listIter2->hasNext() ) {
- this->uris->removeURI( listIter2->next() );
+ Pointer<Iterator<URI> > listIter2(copy.iterator());
+ while (listIter2->hasNext()) {
+ this->uris->removeURI(listIter2->next());
}
- this->addURI( rebalance, add );
+ this->addURI(rebalance, add);
}
}
}
@@ -617,19 +613,19 @@ bool FailoverTransport::isPending() cons
bool result = false;
synchronized( &reconnectMutex ) {
- if( this->connectedTransport == NULL && !closed && started ) {
+ if (this->connectedTransport == NULL && !closed && started) {
int reconnectAttempts = 0;
- if( firstConnection ) {
- if( startupMaxReconnectAttempts != 0 ) {
+ if (firstConnection) {
+ if (startupMaxReconnectAttempts != 0) {
reconnectAttempts = startupMaxReconnectAttempts;
}
}
- if( reconnectAttempts == 0 ) {
+ if (reconnectAttempts == 0) {
reconnectAttempts = maxReconnectAttempts;
}
- if( reconnectAttempts > 0 && connectFailures >= reconnectAttempts ) {
+ if (reconnectAttempts > 0 && connectFailures >= reconnectAttempts) {
result = false;
} else {
result = true;
@@ -647,11 +643,11 @@ bool FailoverTransport::iterate() {
synchronized( &reconnectMutex ) {
- if( closed || connectionFailure != NULL ) {
+ if (closed || connectionFailure != NULL) {
reconnectMutex.notifyAll();
}
- if( connectedTransport != NULL || closed || connectionFailure != NULL ) {
+ if (connectedTransport != NULL || closed || connectionFailure != NULL) {
return false;
} else {
@@ -659,97 +655,98 @@ bool FailoverTransport::iterate() {
Pointer<Transport> transport;
URI uri;
- if( !useExponentialBackOff ) {
+ if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
}
- if( backups->isEnabled() ) {
+ if (backups->isEnabled()) {
Pointer<BackupTransport> backupTransport = backups->getBackup();
- if( backupTransport != NULL ) {
+ if (backupTransport != NULL) {
transport = backupTransport->getTransport();
uri = backupTransport->getUri();
- transport->setTransportListener( myTransportListener.get() );
+ transport->setTransportListener(myTransportListener.get());
try {
- if( started ) {
- restoreTransport( transport );
+ if (started) {
+ restoreTransport(transport);
}
- } catch( Exception& e ) {
+ } catch (Exception& e) {
- if( transport != NULL ) {
- if( this->disposedListener != NULL ) {
- transport->setTransportListener( disposedListener.get() );
+ if (transport != NULL) {
+ if (this->disposedListener != NULL) {
+ transport->setTransportListener(disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
// way.
- closeTask->add( transport );
+ closeTask->add(transport);
taskRunner->wakeup();
- transport.reset( NULL );
+ transport.reset(NULL);
}
- this->uris->addURI( uri );
+ this->uris->addURI(uri);
}
}
}
- while( transport == NULL && !closed ) {
+ while (transport == NULL && !closed) {
- try{
+ try {
uri = uris->getURI();
- } catch( NoSuchElementException& ex ) {
+ } catch (NoSuchElementException& ex) {
break;
}
try {
- transport = createTransport( uri );
- transport->setTransportListener( myTransportListener.get() );
+ transport = createTransport(uri);
+ transport->setTransportListener(myTransportListener.get());
transport->start();
- if( started ) {
- restoreTransport( transport );
+ if (started) {
+ restoreTransport(transport);
}
- } catch( Exception& e ) {
- e.setMark( __FILE__, __LINE__ );
+ } catch (Exception& e) {
+ e.setMark(__FILE__, __LINE__);
- if( transport != NULL ) {
- if( this->disposedListener != NULL ) {
- transport->setTransportListener( disposedListener.get() );
+ if (transport != NULL) {
+ if (this->disposedListener != NULL) {
+ transport->setTransportListener(disposedListener.get());
}
- try{
+ try {
transport->stop();
- } catch(...) {}
+ } catch (...) {
+ }
// Hand off to the close task so it gets done in a different thread
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
// way.
- closeTask->add( transport );
+ closeTask->add(transport);
taskRunner->wakeup();
- transport.reset( NULL );
+ transport.reset(NULL);
}
- failures.add( uri );
- failure.reset( e.clone() );
+ failures.add(uri);
+ failure.reset(e.clone());
}
}
// Return the failures to the pool, we will try again on the next iteration.
- this->uris->addURIs( failures );
+ this->uris->addURIs(failures);
- if( transport != NULL ) {
+ if (transport != NULL) {
reconnectDelay = initialReconnectDelay;
- connectedTransportURI.reset( new URI( uri ) );
+ connectedTransportURI.reset(new URI(uri));
connectedTransport = transport;
reconnectMutex.notifyAll();
connectFailures = 0;
@@ -758,17 +755,17 @@ bool FailoverTransport::iterate() {
// Make sure on initial startup, that the transportListener
// has been initialized for this instance.
synchronized( &listenerMutex ) {
- if( transportListener == NULL ) {
+ if (transportListener == NULL) {
// if it isn't set after 2secs - it probably never will be
- listenerMutex.wait( 2000 );
+ listenerMutex.wait(2000);
}
}
- if( transportListener != NULL ) {
+ if (transportListener != NULL) {
transportListener->transportResumed();
}
- if( firstConnection ) {
+ if (firstConnection) {
firstConnection = false;
}
@@ -777,38 +774,38 @@ bool FailoverTransport::iterate() {
}
int reconnectAttempts = 0;
- if( firstConnection ) {
- if( startupMaxReconnectAttempts != 0 ) {
+ if (firstConnection) {
+ if (startupMaxReconnectAttempts != 0) {
reconnectAttempts = startupMaxReconnectAttempts;
}
}
- if( reconnectAttempts == 0 ) {
+ if (reconnectAttempts == 0) {
reconnectAttempts = maxReconnectAttempts;
}
- if( reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts ) {
+ if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been initialized
// for this instance.
synchronized( &listenerMutex ) {
- if( transportListener == NULL ) {
- listenerMutex.wait( 2000 );
+ if (transportListener == NULL) {
+ listenerMutex.wait(2000);
}
}
- if( transportListener != NULL ) {
+ if (transportListener != NULL) {
Pointer<IOException> ioException;
- try{
+ try {
ioException = connectionFailure.dynamicCast<IOException>();
}
- AMQ_CATCH_NOTHROW( ClassCastException )
+ AMQ_CATCH_NOTHROW( ClassCastException)
- if( ioException != NULL ) {
- transportListener->onException( *connectionFailure );
+ if (ioException != NULL) {
+ transportListener->onException(*connectionFailure);
} else {
- transportListener->onException( IOException( *connectionFailure ) );
+ transportListener->onException(IOException(*connectionFailure));
}
}
@@ -817,16 +814,16 @@ bool FailoverTransport::iterate() {
}
}
- if( !closed ) {
+ if (!closed) {
synchronized( &sleepMutex ) {
- sleepMutex.wait( (unsigned int)reconnectDelay );
+ sleepMutex.wait((unsigned int) reconnectDelay);
}
- if( useExponentialBackOff ) {
+ if (useExponentialBackOff) {
// Exponential increment of reconnect delay.
reconnectDelay *= backOffMultiplier;
- if( reconnectDelay > maxReconnectDelay ) {
+ if (reconnectDelay > maxReconnectDelay) {
reconnectDelay = maxReconnectDelay;
}
}
@@ -836,32 +833,30 @@ bool FailoverTransport::iterate() {
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Transport> FailoverTransport::createTransport( const URI& location ) const {
+Pointer<Transport> FailoverTransport::createTransport(const URI& location) const {
- try{
+ try {
- TransportFactory* factory =
- TransportRegistry::getInstance().findFactory( location.getScheme() );
+ TransportFactory* factory = TransportRegistry::getInstance().findFactory(location.getScheme());
- if( factory == NULL ) {
- throw new IOException(
- __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
+ if (factory == NULL) {
+ throw new IOException(__FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
}
- Pointer<Transport> transport( factory->createComposite( location ) );
+ Pointer<Transport> transport(factory->createComposite(location));
return transport;
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId ) {
+void FailoverTransport::setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId) {
synchronized( &reconnectMutex ) {
- stateTracker.connectionInterruptProcessingComplete( this, connectionId );
+ stateTracker.connectionInterruptProcessingComplete(this, connectionId);
}
}
@@ -881,43 +876,43 @@ bool FailoverTransport::isInitialized()
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setInitialized( bool value ) {
+void FailoverTransport::setInitialized(bool value) {
this->initialized = value;
}
////////////////////////////////////////////////////////////////////////////////
-Transport* FailoverTransport::narrow( const std::type_info& typeId ) {
+Transport* FailoverTransport::narrow(const std::type_info& typeId) {
- if( typeid( *this ) == typeId ) {
+ if (typeid( *this ) == typeId) {
return this;
}
- if( this->connectedTransport != NULL ) {
- return this->connectedTransport->narrow( typeId );
+ if (this->connectedTransport != NULL) {
+ return this->connectedTransport->narrow(typeId);
}
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::processResponse(const Pointer<Response>& response) {
+void FailoverTransport::processResponse(const Pointer<Response> response) {
Pointer<Command> object;
- synchronized( &( this->requestMap ) ) {
- try{
- object = this->requestMap.remove( response->getCorrelationId() );
- } catch( NoSuchElementException& ex ) {
+ synchronized(&(this->requestMap)) {
+ try {
+ object = this->requestMap.remove(response->getCorrelationId());
+ } catch (NoSuchElementException& ex) {
// Not tracking this request in our map, not an error.
}
}
- if( object != NULL ) {
- try{
+ if (object != NULL) {
+ try {
Pointer<Tracked> tracked = object.dynamicCast<Tracked>();
tracked->onResponse();
}
- AMQ_CATCH_NOTHROW( ClassCastException )
+ AMQ_CATCH_NOTHROW( ClassCastException)
}
}
@@ -927,7 +922,7 @@ Pointer<wireformat::WireFormat> Failover
Pointer<wireformat::WireFormat> result;
Pointer<Transport> transport = this->connectedTransport;
- if( transport != NULL ) {
+ if (transport != NULL) {
result = transport->getWireFormat();
}
@@ -940,7 +935,7 @@ long long FailoverTransport::getTimeout(
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setTimeout( long long value ) {
+void FailoverTransport::setTimeout(long long value) {
this->timeout = value;
}
@@ -950,7 +945,7 @@ long long FailoverTransport::getInitialR
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setInitialReconnectDelay( long long value ) {
+void FailoverTransport::setInitialReconnectDelay(long long value) {
this->initialReconnectDelay = value;
}
@@ -960,7 +955,7 @@ long long FailoverTransport::getMaxRecon
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setMaxReconnectDelay( long long value ) {
+void FailoverTransport::setMaxReconnectDelay(long long value) {
this->maxReconnectDelay = value;
}
@@ -970,7 +965,7 @@ long long FailoverTransport::getBackOffM
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setBackOffMultiplier( long long value ) {
+void FailoverTransport::setBackOffMultiplier(long long value) {
this->backOffMultiplier = value;
}
@@ -980,7 +975,7 @@ bool FailoverTransport::isUseExponential
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setUseExponentialBackOff( bool value ) {
+void FailoverTransport::setUseExponentialBackOff(bool value) {
this->useExponentialBackOff = value;
}
@@ -990,8 +985,8 @@ bool FailoverTransport::isRandomize() co
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setRandomize( bool value ) {
- this->uris->setRandomize( value );
+void FailoverTransport::setRandomize(bool value) {
+ this->uris->setRandomize(value);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1000,7 +995,7 @@ int FailoverTransport::getMaxReconnectAt
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setMaxReconnectAttempts( int value ) {
+void FailoverTransport::setMaxReconnectAttempts(int value) {
this->maxReconnectAttempts = value;
}
@@ -1010,7 +1005,7 @@ int FailoverTransport::getStartupMaxReco
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setStartupMaxReconnectAttempts( int value ) {
+void FailoverTransport::setStartupMaxReconnectAttempts(int value) {
this->startupMaxReconnectAttempts = value;
}
@@ -1020,7 +1015,7 @@ long long FailoverTransport::getReconnec
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setReconnectDelay( long long value ) {
+void FailoverTransport::setReconnectDelay(long long value) {
this->reconnectDelay = value;
}
@@ -1030,8 +1025,8 @@ bool FailoverTransport::isBackup() const
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setBackup( bool value ) {
- this->backups->setEnabled( value );
+void FailoverTransport::setBackup(bool value) {
+ this->backups->setEnabled(value);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1040,8 +1035,8 @@ int FailoverTransport::getBackupPoolSize
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setBackupPoolSize( int value ) {
- this->backups->setBackupPoolSize( value );
+void FailoverTransport::setBackupPoolSize(int value) {
+ this->backups->setBackupPoolSize(value);
}
////////////////////////////////////////////////////////////////////////////////
@@ -1050,7 +1045,7 @@ bool FailoverTransport::isTrackMessages(
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setTrackMessages( bool value ) {
+void FailoverTransport::setTrackMessages(bool value) {
this->trackMessages = value;
}
@@ -1060,7 +1055,7 @@ bool FailoverTransport::isTrackTransacti
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setTrackTransactionProducers( bool value ) {
+void FailoverTransport::setTrackTransactionProducers(bool value) {
this->trackTransactionProducers = value;
}
@@ -1070,7 +1065,7 @@ int FailoverTransport::getMaxCacheSize()
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setMaxCacheSize( int value ) {
+void FailoverTransport::setMaxCacheSize(int value) {
this->maxCacheSize = value;
}
@@ -1080,7 +1075,7 @@ bool FailoverTransport::isReconnectSuppo
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setReconnectSupported( bool value ) {
+void FailoverTransport::setReconnectSupported(bool value) {
this->reconnectSupported = value;
}
@@ -1090,6 +1085,6 @@ bool FailoverTransport::isUpdateURIsSupp
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::setUpdateURIsSupported( bool value ) {
+void FailoverTransport::setUpdateURIsSupported(bool value) {
this->updateURIsSupported = value;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Thu Oct 11 22:39:46 2012
@@ -100,8 +100,8 @@ namespace failover {
private:
- FailoverTransport( const FailoverTransport& );
- FailoverTransport& operator= ( const FailoverTransport& );
+ FailoverTransport(const FailoverTransport&);
+ FailoverTransport& operator=(const FailoverTransport&);
public:
@@ -116,22 +116,22 @@ namespace failover {
* @param rebalance
* Indicates if the current connection should be broken and reconnected.
*/
- void reconnect( bool rebalance );
+ void reconnect(bool rebalance);
/**
* Adds a New URI to the List of URIs this transport can Connect to.
* @param uri
* A String version of a URI to add to the URIs to failover to.
*/
- void add( const std::string& uri );
+ void add(const std::string& uri);
public: // CompositeTransport methods
- virtual void addURI( bool rebalance, const List<URI>& uris );
+ virtual void addURI(bool rebalance, const List<URI>& uris);
- virtual void removeURI( bool rebalance, const List<URI>& uris );
+ virtual void removeURI(bool rebalance, const List<URI>& uris);
- public: // Transport Members
+ public:
virtual void start();
@@ -139,17 +139,20 @@ namespace failover {
virtual void close();
- virtual void oneway( const Pointer<Command>& command );
+ virtual void oneway(const Pointer<Command> command);
+
+ virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
+ const Pointer<ResponseCallback> responseCallback);
- virtual Pointer<Response> request( const Pointer<Command>& command );
+ virtual Pointer<Response> request(const Pointer<Command> command);
- virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
+ virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout);
virtual Pointer<wireformat::WireFormat> getWireFormat() const;
- virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
+ virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat AMQCPP_UNUSED) {}
- virtual void setTransportListener( TransportListener* listener );
+ virtual void setTransportListener(TransportListener* listener);
virtual TransportListener* getTransportListener() const;
@@ -163,17 +166,17 @@ namespace failover {
bool isInitialized() const;
- void setInitialized( bool value );
+ void setInitialized(bool value);
- virtual Transport* narrow( const std::type_info& typeId );
+ virtual Transport* narrow(const std::type_info& typeId);
virtual std::string getRemoteAddress() const;
- virtual void reconnect( const decaf::net::URI& uri );
+ virtual void reconnect(const decaf::net::URI& uri);
- virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris );
+ virtual void updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& uris);
- public: // CompositeTask Methods.
+ public:
/**
* @returns true if there is a need for the iterate method to be called by this
@@ -190,73 +193,73 @@ namespace failover {
*/
virtual bool iterate();
- public: // FailoverTransport Property Getters / Setters
+ public:
long long getTimeout() const;
- void setTimeout( long long value );
+ void setTimeout(long long value);
long long getInitialReconnectDelay() const;
- void setInitialReconnectDelay( long long value );
+ void setInitialReconnectDelay(long long value);
long long getMaxReconnectDelay() const;
- void setMaxReconnectDelay( long long value );
+ void setMaxReconnectDelay(long long value);
long long getBackOffMultiplier() const;
- void setBackOffMultiplier( long long value );
+ void setBackOffMultiplier(long long value);
bool isUseExponentialBackOff() const;
- void setUseExponentialBackOff( bool value );
+ void setUseExponentialBackOff(bool value);
bool isRandomize() const;
- void setRandomize( bool value );
+ void setRandomize(bool value);
int getMaxReconnectAttempts() const;
- void setMaxReconnectAttempts( int value );
+ void setMaxReconnectAttempts(int value);
int getStartupMaxReconnectAttempts() const;
- void setStartupMaxReconnectAttempts( int value );
+ void setStartupMaxReconnectAttempts(int value);
long long getReconnectDelay() const;
- void setReconnectDelay( long long value );
+ void setReconnectDelay(long long value);
bool isBackup() const;
- void setBackup( bool value );
+ void setBackup(bool value);
int getBackupPoolSize() const;
- void setBackupPoolSize( int value );
+ void setBackupPoolSize(int value);
bool isTrackMessages() const;
- void setTrackMessages( bool value );
+ void setTrackMessages(bool value);
bool isTrackTransactionProducers() const;
- void setTrackTransactionProducers( bool value );
+ void setTrackTransactionProducers(bool value);
int getMaxCacheSize() const;
- void setMaxCacheSize( int value );
+ void setMaxCacheSize(int value);
bool isReconnectSupported() const;
- void setReconnectSupported( bool value );
+ void setReconnectSupported(bool value);
bool isUpdateURIsSupported() const;
- void setUpdateURIsSupported( bool value );
+ void setUpdateURIsSupported(bool value);
- void setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId );
+ void setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId);
protected:
@@ -269,14 +272,14 @@ namespace failover {
*
* @throw IOException if an errors occurs while restoring the old state.
*/
- void restoreTransport( const Pointer<Transport>& transport );
+ void restoreTransport(const Pointer<Transport> transport);
/**
* Called when this class' TransportListener is notified of a Failure.
* @param error - The CMS Exception that was thrown.
* @throw Exception if an error occurs.
*/
- void handleTransportFailure( const decaf::lang::Exception& error );
+ void handleTransportFailure(const decaf::lang::Exception& error);
/**
* Called when the Broker sends a ConnectionControl command which could
@@ -286,7 +289,7 @@ namespace failover {
* @param control
* The ConnectionControl command sent from the Broker.
*/
- void handleConnectionControl( const Pointer<Command>& control );
+ void handleConnectionControl(const Pointer<Command> control);
private:
@@ -298,11 +301,11 @@ namespace failover {
*
* @throw IOException if an I/O error occurs while creating the new Transport.
*/
- Pointer<Transport> createTransport( const URI& location ) const;
+ Pointer<Transport> createTransport(const URI& location) const;
- void processNewTransports( bool rebalance, std::string newTransports );
+ void processNewTransports(bool rebalance, std::string newTransports);
- void processResponse(const Pointer<Response>& response);
+ void processResponse(const Pointer<Response> response);
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Thu Oct 11 22:39:46 2012
@@ -38,80 +38,76 @@ using namespace decaf::util;
using namespace decaf::lang;
////////////////////////////////////////////////////////////////////////////////
-Pointer<Transport> FailoverTransportFactory::create( const decaf::net::URI& location ) {
+Pointer<Transport> FailoverTransportFactory::create(const decaf::net::URI& location) {
- try{
-
- Properties properties; // unused but necessary for now.
+ try {
+ Properties properties; // unused but necessary for now.
// Create the initial Transport, then wrap it in the normal Filters
- Pointer<Transport> transport( doCreateComposite( location, properties ) );
+ Pointer<Transport> transport(doCreateComposite(location, properties));
// Create the Transport for response correlator
- transport.reset( new ResponseCorrelator( transport ) );
+ transport.reset(new ResponseCorrelator(transport));
return transport;
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW( ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW( ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Transport> FailoverTransportFactory::createComposite( const decaf::net::URI& location ) {
+Pointer<Transport> FailoverTransportFactory::createComposite(const decaf::net::URI& location) {
- try{
-
- Properties properties; // unused but necessary for now.
+ try {
+ Properties properties; // unused but necessary for now.
// Create the initial Transport, then wrap it in the normal Filters
- return doCreateComposite( location, properties );
+ return doCreateComposite(location, properties);
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW( ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW( ActiveMQException)
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Transport> FailoverTransportFactory::doCreateComposite(
- const decaf::net::URI& location,
- const decaf::util::Properties& properties AMQCPP_UNUSED ) {
+Pointer<Transport> FailoverTransportFactory::doCreateComposite(const decaf::net::URI& location, const decaf::util::Properties& properties AMQCPP_UNUSED) {
try {
- CompositeData data = URISupport::parseComposite( location );
- Pointer<FailoverTransport> transport( new FailoverTransport() );
+ CompositeData data = URISupport::parseComposite(location);
+ Pointer<FailoverTransport> transport(new FailoverTransport());
Properties topLvlProperties = data.getParameters();
transport->setInitialReconnectDelay(
- Long::parseLong( topLvlProperties.getProperty( "initialReconnectDelay", "10" ) ) );
+ Long::parseLong(topLvlProperties.getProperty("initialReconnectDelay", "10")));
transport->setMaxReconnectDelay(
- Long::parseLong( topLvlProperties.getProperty( "maxReconnectDelay", "30000" ) ) );
+ Long::parseLong(topLvlProperties.getProperty("maxReconnectDelay", "30000")));
transport->setUseExponentialBackOff(
- Boolean::parseBoolean( topLvlProperties.getProperty( "useExponentialBackOff", "true" ) ) );
+ Boolean::parseBoolean(topLvlProperties.getProperty("useExponentialBackOff", "true")));
transport->setMaxReconnectAttempts(
- Integer::parseInt( topLvlProperties.getProperty( "maxReconnectAttempts", "0" ) ) );
+ Integer::parseInt(topLvlProperties.getProperty("maxReconnectAttempts", "0")));
transport->setStartupMaxReconnectAttempts(
- Integer::parseInt( topLvlProperties.getProperty( "startupMaxReconnectAttempts", "0" ) ) );
+ Integer::parseInt(topLvlProperties.getProperty("startupMaxReconnectAttempts", "0")));
transport->setRandomize(
- Boolean::parseBoolean( topLvlProperties.getProperty( "randomize", "true" ) ) );
+ Boolean::parseBoolean(topLvlProperties.getProperty("randomize", "true")));
transport->setBackup(
- Boolean::parseBoolean( topLvlProperties.getProperty( "backup", "false" ) ) );
+ Boolean::parseBoolean(topLvlProperties.getProperty("backup", "false")));
transport->setBackupPoolSize(
- Integer::parseInt( topLvlProperties.getProperty( "backupPoolSize", "1" ) ) );
+ Integer::parseInt(topLvlProperties.getProperty("backupPoolSize", "1")));
transport->setTimeout(
- Long::parseLong( topLvlProperties.getProperty( "timeout", "-1" ) ) );
+ Long::parseLong(topLvlProperties.getProperty("timeout", "-1")));
transport->setTrackMessages(
- Boolean::parseBoolean( topLvlProperties.getProperty( "trackMessages", "false" ) ) );
+ Boolean::parseBoolean(topLvlProperties.getProperty("trackMessages", "false")));
transport->setMaxCacheSize(
- Integer::parseInt( topLvlProperties.getProperty( "maxCacheSize", "131072" ) ) );
+ Integer::parseInt(topLvlProperties.getProperty("maxCacheSize", "131072")));
- transport->addURI( false, data.getComponents() );
+ transport->addURI(false, data.getComponents());
return transport;
}
- AMQ_CATCH_RETHROW( ActiveMQException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
- AMQ_CATCHALL_THROW( ActiveMQException )
+ AMQ_CATCH_RETHROW( ActiveMQException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException)
+ AMQ_CATCHALL_THROW( ActiveMQException)
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.h Thu Oct 11 22:39:46 2012
@@ -43,9 +43,9 @@ namespace failover {
virtual ~FailoverTransportFactory() {}
- virtual Pointer<Transport> create( const decaf::net::URI& location );
+ virtual Pointer<Transport> create(const decaf::net::URI& location);
- virtual Pointer<Transport> createComposite( const decaf::net::URI& location );
+ virtual Pointer<Transport> createComposite(const decaf::net::URI& location);
protected:
@@ -59,8 +59,8 @@ namespace failover {
* @return Pointer to a new FailoverTransport instance.
* @throws ActiveMQexception if an error occurs
*/
- virtual Pointer<Transport> doCreateComposite( const decaf::net::URI& location,
- const decaf::util::Properties& properties );
+ virtual Pointer<Transport> doCreateComposite(const decaf::net::URI& location,
+ const decaf::util::Properties& properties);
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp Thu Oct 11 22:39:46 2012
@@ -32,12 +32,10 @@ using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-FailoverTransportListener::FailoverTransportListener( FailoverTransport* parent ) :
- parent( parent ) {
+FailoverTransportListener::FailoverTransportListener(FailoverTransport* parent) : parent(parent) {
- if( this->parent == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Pointer to Parent Transport was NULL" );
+ if (this->parent == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Pointer to Parent Transport was NULL");
}
}
@@ -46,51 +44,51 @@ FailoverTransportListener::~FailoverTran
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransportListener::onCommand( const Pointer<Command>& command ) {
+void FailoverTransportListener::onCommand(const Pointer<Command> command) {
- if( command == NULL ) {
+ if (command == NULL) {
return;
}
- if( command->isResponse() ) {
+ if (command->isResponse()) {
Pointer<Response> response = command.dynamicCast<Response>();
parent->processResponse(response);
}
- if( !parent->isInitialized() ) {
- parent->setInitialized( true );
+ if (!parent->isInitialized()) {
+ parent->setInitialized(true);
}
- if( command->isConnectionControl() ) {
- parent->handleConnectionControl( command );
+ if (command->isConnectionControl()) {
+ parent->handleConnectionControl(command);
}
- if( parent->getTransportListener() != NULL ) {
- parent->getTransportListener()->onCommand( command );
+ if (parent->getTransportListener() != NULL) {
+ parent->getTransportListener()->onCommand(command);
}
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransportListener::onException( const decaf::lang::Exception& ex ) {
+void FailoverTransportListener::onException(const decaf::lang::Exception& ex) {
try {
- parent->handleTransportFailure( ex );
- } catch( Exception& e ) {
- if( parent->getTransportListener() != NULL ) {
- parent->getTransportListener()->onException( e );
+ parent->handleTransportFailure(ex);
+ } catch (Exception& e) {
+ if (parent->getTransportListener() != NULL) {
+ parent->getTransportListener()->onException(e);
}
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransportListener::transportInterrupted() {
- if( parent->getTransportListener() != NULL ) {
+ if (parent->getTransportListener() != NULL) {
parent->getTransportListener()->transportInterrupted();
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransportListener::transportResumed() {
- if( parent->getTransportListener() != NULL ) {
+ if (parent->getTransportListener() != NULL) {
parent->getTransportListener()->transportResumed();
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.h Thu Oct 11 22:39:46 2012
@@ -34,7 +34,7 @@ namespace failover {
*
* @since 3.0
*/
- class AMQCPP_API FailoverTransportListener : public TransportListener {
+ class AMQCPP_API FailoverTransportListener: public TransportListener {
private:
// The Transport that created this listener
@@ -42,12 +42,12 @@ namespace failover {
private:
- FailoverTransportListener( const FailoverTransportListener& );
- FailoverTransportListener& operator= ( const FailoverTransportListener& );
+ FailoverTransportListener(const FailoverTransportListener&);
+ FailoverTransportListener& operator=(const FailoverTransportListener&);
public:
- FailoverTransportListener( FailoverTransport* parent );
+ FailoverTransportListener(FailoverTransport* parent);
virtual ~FailoverTransportListener();
@@ -59,14 +59,14 @@ namespace failover {
*
* @param command the received command object.
*/
- virtual void onCommand( const Pointer<Command>& command );
+ virtual void onCommand(const Pointer<Command> command);
/**
* Event handler for an exception from a command transport.
*
* @param ex The exception.
*/
- virtual void onException( const decaf::lang::Exception& ex );
+ virtual void onException(const decaf::lang::Exception& ex);
/**
* The transport has suffered an interruption from which it hopes to recover