You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/26 16:58:15 UTC
svn commit: r758706 - in /activemq/activemq-cpp/trunk/src: main/
main/activemq/transport/failover/ test/activemq/transport/failover/
Author: tabish
Date: Thu Mar 26 15:57:51 2009
New Revision: 758706
URL: http://svn.apache.org/viewvc?rev=758706&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100
Resolve all known issues with the Failover Transport. Improves overall performance and thread safety.
Added:
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp (with props)
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h (with props)
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp (with props)
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h (with props)
Removed:
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
Modified:
activemq/activemq-cpp/trunk/src/main/Makefile.am
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp
Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Thu Mar 26 15:57:51 2009
@@ -94,11 +94,12 @@
activemq/transport/TransportFilter.cpp \
activemq/transport/tcp/TcpTransportFactory.cpp \
activemq/transport/tcp/TcpTransport.cpp \
+ activemq/transport/failover/BackupTransportPool.cpp \
activemq/transport/failover/FailoverTransportListener.cpp \
+ activemq/transport/failover/URIPool.cpp \
activemq/transport/failover/FailoverTransportFactory.cpp \
activemq/transport/failover/CloseTransportsTask.cpp \
activemq/transport/failover/BackupTransport.cpp \
- activemq/transport/failover/ReconnectTask.cpp \
activemq/transport/failover/FailoverTransport.cpp \
activemq/transport/TransportRegistry.cpp \
activemq/transport/AbstractTransportFactory.cpp \
@@ -618,10 +619,11 @@
activemq/transport/tcp/TcpTransportFactory.h \
activemq/transport/tcp/TcpTransport.h \
activemq/transport/failover/FailoverTransportListener.h \
+ activemq/transport/failover/URIPool.h \
activemq/transport/failover/FailoverTransportFactory.h \
activemq/transport/failover/CloseTransportsTask.h \
+ activemq/transport/failover/BackupTransportPool.h \
activemq/transport/failover/BackupTransport.h \
- activemq/transport/failover/ReconnectTask.h \
activemq/transport/failover/FailoverTransport.h \
activemq/transport/correlator/ResponseCorrelator.h \
activemq/transport/correlator/FutureResponse.h \
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp Thu Mar 26 15:57:51 2009
@@ -17,15 +17,15 @@
#include "BackupTransport.h"
-#include <activemq/transport/failover/FailoverTransport.h>
+#include <activemq/transport/failover/BackupTransportPool.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::transport::failover;
////////////////////////////////////////////////////////////////////////////////
-BackupTransport::BackupTransport( FailoverTransport* failover ) :
- failover( failover ), closed( true ) {
+BackupTransport::BackupTransport( BackupTransportPool* parent ) :
+ parent( parent ), closed( true ) {
}
@@ -38,7 +38,7 @@
this->closed = true;
- if( this->failover != NULL ) {
- this->failover->reconnect();
+ if( this->parent != NULL ) {
+ this->parent->onBackupTransportFailure( this );
}
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h Thu Mar 26 15:57:51 2009
@@ -32,13 +32,13 @@
using decaf::lang::Pointer;
- class FailoverTransport;
+ class BackupTransportPool;
class AMQCPP_API BackupTransport : public DefaultTransportListener {
private:
- // The parent Failover Transport
- FailoverTransport* failover;
+ // The parent of this Backup
+ BackupTransportPool* parent;
// The Transport this one is managing.
Pointer<Transport> transport;
@@ -51,7 +51,7 @@
public:
- BackupTransport( FailoverTransport* failover );
+ BackupTransport( BackupTransportPool* failover );
virtual ~BackupTransport();
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp Thu Mar 26 15:57:51 2009
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BackupTransportPool.h"
+
+#include <memory>
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportRegistry.h>
+
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IllegalStateException.h>
+
+using namespace activemq;
+using namespace activemq::threads;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::io;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+BackupTransportPool::BackupTransportPool( const Pointer<CompositeTaskRunner>& taskRunner,
+                                          const Pointer<CloseTransportsTask>& closeTask,
+                                          const Pointer<URIPool>& uriPool ) :
+    taskRunner( taskRunner ),
+    closeTask( closeTask ),
+    uriPool( uriPool ),
+    backupPoolSize( 1 ),
+    enabled( false ),
+    pending( false ) {
+
+    // Builds a disabled pool that holds at most one backup.  All three
+    // collaborators are mandatory; they are validated in the order
+    // runner, URI pool, close task.
+    if( taskRunner == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "TaskRunner passed is NULL" );
+    }
+
+    if( uriPool == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "URIPool passed is NULL" );
+    }
+
+    if( closeTask == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Close Transport Task passed is NULL" );
+    }
+
+    // Register with the runner so that iterate() gets a chance to create
+    // backups whenever the runner is woken up.
+    this->taskRunner->addTask( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BackupTransportPool::BackupTransportPool( int backupPoolSize,
+                                          const Pointer<CompositeTaskRunner>& taskRunner,
+                                          const Pointer<CloseTransportsTask>& closeTask,
+                                          const Pointer<URIPool>& uriPool ) :
+    taskRunner( taskRunner ),
+    closeTask( closeTask ),
+    uriPool( uriPool ),
+    backupPoolSize( backupPoolSize ),
+    enabled( false ),
+    pending( false ) {
+
+    // Builds a disabled pool that holds at most backupPoolSize backups.
+    // All three collaborators are mandatory; they are validated in the
+    // order runner, URI pool, close task.
+    if( taskRunner == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "TaskRunner passed is NULL" );
+    }
+
+    if( uriPool == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "URIPool passed is NULL" );
+    }
+
+    if( closeTask == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Close Transport Task passed is NULL" );
+    }
+
+    // Register with the runner so that iterate() gets a chance to create
+    // backups whenever the runner is woken up.
+    this->taskRunner->addTask( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BackupTransportPool::~BackupTransportPool() {
+    // Undo the addTask() done in the constructors so the runner no longer
+    // holds a pointer to this pool.
+    taskRunner->removeTask( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BackupTransportPool::setEnabled( bool value ) {
+
+    this->enabled = value;
+
+    // Disabling drops every backup currently held by the pool.
+    if( !this->enabled ) {
+        synchronized( &backups ) {
+            backups.clear();
+        }
+        return;
+    }
+
+    // Enabling wakes the task runner so it can call iterate().
+    taskRunner->wakeup();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<BackupTransport> BackupTransportPool::getBackup() {
+
+    // Asking a disabled pool for a backup is a caller error.
+    if( !isEnabled() ) {
+        throw IllegalStateException(
+            __FILE__, __LINE__, "The Backup Pool is not enabled." );
+    }
+
+    // Stays NULL when the pool has nothing to offer.
+    Pointer<BackupTransport> taken;
+
+    synchronized( &backups ) {
+        if( !backups.isEmpty() ) {
+            // Hand out the oldest entry.
+            taken = backups.remove( 0 );
+        }
+    }
+
+    // Mark the pool as pending and wake the runner so that iterate() is
+    // invoked again and replacement backups get created.
+    pending = true;
+    taskRunner->wakeup();
+
+    return taken;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool BackupTransportPool::isPending() const {
+    // A disabled pool never has work to do; otherwise report the flag that
+    // getBackup() raises and iterate() clears.
+    return this->isEnabled() && this->pending;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool BackupTransportPool::iterate() {
+
+    // URIs that could not be connected during this pass.
+    StlList<URI> unreachable;
+
+    synchronized( &backups ) {
+
+        // Keep connecting until the pool is full or gets disabled.
+        while( isEnabled() && (int)backups.size() < backupPoolSize ) {
+
+            URI target;
+
+            // Ask the URI pool for a free URI.  When none is available we
+            // stop for now; the next getBackup() call flags this task as
+            // pending again and another attempt to fill the pool is made.
+            try{
+                target = uriPool->getURI();
+            } catch( NoSuchElementException& ex ) {
+                break;
+            }
+
+            Pointer<BackupTransport> candidate( new BackupTransport( this ) );
+            candidate->setUri( target );
+
+            try{
+                Pointer<Transport> connection = createTransport( target );
+                connection->setTransportListener( candidate.get() );
+                connection->start();
+                candidate->setTransport( connection );
+                backups.add( candidate );
+            } catch(...) {
+                // Remember the URI; it goes back to the URI pool once
+                // this pass is finished.
+                unreachable.add( target );
+            }
+        }
+    }
+
+    // Give the failed URIs back so they can be retried later.
+    uriPool->addURIs( unreachable );
+    pending = false;
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Called by a BackupTransport when the Transport it manages has failed.
+ *
+ * If the failed backup is still held by this pool it is removed, its URI is
+ * returned to the URI pool, its Transport is handed to the close task and
+ * the task runner is woken up.  A backup that is not (or no longer) in the
+ * pool is ignored.
+ *
+ * @param failedTransport - the backup reporting the failure, may be NULL.
+ */
+void BackupTransportPool::onBackupTransportFailure( BackupTransport* failedTransport ) {
+
+    if( failedTransport == NULL ) {
+        return;
+    }
+
+    synchronized( &backups ) {
+
+        std::auto_ptr< Iterator< Pointer<BackupTransport> > > iter( backups.iterator() );
+
+        while( iter->hasNext() ) {
+
+            // Hold our own reference: removing the entry from the list may
+            // drop the pool's last reference, and the backup must stay
+            // alive while its URI and Transport are read below.
+            Pointer<BackupTransport> current = iter->next();
+
+            if( current == failedTransport ) {
+                iter->remove();
+
+                // Perform the hand-off exactly once, for the failed backup
+                // only, rather than once per entry in the list.
+                this->uriPool->addURI( current->getUri() );
+                this->closeTask->add( current->getTransport() );
+                this->taskRunner->wakeup();
+                break;
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a composite Transport for the given URI using the factory that is
+ * registered in the TransportRegistry for the URI's scheme.
+ *
+ * @param location - the URI of the broker to create a Transport for.
+ *
+ * @return Pointer to the newly created Transport.
+ *
+ * @throws IOException if no factory is registered for the scheme or if the
+ *         factory fails to create the Transport.
+ */
+Pointer<Transport> BackupTransportPool::createTransport( const URI& location ) const
+    throw ( decaf::io::IOException ) {
+
+    try{
+
+        TransportFactory* factory =
+            TransportRegistry::getInstance().findFactory( location.getScheme() );
+
+        if( factory == NULL ) {
+            // Throw by value: a pointer thrown with 'new' is not matched by
+            // the IOException handler below, so the message would be lost
+            // to the catch-all and the exception object leaked.
+            throw IOException(
+                __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
+        }
+
+        Pointer<Transport> transport( factory->createComposite( location ) );
+
+        return transport;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h Thu Mar 26 15:57:51 2009
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FAILOVER_BACKUPTRANSPORTPOOL_H_
+#define _ACTIVEMQ_TRANSPORT_FAILOVER_BACKUPTRANSPORTPOOL_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/CompositeTask.h>
+#include <activemq/threads/CompositeTaskRunner.h>
+#include <activemq/transport/failover/CloseTransportsTask.h>
+#include <activemq/transport/failover/BackupTransport.h>
+#include <activemq/transport/failover/URIPool.h>
+
+#include <decaf/lang/Pointer.h>
+#include <decaf/io/IOException.h>
+#include <decaf/util/StlList.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+ using decaf::lang::Pointer;
+ using decaf::util::StlList;
+ using activemq::threads::CompositeTaskRunner;
+
+ class AMQCPP_API BackupTransportPool : public activemq::threads::CompositeTask { // Keeps a pool of connected backup Transports.
+ private:
+
+ friend class BackupTransport;
+
+ mutable StlList< Pointer<BackupTransport> > backups; // backups ready to be handed out, accessed under synchronized( &backups )
+ Pointer<CompositeTaskRunner> taskRunner; // runner this pool adds itself to as a task
+ Pointer<CloseTransportsTask> closeTask; // receives the Transports of failed backups
+ Pointer<URIPool> uriPool; // source of URIs to connect to, failed URIs are returned to it
+ volatile int backupPoolSize; // max number of backups kept in the pool
+ volatile bool enabled; // false until setEnabled( true ) is called
+ volatile bool pending; // set by getBackup(), cleared at the end of iterate()
+
+ public:
+ /**
+ * Creates a disabled pool with a backup pool size of 1 and adds it as a
+ * task to the given task runner.
+ *
+ * @throws NullPointerException if any of the passed Pointers is NULL.
+ */
+ BackupTransportPool( const Pointer<CompositeTaskRunner>& taskRunner,
+ const Pointer<CloseTransportsTask>& closeTask,
+ const Pointer<URIPool>& uriPool );
+ /**
+ * Creates a disabled pool with the given backup pool size and adds it as
+ * a task to the given task runner.
+ *
+ * @throws NullPointerException if any of the passed Pointers is NULL.
+ */
+ BackupTransportPool( int backupPoolSize,
+ const Pointer<CompositeTaskRunner>& taskRunner,
+ const Pointer<CloseTransportsTask>& closeTask,
+ const Pointer<URIPool>& uriPool );
+
+ virtual ~BackupTransportPool();
+
+ /**
+ * Returns true when the pool is enabled and a backup has been requested
+ * through getBackup() since the last call to iterate(). Always returns
+ * false while the pool is disabled.
+ */
+ virtual bool isPending() const;
+
+ /**
+ * Gets a Connected Transport from the pool of Backups if any are present,
+ * otherwise it returns a NULL Pointer. The pool is flagged as pending and
+ * the task runner is woken up so that new backups get created.
+ *
+ * @return Pointer to a Connected Transport or NULL
+ *
+ * @throws IllegalStateException if the pool is not enabled.
+ */
+ Pointer<BackupTransport> getBackup();
+
+ /**
+ * Connect to a Backup Broker if we haven't already connected to the max
+ * number of Backups. URIs that could not be connected are returned to
+ * the URIPool.
+ *
+ * @return always false.
+ */
+ virtual bool iterate();
+
+ /**
+ * Gets the Max number of Backups this Task will create.
+ * @return the max number of active BackupTransports that will be created.
+ */
+ int getBackupPoolSize() const {
+ return this->backupPoolSize;
+ }
+
+ /**
+ * Sets the Max number of Backups this Task will create.
+ * @param size - the max number of active BackupTransports that will be created.
+ */
+ void setBackupPoolSize( int size ) {
+ this->backupPoolSize = size;
+ }
+
+ /**
+ * Gets if the backup Transport Pool has been enabled or not, when not enabled
+ * no backups are created and any that were are destroyed.
+ *
+ * @return true if enabled.
+ */
+ bool isEnabled() const {
+ return this->enabled;
+ }
+
+ /**
+ * Sets if this Backup Transport Pool is enabled. When not enabled no Backups
+ * are created and any that were are destroyed.
+ *
+ * @param value - true to enable backup creation, false to disable.
+ */
+ void setEnabled( bool value );
+
+ private:
+
+ // The backups report their failure to the pool, the pool removes them
+ // from the list and returns their URIs to the URIPool, and then adds
+ // the internal transport to the close transports task for cleanup.
+ void onBackupTransportFailure( BackupTransport* failedTransport );
+
+ Pointer<Transport> createTransport( const URI& location ) const // looks up the factory for the URI's scheme in the TransportRegistry
+ throw ( decaf::io::IOException );
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FAILOVER_BACKUPTRANSPORTPOOL_H_*/
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Mar 26 15:57:51 2009
@@ -49,13 +49,10 @@
this->maxReconnectDelay = 1000 * 30;
this->backOffMultiplier = 2;
this->useExponentialBackOff = true;
- this->randomize = true;
this->initialized = false;
this->maxReconnectAttempts = 0;
this->connectFailures = 0;
this->reconnectDelay = this->initialReconnectDelay;
- this->backup = false;
- this->backupPoolSize = 1;
this->trackMessages = false;
this->maxCacheSize = 128 * 1024;
@@ -64,14 +61,15 @@
this->connected = false;
this->transportListener = NULL;
+ this->uris.reset( new URIPool() );
this->stateTracker.setTrackTransactions( true );
this->myTransportListener.reset( new FailoverTransportListener( this ) );
- this->reconnectTask.reset( new ReconnectTask( this ) );
this->closeTask.reset( new CloseTransportsTask() );
- this->taskRunner.reset( new DedicatedTaskRunner( reconnectTask.get() ) );
- this->compositeTaskRunner.reset( new CompositeTaskRunner() );
+ this->taskRunner.reset( new CompositeTaskRunner() );
+ this->backups.reset( new BackupTransportPool( taskRunner, closeTask, uris ) );
- this->compositeTaskRunner->addTask( this->closeTask.get() );
+ this->taskRunner->addTask( this );
+ this->taskRunner->addTask( this->closeTask.get() );
}
////////////////////////////////////////////////////////////////////////////////
@@ -100,10 +98,7 @@
void FailoverTransport::add( const std::string& uri ) {
try {
- URI newUri( uri );
- if( !uris.contains( newUri ) ) {
- uris.add( newUri );
- }
+ uris->addURI( URI( uri ) );
reconnect();
}
@@ -113,12 +108,10 @@
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::addURI( const List<URI>& uris ) {
- synchronized( &this->uris ) {
- std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+ std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
- while( iter->hasNext() ) {
- this->uris.add( iter->next() );
- }
+ while( iter->hasNext() ) {
+ this->uris->addURI( iter->next() );
}
reconnect();
@@ -127,12 +120,10 @@
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::removeURI( const List<URI>& uris ) {
- synchronized( &this->uris ) {
- std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+ std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
- while( iter->hasNext() ) {
- this->uris.remove( iter->next() );
- }
+ while( iter->hasNext() ) {
+ this->uris->removeURI( iter->next() );
}
reconnect();
@@ -144,9 +135,7 @@
try {
- if( !uris.contains( uri ) ) {
- uris.add( uri );
- }
+ this->uris->addURI( uri );
reconnect();
}
@@ -156,36 +145,6 @@
}
////////////////////////////////////////////////////////////////////////////////
-StlList<URI> FailoverTransport::getConnectList() const {
-
- StlList<URI> result( uris );
- bool removed = false;
-
- if( failedConnectTransportURI != NULL ) {
- removed = result.remove( *failedConnectTransportURI );
- }
-
- if( randomize ) {
- // Randomly, reorder the list by random swapping
- Random rand;
- rand.setSeed( decaf::lang::System::currentTimeMillis() );
-
- for( std::size_t i = 0; i < result.size(); i++ ) {
- int p = rand.nextInt( (int)result.size() );
- URI temp = result.get( p );
- result.set( p, result.get( i ) );
- result.set( i, temp );
- }
- }
-
- if( removed ) {
- result.add( *failedConnectTransportURI );
- }
-
- return result;
-}
-
-////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTransportListener( TransportListener* listener ) {
synchronized( &listenerMutex ) {
this->transportListener = listener;
@@ -195,16 +154,17 @@
////////////////////////////////////////////////////////////////////////////////
std::string FailoverTransport::getRemoteAddress() const {
- if( connectedTransport != NULL ) {
- return connectedTransport->getRemoteAddress();
+ synchronized( &reconnectMutex ) {
+ if( connectedTransport != NULL ) {
+ return connectedTransport->getRemoteAddress();
+ }
}
return "";
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::oneway( const Pointer<Command>& command )
- throw( IOException,
- decaf::lang::exceptions::UnsupportedOperationException ) {
+ throw( IOException, decaf::lang::exceptions::UnsupportedOperationException ) {
Pointer<Exception> error;
@@ -385,17 +345,11 @@
closed = true;
connected = false;
- std::auto_ptr< Iterator< Pointer<BackupTransport> > > iter( backups.iterator() );
- while( iter->hasNext() ) {
- iter->next()->setClosed( true );
- }
-
- backups.clear();
+ backups->setEnabled( false );
requestMap.clear();
if( connectedTransport != NULL ) {
- transportToStop = connectedTransport;
- connectedTransport.reset( NULL );
+ transportToStop.swap( connectedTransport );
}
reconnectMutex.notifyAll();
@@ -405,8 +359,7 @@
sleepMutex.notifyAll();
}
- taskRunner->shutdown( 1000 );
- compositeTaskRunner->shutdown( 1000 );
+ taskRunner->shutdown( 2000 );
if( transportToStop != NULL ) {
transportToStop->close();
@@ -457,7 +410,9 @@
throw( decaf::lang::Exception ) {
Pointer<Transport> transport;
- connectedTransport.swap( transport );
+ synchronized( &reconnectMutex ) {
+ connectedTransport.swap( transport );
+ }
if( transport != NULL ) {
@@ -467,16 +422,16 @@
// Hand off to the close task so it gets done in a different thread.
closeTask->add( transport );
- compositeTaskRunner->wakeup();
+ taskRunner->wakeup();
synchronized( &reconnectMutex ) {
- bool reconnectOk = started;
initialized = false;
- failedConnectTransportURI = connectedTransportURI;
+ uris->addURI( *connectedTransportURI );
connectedTransportURI.reset( NULL );
connected = false;
- if( reconnectOk ) {
+
+ if( started ) {
taskRunner->wakeup();
}
}
@@ -488,7 +443,20 @@
}
////////////////////////////////////////////////////////////////////////////////
-bool FailoverTransport::doReconnect() {
+bool FailoverTransport::isPending() const {
+
+    // Work is pending while we are started, not closed and have no
+    // connected transport.  The state is read under the reconnect mutex.
+    bool needsConnect = false;
+
+    synchronized( &reconnectMutex ) {
+        needsConnect = ( this->connectedTransport == NULL && !closed && started );
+    }
+
+    return needsConnect;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::iterate() {
Pointer<Exception> failure;
@@ -501,86 +469,88 @@
if( connectedTransport != NULL || closed || connectionFailure != NULL ) {
return false;
} else {
- StlList<URI> connectList = getConnectList();
- if( connectList.isEmpty() ) {
- failure.reset( new IOException(
- __FILE__, __LINE__, "No uris available to connect to." ) );
- } else {
- if( !useExponentialBackOff ) {
- reconnectDelay = initialReconnectDelay;
- }
+ StlList<URI> failures;
+ Pointer<Transport> transport;
+ URI uri;
- synchronized( &backupMutex ) {
+ if( !useExponentialBackOff ) {
+ reconnectDelay = initialReconnectDelay;
+ }
- if( backup && !backups.isEmpty() ) {
+ if( backups->isEnabled() ) {
- Pointer<BackupTransport> backup = backups.remove( 0 );
- Pointer<Transport> transport = backup->getTransport();
- URI uri = backup->getUri();
- transport->setTransportListener( myTransportListener.get() );
+ Pointer<BackupTransport> backupTransport = backups->getBackup();
- try {
+ if( backupTransport != NULL ) {
- if( started ) {
- restoreTransport( transport );
- }
+ transport = backupTransport->getTransport();
+ uri = backupTransport->getUri();
+ transport->setTransportListener( myTransportListener.get() );
- reconnectDelay = initialReconnectDelay;
- failedConnectTransportURI.reset( NULL );
- connectedTransportURI.reset( new URI( uri ) );
- connectedTransport = transport;
- reconnectMutex.notifyAll();
- connectFailures = 0;
+ try {
- return false;
+ if( started ) {
+ restoreTransport( transport );
}
- AMQ_CATCH_NOTHROW( Exception )
- AMQ_CATCHALL_NOTHROW()
+
+ } catch( Exception& e ) {
+ transport.reset( NULL );
+ this->uris->addURI( uri );
}
}
+ }
- std::auto_ptr< Iterator<URI> > iter( connectList.iterator() );
-
- while( iter->hasNext() && connectedTransport == NULL && !closed ) {
+ while( transport == NULL && !closed ) {
- URI uri = iter->next();
- try {
+ try{
+ uri = uris->getURI();
+ } catch( NoSuchElementException& ex ) {
+ break;
+ }
- Pointer<Transport> transport = createTransport( uri );
- transport->setTransportListener( myTransportListener.get() );
- transport->start();
+ try {
- if( started ) {
- restoreTransport( transport );
- }
+ transport = createTransport( uri );
+ transport->setTransportListener( myTransportListener.get() );
+ transport->start();
- reconnectDelay = initialReconnectDelay;
- connectedTransportURI.reset( new URI( uri ) );
- connectedTransport = transport;
- reconnectMutex.notifyAll();
- connectFailures = 0;
-
- // Make sure on initial startup, that the transportListener
- // has been initialized for this instance.
- synchronized( &listenerMutex ) {
- if( transportListener == NULL ) {
- // if it isn't set after 2secs - it
- // probably never will be
- listenerMutex.wait( 2000 );
- }
- }
+ if( started ) {
+ restoreTransport( transport );
+ }
- if( transportListener != NULL ) {
- transportListener->transportResumed();
- }
+ } catch( Exception& e ) {
+ transport.reset( NULL );
+ failures.add( uri );
+ failure.reset( e.clone() );
+ }
+ }
- connected = true;
- return false;
- } catch( Exception& e ) {
- failure.reset( e.clone() );
+ // Return the failures to the pool, we will try again on the next iteration.
+ this->uris->addURIs( failures );
+
+ if( transport != NULL ) {
+ reconnectDelay = initialReconnectDelay;
+ connectedTransportURI.reset( new URI( uri ) );
+ connectedTransport = transport;
+ reconnectMutex.notifyAll();
+ connectFailures = 0;
+ connected = true;
+
+ // Make sure on initial startup, that the transportListener
+ // has been initialized for this instance.
+ synchronized( &listenerMutex ) {
+ if( transportListener == NULL ) {
+ // if it isn't set after 2secs - it probably never will be
+ listenerMutex.wait( 2000 );
}
}
+
+ if( transportListener != NULL ) {
+ transportListener->transportResumed();
+ }
+
+ return false;
}
}
@@ -634,55 +604,6 @@
}
////////////////////////////////////////////////////////////////////////////////
-bool FailoverTransport::buildBackups() {
-
- synchronized( &backupMutex ) {
-
- if( !closed && backup && (int)backups.size() < backupPoolSize ) {
-
- StlList<URI> connectList = getConnectList();
-
- //removed closed backups
- StlList< Pointer<BackupTransport> > disposedList;
- std::auto_ptr< Iterator<Pointer<BackupTransport> > > iter( backups.iterator() );
- while( iter->hasNext() ) {
- Pointer<BackupTransport> backup = iter->next();
- if( backup->isClosed() ) {
- disposedList.add( backup );
- }
- }
-
- backups.removeAll( disposedList );
- disposedList.clear();
-
- std::auto_ptr< Iterator<URI> > uriIter( connectList.iterator() );
-
- while( uriIter->hasNext() && (int)backups.size() < backupPoolSize ) {
- URI uri = uriIter->next();
- if( connectedTransportURI != NULL && !connectedTransportURI->equals( uri ) ) {
- try {
- Pointer<BackupTransport> backup( new BackupTransport( this ) );
- backup->setUri( uri );
-
- if( !backups.contains( backup ) ) {
- Pointer<Transport> transport = createTransport( uri );
- transport->setTransportListener( backup.get() );
- transport->start();
- backup->setTransport( transport );
- backups.add( backup );
- }
- }
- AMQ_CATCH_NOTHROW( Exception )
- AMQ_CATCHALL_NOTHROW()
- }
- }
- }
- }
-
- return false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
Pointer<Transport> FailoverTransport::createTransport( const URI& location ) const
throw ( decaf::io::IOException ) {
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Thu Mar 26 15:57:51 2009
@@ -25,10 +25,10 @@
#include <activemq/threads/CompositeTaskRunner.h>
#include <activemq/state/ConnectionStateTracker.h>
#include <activemq/transport/CompositeTransport.h>
-#include <activemq/transport/failover/BackupTransport.h>
-#include <activemq/transport/failover/ReconnectTask.h>
+#include <activemq/transport/failover/BackupTransportPool.h>
#include <activemq/transport/failover/CloseTransportsTask.h>
#include <activemq/transport/failover/FailoverTransportListener.h>
+#include <activemq/transport/failover/URIPool.h>
#include <activemq/wireformat/WireFormat.h>
#include <decaf/util/StlList.h>
@@ -50,50 +50,42 @@
using activemq::commands::Command;
using activemq::commands::Response;
- class AMQCPP_API FailoverTransport : public CompositeTransport {
+ class AMQCPP_API FailoverTransport : public CompositeTransport,
+ public activemq::threads::CompositeTask {
private:
friend class FailoverTransportListener;
- friend class ReconnectTask;
bool closed;
bool connected;
bool started;
- decaf::util::StlList<URI> uris;
-
long long timeout;
long long initialReconnectDelay;
long long maxReconnectDelay;
long long backOffMultiplier;
bool useExponentialBackOff;
- bool randomize;
bool initialized;
int maxReconnectAttempts;
int connectFailures;
long long reconnectDelay;
- bool backup;
- int backupPoolSize;
bool trackMessages;
int maxCacheSize;
- decaf::util::StlList< Pointer<BackupTransport> > backups;
+ mutable decaf::util::concurrent::Mutex reconnectMutex;
+ mutable decaf::util::concurrent::Mutex sleepMutex;
+ mutable decaf::util::concurrent::Mutex listenerMutex;
state::ConnectionStateTracker stateTracker;
- decaf::util::concurrent::Mutex reconnectMutex;
- decaf::util::concurrent::Mutex backupMutex;
- decaf::util::concurrent::Mutex sleepMutex;
- decaf::util::concurrent::Mutex listenerMutex;
decaf::util::StlMap<int, Pointer<Command> > requestMap;
+ Pointer<URIPool> uris;
Pointer<URI> connectedTransportURI;
- Pointer<URI> failedConnectTransportURI;
Pointer<Transport> connectedTransport;
Pointer<Exception> connectionFailure;
- Pointer<ReconnectTask> reconnectTask;
+ Pointer<BackupTransportPool> backups;
Pointer<CloseTransportsTask> closeTask;
- Pointer<TaskRunner> taskRunner;
- Pointer<CompositeTaskRunner> compositeTaskRunner;
+ Pointer<CompositeTaskRunner> taskRunner;
Pointer<TransportListener> disposedListener;
Pointer<TransportListener> myTransportListener;
TransportListener* transportListener;
@@ -203,8 +195,7 @@
* Sets the WireFormat instance to use.
* @param WireFormat the object used to encode / decode commands.
*/
- virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {
- }
+ virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
/**
* Sets the observer of asynchronous events from this transport.
@@ -283,6 +274,21 @@
virtual std::string getRemoteAddress() const;
/**
+ * @returns true if there is a need for the iterate method to be called by this
+ * class's task runner.
+ */
+ virtual bool isPending() const;
+
+ /**
+ * Performs the actual Reconnect operation for the FailoverTransport. When a
+ * connection is made this method returns false to indicate it doesn't need to
+ * run again; otherwise it returns true to indicate it's still trying to connect.
+ *
+ * @return false to indicate a connection, true to indicate it needs to try again.
+ */
+ virtual bool iterate();
+
+ /**
* reconnect to another location
* @param uri
* @throws IOException on failure or if not supported
@@ -333,11 +339,11 @@
}
bool isRandomize() const {
- return this->randomize;
+ return this->uris->isRandomize();
}
void setRandomize( bool value ) {
- this->randomize = value;
+ this->uris->setRandomize( value );
}
int getMaxReconnectAttempts() const {
@@ -357,19 +363,19 @@
}
bool isBackup() const {
- return this->backup;
+ return this->backups->isEnabled();
}
void setBackup( bool value ) {
- this->backup = value;
+ this->backups->setEnabled( value );
}
int getBackupPoolSize() const {
- return this->backupPoolSize;
+ return this->backups->getBackupPoolSize();
}
void setBackupPoolSize( int value ) {
- this->backupPoolSize = value;
+ this->backups->setBackupPoolSize( value );
}
bool isTrackMessages() const {
@@ -413,31 +419,12 @@
private:
/**
- * Returns a set of URIs that this Transport is to connect to, applying a
- * random swapping from the class stored list of URIs if the randomize flag
- * is enabled, otherwise just return the original list.
- *
- * @returns a Set of URI object that this Transport iterates over to connect.
- */
- decaf::util::StlList<URI> getConnectList() const;
-
- /**
* @return Returns true if the command is one sent when a connection
* is being closed.
*/
bool isShutdownCommand( const Pointer<Command>& command ) const;
/**
- * Performs the actual Reconnect operation.
- */
- bool doReconnect();
-
- /**
- * Builds a set of Backup Transports for fast Failover.
- */
- bool buildBackups();
-
- /**
* Looks up the correct Factory and create a new Composite version of the
* Transport requested.
*
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp Thu Mar 26 15:57:51 2009
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "URIPool.h"
+
+#include <memory>
+#include <decaf/util/Random.h>
+#include <decaf/lang/System.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::net;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+URIPool::URIPool() : uriPool(), randomize( false ) {
+    // Creates an empty pool.  The randomize flag is given a defined value
+    // (false: take URIs in list order) so that isRandomize() and getURI()
+    // never read an uninitialized bool before setRandomize() is called.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+URIPool::URIPool( const decaf::util::List<URI>& uris ) : uriPool(), randomize( false ) {
+
+    // The randomize flag is given a defined value (false: take URIs in list
+    // order) so that it is never read uninitialized.
+
+    // Seed the free list with a copy of the caller's URIs.
+    this->uriPool.copy( uris );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+URIPool::~URIPool() {
+    // Intentionally empty: no explicit cleanup is performed here.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+URI URIPool::getURI() throw ( decaf::lang::exceptions::NoSuchElementException ) {
+
+    // Removes one URI from the free list and returns it: the first entry, or
+    // a randomly chosen one when randomization is enabled.
+    synchronized( &uriPool ) {
+        if( !uriPool.isEmpty() ) {
+
+            int index = 0; // Take the first one in the list unless random is on.
+
+            if( isRandomize() ) {
+                Random rand;
+                rand.setSeed( decaf::lang::System::currentTimeMillis() );
+                index = rand.nextInt( (int)uriPool.size() );
+            }
+
+            return uriPool.remove( index );
+        }
+    }
+
+    // The empty-pool error is raised after the synchronized block so that the
+    // function ends in a statement of its own instead of relying on the block
+    // above to supply a return or throw on every path.
+    throw NoSuchElementException(
+        __FILE__, __LINE__, "URI Pool is currently empty." );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void URIPool::addURI( const URI& uri ) {
+
+    // Put the URI on the free list unless contains() reports that it is
+    // already there, so the list never holds the same URI twice.
+    synchronized( &uriPool ) {
+        const bool alreadyPooled = uriPool.contains( uri );
+        if( !alreadyPooled ) {
+            uriPool.add( uri );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void URIPool::addURIs( const StlList<URI>& uris ) {
+
+    // The lock is held across the whole loop, so the merge of the given list
+    // into the pool happens under a single acquisition.
+    synchronized( &uriPool ) {
+
+        // The auto_ptr owns the Iterator and deletes it when the block exits.
+        std::auto_ptr< Iterator<URI> > cursor( uris.iterator() );
+
+        for( ; cursor->hasNext(); ) {
+            const URI candidate = cursor->next();
+            const bool alreadyPooled = uriPool.contains( candidate );
+
+            // Entries the pool already holds are skipped.
+            if( !alreadyPooled ) {
+                uriPool.add( candidate );
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void URIPool::removeURI( const URI& uri ) {
+
+    // remove() is only invoked for a URI the list reports as present; a URI
+    // that is not pooled leaves the list untouched.
+    synchronized( &uriPool ) {
+        const bool pooled = uriPool.contains( uri );
+        if( pooled ) {
+            uriPool.remove( uri );
+        }
+    }
+}
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h Thu Mar 26 15:57:51 2009
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FAILOVER_URIPOOL_H_
+#define _ACTIVEMQ_TRANSPORT_FAILOVER_URIPOOL_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/net/URI.h>
+#include <decaf/util/StlList.h>
+#include <decaf/lang/exceptions/NoSuchElementException.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+ using decaf::util::StlList;
+ using decaf::net::URI;
+
+ /**
+ * A pool of URIs that are currently free to be connected to.  A caller takes
+ * a URI out of the pool with getURI() and is expected to hand it back with
+ * addURI() once the resource connected to that URI has been closed.
+ *
+ * getURI(), addURI(), addURIs() and removeURI() synchronize on the internal
+ * list (see URIPool.cpp); isRandomize() and setRandomize() take no lock.
+ */
+ class AMQCPP_API URIPool {
+ private:
+
+ // Free list of URIs; the pool operations in URIPool.cpp lock on this list.
+ StlList<URI> uriPool;
+
+ // When true getURI() picks a random entry, otherwise it takes the first one.
+ bool randomize;
+
+ public:
+
+ /**
+ * Create an Empty URI Pool.
+ */
+ URIPool();
+
+ /**
+ * Creates a new URI Pool using a copy of the given list as the initial
+ * Free List.
+ *
+ * @param uris - List of URI to place in the Pool.
+ */
+ URIPool( const decaf::util::List<URI>& uris );
+
+ virtual ~URIPool();
+
+ /**
+ * Fetches the next available URI from the pool and removes it from the
+ * Free List.  If there are no more URIs free when this method is called it
+ * throws a NoSuchElementException.  Receiving the exception is not an
+ * indication that a URI won't be available in the future, the caller should
+ * react accordingly.
+ *
+ * @return the next free URI in the Pool.
+ * @throw NoSuchElementException if there are none free currently.
+ */
+ URI getURI() throw ( decaf::lang::exceptions::NoSuchElementException );
+
+ /**
+ * Adds a URI to the free list, callers that have previously taken one using
+ * the <code>getURI</code> method should always return the URI when they close
+ * the resource that was connected to that URI.  A URI that is already in the
+ * pool is not added a second time.
+ *
+ * @param uri - a URI previously taken from the pool.
+ */
+ void addURI( const URI& uri );
+
+ /**
+ * Adds a List of URIs to this Pool, the method checks for duplicates already
+ * in the pool and does not add those.
+ *
+ * @param uris - List of URIs to add into the Pool.
+ */
+ void addURIs( const StlList<URI>& uris );
+
+ /**
+ * Remove a given URI from the Free List, a URI that is not currently in the
+ * Free List is ignored.
+ *
+ * @param uri - the URI to find and remove from the free list
+ */
+ void removeURI( const URI& uri );
+
+ /**
+ * Is the URI that is given randomly picked from the pool or is
+ * each one taken in sequence.
+ *
+ * @return true if URI gets are random.
+ */
+ bool isRandomize() const {
+ return this->randomize;
+ }
+
+ /**
+ * Sets if the URIs that are taken from the pool are chosen randomly or
+ * are taken in the order they are in the list.
+ *
+ * @param value - true indicates URI gets are random.
+ */
+ void setRandomize( bool value ) {
+ this->randomize = value;
+ }
+
+ };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FAILOVER_URIPOOL_H_*/
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp Thu Mar 26 15:57:51 2009
@@ -63,6 +63,10 @@
CPPUNIT_ASSERT( failover->isRandomize() == false );
transport->start();
+
+ Thread::sleep( 1000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
+
transport->close();
}
@@ -85,9 +89,11 @@
CPPUNIT_ASSERT( failover->isRandomize() == false );
CPPUNIT_ASSERT( failover->isBackup() == true );
+ transport->start();
+
Thread::sleep( 1000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
- transport->start();
transport->close();
}
@@ -125,9 +131,10 @@
transport->start();
- Thread::sleep( 2000 );
+ Thread::sleep( 1000 );
CPPUNIT_ASSERT( listener.caughtException == true );
+ CPPUNIT_ASSERT( failover->isConnected() == false );
transport->close();
}
@@ -186,9 +193,11 @@
CPPUNIT_ASSERT( failover->isRandomize() == false );
CPPUNIT_ASSERT( failover->isBackup() == true );
+ transport->start();
+
Thread::sleep( 2000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
- transport->start();
transport->close();
}
@@ -229,6 +238,9 @@
transport->start();
+ Thread::sleep( 1000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
+
MockTransport* mock = NULL;
while( mock == NULL ) {
mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -269,6 +281,9 @@
transport->start();
+ Thread::sleep( 1000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
+
MockTransport* mock = NULL;
while( mock == NULL ) {
mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -311,6 +326,9 @@
transport->start();
+ Thread::sleep( 1000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
+
MockTransport* mock = NULL;
while( mock == NULL ) {
mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -353,6 +371,9 @@
transport->start();
+ Thread::sleep( 1000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
+
MockTransport* mock = NULL;
while( mock == NULL ) {
mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -390,6 +411,9 @@
transport->start();
+ Thread::sleep( 1000 );
+ CPPUNIT_ASSERT( failover->isConnected() == true );
+
Pointer<ConnectionInfo> connection = createConnection();
transport->request( connection );
Pointer<SessionInfo> session1 = createSession( connection );