You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/26 16:58:15 UTC

svn commit: r758706 - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/transport/failover/ test/activemq/transport/failover/

Author: tabish
Date: Thu Mar 26 15:57:51 2009
New Revision: 758706

URL: http://svn.apache.org/viewvc?rev=758706&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100

Resolve all known issues with the Failover Transport.  Improves overall performance and thread safety.  

Added:
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h   (with props)
Removed:
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
Modified:
    activemq/activemq-cpp/trunk/src/main/Makefile.am
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp

Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Thu Mar 26 15:57:51 2009
@@ -94,11 +94,12 @@
     activemq/transport/TransportFilter.cpp \
     activemq/transport/tcp/TcpTransportFactory.cpp \
     activemq/transport/tcp/TcpTransport.cpp \
+    activemq/transport/failover/BackupTransportPool.cpp \
     activemq/transport/failover/FailoverTransportListener.cpp \
+    activemq/transport/failover/URIPool.cpp \
     activemq/transport/failover/FailoverTransportFactory.cpp \
     activemq/transport/failover/CloseTransportsTask.cpp \
     activemq/transport/failover/BackupTransport.cpp \
-    activemq/transport/failover/ReconnectTask.cpp \
     activemq/transport/failover/FailoverTransport.cpp \
     activemq/transport/TransportRegistry.cpp \
     activemq/transport/AbstractTransportFactory.cpp \
@@ -618,10 +619,11 @@
     activemq/transport/tcp/TcpTransportFactory.h \
     activemq/transport/tcp/TcpTransport.h \
     activemq/transport/failover/FailoverTransportListener.h \
+    activemq/transport/failover/URIPool.h \
     activemq/transport/failover/FailoverTransportFactory.h \
     activemq/transport/failover/CloseTransportsTask.h \
+    activemq/transport/failover/BackupTransportPool.h \
     activemq/transport/failover/BackupTransport.h \
-    activemq/transport/failover/ReconnectTask.h \
     activemq/transport/failover/FailoverTransport.h \
     activemq/transport/correlator/ResponseCorrelator.h \
     activemq/transport/correlator/FutureResponse.h \

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp Thu Mar 26 15:57:51 2009
@@ -17,15 +17,15 @@
 
 #include "BackupTransport.h"
 
-#include <activemq/transport/failover/FailoverTransport.h>
+#include <activemq/transport/failover/BackupTransportPool.h>
 
 using namespace activemq;
 using namespace activemq::transport;
 using namespace activemq::transport::failover;
 
 ////////////////////////////////////////////////////////////////////////////////
-BackupTransport::BackupTransport( FailoverTransport* failover ) :
-    failover( failover ), closed( true ) {
+BackupTransport::BackupTransport( BackupTransportPool* parent ) :
+    parent( parent ), closed( true ) {
 
 }
 
@@ -38,7 +38,7 @@
 
     this->closed = true;
 
-    if( this->failover != NULL ) {
-        this->failover->reconnect();
+    if( this->parent != NULL ) {
+        this->parent->onBackupTransportFailure( this );
     }
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h Thu Mar 26 15:57:51 2009
@@ -32,13 +32,13 @@
 
     using decaf::lang::Pointer;
 
-    class FailoverTransport;
+    class BackupTransportPool;
 
     class AMQCPP_API BackupTransport : public DefaultTransportListener {
     private:
 
-        // The parent Failover Transport
-        FailoverTransport* failover;
+        // The parent of this Backup
+        BackupTransportPool* parent;
 
         // The Transport this one is managing.
         Pointer<Transport> transport;
@@ -51,7 +51,7 @@
 
     public:
 
-        BackupTransport( FailoverTransport* failover );
+        BackupTransport( BackupTransportPool* failover );
 
         virtual ~BackupTransport();
 

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp Thu Mar 26 15:57:51 2009
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BackupTransportPool.h"
+
+#include <memory>
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/transport/TransportRegistry.h>
+
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <decaf/lang/exceptions/IllegalStateException.h>
+
+using namespace activemq;
+using namespace activemq::threads;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::io;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+BackupTransportPool::BackupTransportPool( const Pointer<CompositeTaskRunner>& taskRunner,
+                                          const Pointer<CloseTransportsTask>& closeTask,
+                                          const Pointer<URIPool>& uriPool ) {
+
+    if( taskRunner == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "TaskRunner passed is NULL" );
+    }
+
+    if( uriPool == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "URIPool passed is NULL" );
+    }
+
+    if( closeTask == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Close Transport Task passed is NULL" );
+    }
+
+    this->pending = false;
+    this->enabled = false;
+    this->backupPoolSize = 1;
+    this->uriPool = uriPool;
+    this->taskRunner = taskRunner;
+    this->closeTask = closeTask;
+
+    // Add this instance as a Task so that we can create backups when nothing else is
+    // going on.
+    this->taskRunner->addTask( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BackupTransportPool::BackupTransportPool( int backupPoolSize,
+                                          const Pointer<CompositeTaskRunner>& taskRunner,
+                                          const Pointer<CloseTransportsTask>& closeTask,
+                                          const Pointer<URIPool>& uriPool ) {
+
+    if( taskRunner == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "TaskRunner passed is NULL" );
+    }
+
+    if( uriPool == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "URIPool passed is NULL" );
+    }
+
+    if( closeTask == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Close Transport Task passed is NULL" );
+    }
+
+    this->pending = false;
+    this->enabled = false;
+    this->backupPoolSize = backupPoolSize;
+    this->uriPool = uriPool;
+    this->taskRunner = taskRunner;
+    this->closeTask = closeTask;
+
+    // Add this instance as a Task so that we can create backups when nothing else is
+    // going on.
+    this->taskRunner->addTask( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BackupTransportPool::~BackupTransportPool() {
+    this->taskRunner->removeTask( this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BackupTransportPool::setEnabled( bool value ) {
+    this->enabled = value;
+
+    if( enabled == true ) {
+        this->taskRunner->wakeup();
+    } else {
+        synchronized( &backups ) {
+            this->backups.clear();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<BackupTransport> BackupTransportPool::getBackup() {
+
+    if( !isEnabled() ) {
+        throw IllegalStateException(
+            __FILE__, __LINE__, "The Backup Pool is not enabled." );
+    }
+
+    Pointer<BackupTransport> result;
+
+    synchronized( &backups ) {
+        if( !backups.isEmpty() ) {
+            result = backups.remove( 0 );
+        }
+    }
+
+    // Flag as pending so the task gets run again and new backups are created.
+    this->pending = true;
+    this->taskRunner->wakeup();
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool BackupTransportPool::isPending() const {
+
+    if( this->isEnabled() ) {
+        return this->pending;
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool BackupTransportPool::iterate() {
+
+    StlList<URI> failures;
+
+    synchronized( &backups ) {
+
+        while( isEnabled() && (int)backups.size() < backupPoolSize ) {
+
+            URI connectTo;
+
+            // Try for a URI, if one isn't free return and indicate this task
+            // is done for now, the next time a backup is requested this task
+            // will become pending again and we will attempt to fill the pool.
+            try{
+                connectTo = uriPool->getURI();
+            } catch( NoSuchElementException& ex ) {
+                break;
+            }
+
+            Pointer<BackupTransport> backup( new BackupTransport( this ) );
+            backup->setUri( connectTo );
+
+            try{
+                Pointer<Transport> transport = createTransport( connectTo );
+                transport->setTransportListener( backup.get() );
+                transport->start();
+                backup->setTransport( transport );
+                backups.add( backup );
+            } catch(...) {
+                // Store it in the list of URIs that didn't work, once done we
+                // return those to the pool.
+                failures.add( connectTo );
+            }
+
+        }
+    }
+
+    // return all failures to the URI Pool, we can try again later.
+    uriPool->addURIs( failures );
+    this->pending = false;
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BackupTransportPool::onBackupTransportFailure( BackupTransport* failedTransport ) {
+
+    synchronized( &backups ) {
+
+        std::auto_ptr< Iterator< Pointer<BackupTransport> > > iter( backups.iterator() );
+
+        while( iter->hasNext() ) {
+            if( iter->next() == failedTransport ) {
+                iter->remove();
+            }
+
+            this->uriPool->addURI( failedTransport->getUri() );
+            this->closeTask->add( failedTransport->getTransport() );
+            this->taskRunner->wakeup();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> BackupTransportPool::createTransport( const URI& location ) const
+    throw ( decaf::io::IOException ) {
+
+    try{
+
+        TransportFactory* factory =
+            TransportRegistry::getInstance().findFactory( location.getScheme() );
+
+        if( factory == NULL ) {
+            throw new IOException(
+                __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
+        }
+
+        Pointer<Transport> transport( factory->createComposite( location ) );
+
+        return transport;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h Thu Mar 26 15:57:51 2009
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FAILOVER_BACKUPTRANSPORTPOOL_H_
+#define _ACTIVEMQ_TRANSPORT_FAILOVER_BACKUPTRANSPORTPOOL_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/threads/CompositeTask.h>
+#include <activemq/threads/CompositeTaskRunner.h>
+#include <activemq/transport/failover/CloseTransportsTask.h>
+#include <activemq/transport/failover/BackupTransport.h>
+#include <activemq/transport/failover/URIPool.h>
+
+#include <decaf/lang/Pointer.h>
+#include <decaf/io/IOException.h>
+#include <decaf/util/StlList.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+    using decaf::lang::Pointer;
+    using decaf::util::StlList;
+    using activemq::threads::CompositeTaskRunner;
+
+    class AMQCPP_API BackupTransportPool : public activemq::threads::CompositeTask {
+    private:
+
+        friend class BackupTransport;
+
+        mutable StlList< Pointer<BackupTransport> > backups;
+        Pointer<CompositeTaskRunner> taskRunner;
+        Pointer<CloseTransportsTask> closeTask;
+        Pointer<URIPool> uriPool;
+        volatile int backupPoolSize;
+        volatile bool enabled;
+        volatile bool pending;
+
+    public:
+
+        BackupTransportPool( const Pointer<CompositeTaskRunner>& taskRunner,
+                             const Pointer<CloseTransportsTask>& closeTask,
+                             const Pointer<URIPool>& uriPool );
+
+        BackupTransportPool( int backupPoolSize,
+                             const Pointer<CompositeTaskRunner>& taskRunner,
+                             const Pointer<CloseTransportsTask>& closeTask,
+                             const Pointer<URIPool>& uriPool );
+
+        virtual ~BackupTransportPool();
+
+        /**
+         * Return true if we don't currently have enough Connected Transports
+         */
+        virtual bool isPending() const;
+
+        /**
+         * Get a Connected Transport from the pool of Backups if any are present,
+         * otherwise it return a NULL Pointer.
+         *
+         * @return Pointer to a Connected Transport or NULL
+         */
+        Pointer<BackupTransport> getBackup();
+
+        /**
+         * Connect to a Backup Broker if we haven't already connected to the max
+         * number of Backups.
+         */
+        virtual bool iterate();
+
+        /**
+         * Gets the Max number of Backups this Task will create.
+         * @return the max number of active BackupTransports that will be created.
+         */
+        int getBackupPoolSize() const {
+            return this->backupPoolSize;
+        }
+
+        /**
+         * Sets the Max number of Backups this Task will create.
+         * @param size - the max number of active BackupTransports that will be created.
+         */
+        void setBackupPoolSize( int size ) {
+            this->backupPoolSize = size;
+        }
+
+        /**
+         * Gets if the backup Transport Pool has been enabled or not, when not enabled
+         * no backups are created and any that were are destroyed.
+         *
+         * @return true if enable.
+         */
+        bool isEnabled() const {
+            return this->enabled;
+        }
+
+        /**
+         * Sets if this Backup Transport Pool is enabled.  When not enabled no Backups
+         * are created and any that were are destroyed.
+         *
+         * @param value - true to enable backup creation, false to disable.
+         */
+        void setEnabled( bool value );
+
+    private:
+
+        // The backups report their failure to the pool, the pool removes them
+        // from the list and returns their URIs to the URIPool, and then adds
+        // the internal transport to the close transport's task for cleanup.
+        void onBackupTransportFailure( BackupTransport* failedTransport );
+
+        Pointer<Transport> createTransport( const URI& location ) const
+            throw ( decaf::io::IOException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FAILOVER_BACKUPTRANSPORTPOOL_H_*/

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransportPool.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Mar 26 15:57:51 2009
@@ -49,13 +49,10 @@
     this->maxReconnectDelay = 1000 * 30;
     this->backOffMultiplier = 2;
     this->useExponentialBackOff = true;
-    this->randomize = true;
     this->initialized = false;
     this->maxReconnectAttempts = 0;
     this->connectFailures = 0;
     this->reconnectDelay = this->initialReconnectDelay;
-    this->backup = false;
-    this->backupPoolSize = 1;
     this->trackMessages = false;
     this->maxCacheSize = 128 * 1024;
 
@@ -64,14 +61,15 @@
     this->connected = false;
 
     this->transportListener = NULL;
+    this->uris.reset( new URIPool() );
     this->stateTracker.setTrackTransactions( true );
     this->myTransportListener.reset( new FailoverTransportListener( this ) );
-    this->reconnectTask.reset( new ReconnectTask( this ) );
     this->closeTask.reset( new CloseTransportsTask() );
-    this->taskRunner.reset( new DedicatedTaskRunner( reconnectTask.get() ) );
-    this->compositeTaskRunner.reset( new CompositeTaskRunner() );
+    this->taskRunner.reset( new CompositeTaskRunner() );
+    this->backups.reset( new BackupTransportPool( taskRunner, closeTask, uris ) );
 
-    this->compositeTaskRunner->addTask( this->closeTask.get() );
+    this->taskRunner->addTask( this );
+    this->taskRunner->addTask( this->closeTask.get() );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -100,10 +98,7 @@
 void FailoverTransport::add( const std::string& uri ) {
 
     try {
-        URI newUri( uri );
-        if( !uris.contains( newUri ) ) {
-            uris.add( newUri );
-        }
+        uris->addURI( URI( uri ) );
 
         reconnect();
     }
@@ -113,12 +108,10 @@
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::addURI( const List<URI>& uris ) {
 
-    synchronized( &this->uris ) {
-        std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+    std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
 
-        while( iter->hasNext() ) {
-            this->uris.add( iter->next() );
-        }
+    while( iter->hasNext() ) {
+        this->uris->addURI( iter->next() );
     }
 
     reconnect();
@@ -127,12 +120,10 @@
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::removeURI( const List<URI>& uris ) {
 
-    synchronized( &this->uris ) {
-        std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+    std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
 
-        while( iter->hasNext() ) {
-            this->uris.remove( iter->next() );
-        }
+    while( iter->hasNext() ) {
+        this->uris->removeURI( iter->next() );
     }
 
     reconnect();
@@ -144,9 +135,7 @@
 
     try {
 
-        if( !uris.contains( uri ) ) {
-            uris.add( uri );
-        }
+        this->uris->addURI( uri );
 
         reconnect();
     }
@@ -156,36 +145,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-StlList<URI> FailoverTransport::getConnectList() const {
-
-    StlList<URI> result( uris );
-    bool removed = false;
-
-    if( failedConnectTransportURI != NULL ) {
-        removed = result.remove( *failedConnectTransportURI );
-    }
-
-    if( randomize ) {
-        // Randomly, reorder the list by random swapping
-        Random rand;
-        rand.setSeed( decaf::lang::System::currentTimeMillis() );
-
-        for( std::size_t i = 0; i < result.size(); i++ ) {
-            int p = rand.nextInt( (int)result.size() );
-            URI temp = result.get( p );
-            result.set( p, result.get( i ) );
-            result.set( i, temp );
-        }
-    }
-
-    if( removed ) {
-        result.add( *failedConnectTransportURI );
-    }
-
-    return result;
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setTransportListener( TransportListener* listener ) {
     synchronized( &listenerMutex ) {
         this->transportListener = listener;
@@ -195,16 +154,17 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 std::string FailoverTransport::getRemoteAddress() const {
-    if( connectedTransport != NULL ) {
-        return connectedTransport->getRemoteAddress();
+    synchronized( &reconnectMutex ) {
+        if( connectedTransport != NULL ) {
+            return connectedTransport->getRemoteAddress();
+        }
     }
     return "";
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::oneway( const Pointer<Command>& command )
-    throw( IOException,
-           decaf::lang::exceptions::UnsupportedOperationException ) {
+    throw( IOException, decaf::lang::exceptions::UnsupportedOperationException ) {
 
     Pointer<Exception> error;
 
@@ -385,17 +345,11 @@
         closed = true;
         connected = false;
 
-        std::auto_ptr< Iterator< Pointer<BackupTransport> > > iter( backups.iterator() );
-        while( iter->hasNext() ) {
-            iter->next()->setClosed( true );
-        }
-
-        backups.clear();
+        backups->setEnabled( false );
         requestMap.clear();
 
         if( connectedTransport != NULL ) {
-            transportToStop = connectedTransport;
-            connectedTransport.reset( NULL );
+            transportToStop.swap( connectedTransport );
         }
 
         reconnectMutex.notifyAll();
@@ -405,8 +359,7 @@
         sleepMutex.notifyAll();
     }
 
-    taskRunner->shutdown( 1000 );
-    compositeTaskRunner->shutdown( 1000 );
+    taskRunner->shutdown( 2000 );
 
     if( transportToStop != NULL ) {
         transportToStop->close();
@@ -457,7 +410,9 @@
     throw( decaf::lang::Exception ) {
 
     Pointer<Transport> transport;
-    connectedTransport.swap( transport );
+    synchronized( &reconnectMutex ) {
+        connectedTransport.swap( transport );
+    }
 
     if( transport != NULL ) {
 
@@ -467,16 +422,16 @@
 
         // Hand off to the close task so it gets done in a different thread.
         closeTask->add( transport );
-        compositeTaskRunner->wakeup();
+        taskRunner->wakeup();
 
         synchronized( &reconnectMutex ) {
-            bool reconnectOk = started;
 
             initialized = false;
-            failedConnectTransportURI = connectedTransportURI;
+            uris->addURI( *connectedTransportURI );
             connectedTransportURI.reset( NULL );
             connected = false;
-            if( reconnectOk ) {
+
+            if( started ) {
                 taskRunner->wakeup();
             }
         }
@@ -488,7 +443,20 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool FailoverTransport::doReconnect() {
+bool FailoverTransport::isPending() const {
+    bool result = false;
+
+    synchronized( &reconnectMutex ) {
+        if( this->connectedTransport == NULL && !closed && started ) {
+            result = true;
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::iterate() {
 
     Pointer<Exception> failure;
 
@@ -501,86 +469,88 @@
         if( connectedTransport != NULL || closed || connectionFailure != NULL ) {
             return false;
         } else {
-            StlList<URI> connectList = getConnectList();
-            if( connectList.isEmpty() ) {
-                failure.reset( new IOException(
-                    __FILE__, __LINE__, "No uris available to connect to." ) );
-            } else {
 
-                if( !useExponentialBackOff ) {
-                    reconnectDelay = initialReconnectDelay;
-                }
+            StlList<URI> failures;
+            Pointer<Transport> transport;
+            URI uri;
 
-                synchronized( &backupMutex ) {
+            if( !useExponentialBackOff ) {
+                reconnectDelay = initialReconnectDelay;
+            }
 
-                    if( backup && !backups.isEmpty() ) {
+            if( backups->isEnabled() ) {
 
-                        Pointer<BackupTransport> backup = backups.remove( 0 );
-                        Pointer<Transport> transport = backup->getTransport();
-                        URI uri = backup->getUri();
-                        transport->setTransportListener( myTransportListener.get() );
+                Pointer<BackupTransport> backupTransport = backups->getBackup();
 
-                        try {
+                if( backupTransport != NULL ) {
 
-                            if( started ) {
-                                restoreTransport( transport );
-                            }
+                    transport = backupTransport->getTransport();
+                    uri = backupTransport->getUri();
+                    transport->setTransportListener( myTransportListener.get() );
 
-                            reconnectDelay = initialReconnectDelay;
-                            failedConnectTransportURI.reset( NULL );
-                            connectedTransportURI.reset( new URI( uri ) );
-                            connectedTransport = transport;
-                            reconnectMutex.notifyAll();
-                            connectFailures = 0;
+                    try {
 
-                            return false;
+                        if( started ) {
+                            restoreTransport( transport );
                         }
-                        AMQ_CATCH_NOTHROW( Exception )
-                        AMQ_CATCHALL_NOTHROW()
+
+                    } catch( Exception& e ) {
+                        transport.reset( NULL );
+                        this->uris->addURI( uri );
                     }
                 }
+            }
 
-                std::auto_ptr< Iterator<URI> > iter( connectList.iterator() );
-
-                while( iter->hasNext() && connectedTransport == NULL && !closed ) {
+            while( transport == NULL && !closed ) {
 
-                    URI uri = iter->next();
-                    try {
+                try{
+                    uri = uris->getURI();
+                } catch( NoSuchElementException& ex ) {
+                    break;
+                }
 
-                        Pointer<Transport> transport = createTransport( uri );
-                        transport->setTransportListener( myTransportListener.get() );
-                        transport->start();
+                try {
 
-                        if( started ) {
-                            restoreTransport( transport );
-                        }
+                    transport = createTransport( uri );
+                    transport->setTransportListener( myTransportListener.get() );
+                    transport->start();
 
-                        reconnectDelay = initialReconnectDelay;
-                        connectedTransportURI.reset( new URI( uri ) );
-                        connectedTransport = transport;
-                        reconnectMutex.notifyAll();
-                        connectFailures = 0;
-
-                        // Make sure on initial startup, that the transportListener
-                        // has been initialized for this instance.
-                        synchronized( &listenerMutex ) {
-                            if( transportListener == NULL ) {
-                                // if it isn't set after 2secs - it
-                                // probably never will be
-                                listenerMutex.wait( 2000 );
-                            }
-                        }
+                    if( started ) {
+                        restoreTransport( transport );
+                    }
 
-                        if( transportListener != NULL ) {
-                            transportListener->transportResumed();
-                        }
+                } catch( Exception& e ) {
+                    transport.reset( NULL );
+                    failures.add( uri );
+                    failure.reset( e.clone() );
+                }
+            }
 
-                        connected = true;
-                        return false;
-                    } catch( Exception& e ) {
-                        failure.reset( e.clone() );
+            // Return the failures to the pool, we will try again on the next iteration.
+            this->uris->addURIs( failures );
+
+            if( transport != NULL ) {
+                reconnectDelay = initialReconnectDelay;
+                connectedTransportURI.reset( new URI( uri ) );
+                connectedTransport = transport;
+                reconnectMutex.notifyAll();
+                connectFailures = 0;
+                connected = true;
+
+                // Make sure on initial startup, that the transportListener
+                // has been initialized for this instance.
+                synchronized( &listenerMutex ) {
+                    if( transportListener == NULL ) {
+                        // if it isn't set after 2secs - it probably never will be
+                        listenerMutex.wait( 2000 );
                     }
                 }
+
+                if( transportListener != NULL ) {
+                    transportListener->transportResumed();
+                }
+
+                return false;
             }
         }
 
@@ -634,55 +604,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool FailoverTransport::buildBackups() {
-
-    synchronized( &backupMutex ) {
-
-        if( !closed && backup && (int)backups.size() < backupPoolSize ) {
-
-            StlList<URI> connectList = getConnectList();
-
-            //removed closed backups
-            StlList< Pointer<BackupTransport> > disposedList;
-            std::auto_ptr< Iterator<Pointer<BackupTransport> > > iter( backups.iterator() );
-            while( iter->hasNext() ) {
-                Pointer<BackupTransport> backup = iter->next();
-                if( backup->isClosed() ) {
-                    disposedList.add( backup );
-                }
-            }
-
-            backups.removeAll( disposedList );
-            disposedList.clear();
-
-            std::auto_ptr< Iterator<URI> > uriIter( connectList.iterator() );
-
-            while( uriIter->hasNext() && (int)backups.size() < backupPoolSize ) {
-                URI uri = uriIter->next();
-                if( connectedTransportURI != NULL && !connectedTransportURI->equals( uri ) ) {
-                    try {
-                        Pointer<BackupTransport> backup( new BackupTransport( this ) );
-                        backup->setUri( uri );
-
-                        if( !backups.contains( backup ) ) {
-                            Pointer<Transport> transport = createTransport( uri );
-                            transport->setTransportListener( backup.get() );
-                            transport->start();
-                            backup->setTransport( transport );
-                            backups.add( backup );
-                        }
-                    }
-                    AMQ_CATCH_NOTHROW( Exception )
-                    AMQ_CATCHALL_NOTHROW()
-                }
-            }
-        }
-    }
-
-    return false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
 Pointer<Transport> FailoverTransport::createTransport( const URI& location ) const
     throw ( decaf::io::IOException ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Thu Mar 26 15:57:51 2009
@@ -25,10 +25,10 @@
 #include <activemq/threads/CompositeTaskRunner.h>
 #include <activemq/state/ConnectionStateTracker.h>
 #include <activemq/transport/CompositeTransport.h>
-#include <activemq/transport/failover/BackupTransport.h>
-#include <activemq/transport/failover/ReconnectTask.h>
+#include <activemq/transport/failover/BackupTransportPool.h>
 #include <activemq/transport/failover/CloseTransportsTask.h>
 #include <activemq/transport/failover/FailoverTransportListener.h>
+#include <activemq/transport/failover/URIPool.h>
 #include <activemq/wireformat/WireFormat.h>
 
 #include <decaf/util/StlList.h>
@@ -50,50 +50,42 @@
     using activemq::commands::Command;
     using activemq::commands::Response;
 
-    class AMQCPP_API FailoverTransport : public CompositeTransport {
+    class AMQCPP_API FailoverTransport : public CompositeTransport,
+                                         public activemq::threads::CompositeTask {
     private:
 
         friend class FailoverTransportListener;
-        friend class ReconnectTask;
 
         bool closed;
         bool connected;
         bool started;
 
-        decaf::util::StlList<URI> uris;
-
         long long timeout;
         long long initialReconnectDelay;
         long long maxReconnectDelay;
         long long backOffMultiplier;
         bool useExponentialBackOff;
-        bool randomize;
         bool initialized;
         int maxReconnectAttempts;
         int connectFailures;
         long long reconnectDelay;
-        bool backup;
-        int backupPoolSize;
         bool trackMessages;
         int maxCacheSize;
 
-        decaf::util::StlList< Pointer<BackupTransport> > backups;
+        mutable decaf::util::concurrent::Mutex reconnectMutex;
+        mutable decaf::util::concurrent::Mutex sleepMutex;
+        mutable decaf::util::concurrent::Mutex listenerMutex;
 
         state::ConnectionStateTracker stateTracker;
-        decaf::util::concurrent::Mutex reconnectMutex;
-        decaf::util::concurrent::Mutex backupMutex;
-        decaf::util::concurrent::Mutex sleepMutex;
-        decaf::util::concurrent::Mutex listenerMutex;
         decaf::util::StlMap<int, Pointer<Command> > requestMap;
 
+        Pointer<URIPool> uris;
         Pointer<URI> connectedTransportURI;
-        Pointer<URI> failedConnectTransportURI;
         Pointer<Transport> connectedTransport;
         Pointer<Exception> connectionFailure;
-        Pointer<ReconnectTask> reconnectTask;
+        Pointer<BackupTransportPool> backups;
         Pointer<CloseTransportsTask> closeTask;
-        Pointer<TaskRunner> taskRunner;
-        Pointer<CompositeTaskRunner> compositeTaskRunner;
+        Pointer<CompositeTaskRunner> taskRunner;
         Pointer<TransportListener> disposedListener;
         Pointer<TransportListener> myTransportListener;
         TransportListener* transportListener;
@@ -203,8 +195,7 @@
          * Sets the WireFormat instance to use.
          * @param WireFormat the object used to encode / decode commands.
          */
-        virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {
-        }
+        virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
 
         /**
          * Sets the observer of asynchronous events from this transport.
@@ -283,6 +274,21 @@
         virtual std::string getRemoteAddress() const;
 
         /**
+         * @returns true if there is a need for the iterate method to be called by this
+         *          classes task runner.
+         */
+        virtual bool isPending() const;
+
+        /**
+         * Performs the actual Reconnect operation for the FailoverTransport, when a
+         * connection is made this method returns false to indicate it doesn't need to
+         * run again, otherwise it returns true to indicate its still trying to connect.
+         *
+         * @return false to indicate a connection, true to indicate it needs to try again.
+         */
+        virtual bool iterate();
+
+        /**
          * reconnect to another location
          * @param uri
          * @throws IOException on failure of if not supported
@@ -333,11 +339,11 @@
         }
 
         bool isRandomize() const {
-            return this->randomize;
+            return this->uris->isRandomize();
         }
 
         void setRandomize( bool value ) {
-            this->randomize = value;
+            this->uris->setRandomize( value );
         }
 
         int getMaxReconnectAttempts() const {
@@ -357,19 +363,19 @@
         }
 
         bool isBackup() const {
-            return this->backup;
+            return this->backups->isEnabled();
         }
 
         void setBackup( bool value ) {
-            this->backup = value;
+            this->backups->setEnabled( value );
         }
 
         int getBackupPoolSize() const {
-            return this->backupPoolSize;
+            return this->backups->getBackupPoolSize();
         }
 
         void setBackupPoolSize( int value ) {
-            this->backupPoolSize = value;
+            this->backups->setBackupPoolSize( value );
         }
 
         bool isTrackMessages() const {
@@ -413,31 +419,12 @@
     private:
 
         /**
-         * Returns a set of URIs that this Transport is to connect to, applying a
-         * random swapping from the class stored list of URIs if the randomize flag
-         * is enabled, otherwise just return the original list.
-         *
-         * @returns a Set of URI object that this Transport iterates over to connect.
-         */
-        decaf::util::StlList<URI> getConnectList() const;
-
-        /**
          * @return Returns true if the command is one sent when a connection
          * is being closed.
          */
         bool isShutdownCommand( const Pointer<Command>& command ) const;
 
         /**
-         * Performs the actual Reconnect operation.
-         */
-        bool doReconnect();
-
-        /**
-         * Builds a set of Backup Transports for fast Failover.
-         */
-        bool buildBackups();
-
-        /**
          * Looks up the correct Factory and create a new Composite version of the
          * Transport requested.
          *

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp Thu Mar 26 15:57:51 2009
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "URIPool.h"
+
+#include <memory>
+#include <decaf/util/Random.h>
+#include <decaf/lang/System.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::net;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+URIPool::URIPool() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+URIPool::URIPool( const decaf::util::List<URI>& uris ) {
+    this->uriPool.copy( uris );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+URIPool::~URIPool() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+URI URIPool::getURI() throw ( decaf::lang::exceptions::NoSuchElementException ) {
+
+    synchronized( &uriPool ) {
+        if( uriPool.isEmpty() ) {
+            throw NoSuchElementException(
+                __FILE__, __LINE__, "URI Pool is currently empty." );
+        }
+
+        int index = 0;  // Take the first one in the list unless random is on.
+
+        if( isRandomize() ) {
+
+            Random rand;
+            rand.setSeed( decaf::lang::System::currentTimeMillis() );
+            index = rand.nextInt( (int)uriPool.size() );
+        }
+
+        return uriPool.remove( index );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void URIPool::addURI( const URI& uri ) {
+
+    synchronized( &uriPool ) {
+        if( !uriPool.contains( uri ) ) {
+            uriPool.add( uri );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void URIPool::addURIs( const StlList<URI>& uris ) {
+
+    synchronized( &uriPool ) {
+
+        std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+
+        while( iter->hasNext() ) {
+            URI uri = iter->next();
+
+            if( !uriPool.contains( uri ) ) {
+                uriPool.add( uri );
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void URIPool::removeURI( const URI& uri ) {
+
+    synchronized( &uriPool ) {
+        if( uriPool.contains( uri ) ) {
+            uriPool.remove( uri );
+        }
+    }
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h?rev=758706&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h Thu Mar 26 15:57:51 2009
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FAILOVER_URIPOOL_H_
+#define _ACTIVEMQ_TRANSPORT_FAILOVER_URIPOOL_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/net/URI.h>
+#include <decaf/util/StlList.h>
+#include <decaf/lang/exceptions/NoSuchElementException.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+    using decaf::util::StlList;
+    using decaf::net::URI;
+
+    class AMQCPP_API URIPool {
+    private:
+
+        StlList<URI> uriPool;
+        bool randomize;
+
+    public:
+
+        /**
+         * Create an Empty URI Pool.
+         */
+        URIPool();
+
+        /**
+         * Creates a new URI Pool using the given list as the initial Free List.
+         *
+         * @param uris - List of URI to place in the Pool.
+         */
+        URIPool( const decaf::util::List<URI>& uris );
+
+        virtual ~URIPool();
+
+        /**
+         * Fetches the next available URI from the pool, if there are no more
+         * URIs free when this method is called it throws a NoSuchElementException.
+         * Receiving the exception is not an indication that a URI won't be available
+         * in the future, the caller should react accordingly.
+         *
+         * @return the next free URI in the Pool.
+         * @throw NoSuchElementException if there are none free currently.
+         */
+        URI getURI() throw ( decaf::lang::exceptions::NoSuchElementException );
+
+        /**
+         * Adds a URI to the free list, callers that have previously taken one using
+         * the <code>getURI</code> method should always return the URI when they close
+         * the resource that was connected to that URI.
+         *
+         * @param uri - a URI previously taken from the pool.
+         */
+        void addURI( const URI& uri );
+
+        /**
+         * Adds a List of URIs to this Pool, the method checks for duplicates already
+         * in the pool and does not add those.
+         *
+         * @param uris - List of URIs to add into the Pool.
+         */
+        void addURIs( const StlList<URI>& uris );
+
+        /**
+         * Remove a given URI from the Free List.
+         * @param uri - the URI to find and remove from the free list
+         */
+        void removeURI( const URI& uri );
+
+        /**
+         * Is the URI that is given randomly picked from the pool or is
+         * each one taken in sequence.
+         *
+         * @return true if URI gets are random.
+         */
+        bool isRandomize() const {
+            return this->randomize;
+        }
+
+        /**
+         * Sets if the URI's that are taken from the pool are chosen Randomly or
+         * are taken in the order they are in the list.
+         *
+         * @param value - true indicates URI gets are random.
+         */
+        void setRandomize( bool value ) {
+            this->randomize = value;
+        }
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_TRANSPORT_FAILOVER_URIPOOL_H_*/

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/URIPool.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp?rev=758706&r1=758705&r2=758706&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp Thu Mar 26 15:57:51 2009
@@ -63,6 +63,10 @@
     CPPUNIT_ASSERT( failover->isRandomize() == false );
 
     transport->start();
+
+    Thread::sleep( 1000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
+
     transport->close();
 }
 
@@ -85,9 +89,11 @@
     CPPUNIT_ASSERT( failover->isRandomize() == false );
     CPPUNIT_ASSERT( failover->isBackup() == true );
 
+    transport->start();
+
     Thread::sleep( 1000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
 
-    transport->start();
     transport->close();
 }
 
@@ -125,9 +131,10 @@
 
     transport->start();
 
-    Thread::sleep( 2000 );
+    Thread::sleep( 1000 );
 
     CPPUNIT_ASSERT( listener.caughtException == true );
+    CPPUNIT_ASSERT( failover->isConnected() == false );
 
     transport->close();
 }
@@ -186,9 +193,11 @@
     CPPUNIT_ASSERT( failover->isRandomize() == false );
     CPPUNIT_ASSERT( failover->isBackup() == true );
 
+    transport->start();
+
     Thread::sleep( 2000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
 
-    transport->start();
     transport->close();
 }
 
@@ -229,6 +238,9 @@
 
     transport->start();
 
+    Thread::sleep( 1000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
+
     MockTransport* mock = NULL;
     while( mock == NULL ) {
         mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -269,6 +281,9 @@
 
     transport->start();
 
+    Thread::sleep( 1000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
+
     MockTransport* mock = NULL;
     while( mock == NULL ) {
         mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -311,6 +326,9 @@
 
     transport->start();
 
+    Thread::sleep( 1000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
+
     MockTransport* mock = NULL;
     while( mock == NULL ) {
         mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -353,6 +371,9 @@
 
     transport->start();
 
+    Thread::sleep( 1000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
+
     MockTransport* mock = NULL;
     while( mock == NULL ) {
         mock = dynamic_cast<MockTransport*>( transport->narrow( typeid( MockTransport ) ) );
@@ -390,6 +411,9 @@
 
     transport->start();
 
+    Thread::sleep( 1000 );
+    CPPUNIT_ASSERT( failover->isConnected() == true );
+
     Pointer<ConnectionInfo> connection = createConnection();
     transport->request( connection );
     Pointer<SessionInfo> session1 = createSession( connection );