You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/02 22:21:41 UTC

svn commit: r749440 - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/core/ main/activemq/state/ main/activemq/transport/ main/activemq/transport/failover/ main/activemq/transport/mock/ test/activemq/transport/ test/activemq/transport/correlator/

Author: tabish
Date: Mon Mar  2 21:21:40 2009
New Revision: 749440

URL: http://svn.apache.org/viewvc?rev=749440&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100

A mostly finished implementation of the Failover Transport, lacking only a Factory that can parse the composite URI to build the list of Brokers to attempt to connect to.  The Transport is not tested yet.  

Added:
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp   (with props)
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/src/main/Makefile.am
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h
    activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp

Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Mon Mar  2 21:21:40 2009
@@ -51,7 +51,9 @@
     activemq/state/TransactionState.cpp \
     activemq/state/Tracked.cpp \
     activemq/transport/failover/BackupTransport.cpp \
+    activemq/transport/failover/ReconnectTask.cpp \
     activemq/transport/failover/FailoverTransport.cpp \
+    activemq/transport/failover/FailoverTransportListener.cpp \
     activemq/transport/failover/FailoverTransportFactory.cpp \
     activemq/transport/TransportFilter.cpp \
     activemq/transport/TransportRegistry.cpp \
@@ -206,8 +208,10 @@
     activemq/transport/TransportFactory.h \
     activemq/transport/TransportRegistry.h \
     activemq/transport/failover/BackupTransport.h \
+    activemq/transport/failover/ReconnectTask.h \
     activemq/transport/failover/FailoverTransport.h \
     activemq/transport/failover/FailoverTransportFactory.h \
+    activemq/transport/failover/FailoverTransportListener.h \
     activemq/transport/mock/MockTransport.h \
     activemq/transport/mock/MockTransportFactory.h \
     activemq/transport/correlator/FutureResponse.h \

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Mon Mar  2 21:21:40 2009
@@ -521,8 +521,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onTransportException( transport::Transport* source AMQCPP_UNUSED,
-                                               const decaf::lang::Exception& ex ) {
+void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) {
 
     try {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Mon Mar  2 21:21:40 2009
@@ -331,8 +331,7 @@
          * @param source The source of the exception
          * @param ex The exception.
          */
-        virtual void onTransportException( transport::Transport* source,
-                                           const decaf::lang::Exception& ex );
+        virtual void onException( const decaf::lang::Exception& ex );
 
     public:
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp Mon Mar  2 21:21:40 2009
@@ -30,7 +30,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void Tracked::onResponses() {
+void Tracked::onResponse() {
 
     try {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h Mon Mar  2 21:21:40 2009
@@ -40,7 +40,7 @@
 
         virtual ~Tracked() {}
 
-        void onResponses();
+        void onResponse();
 
         bool isWaitingForResponse() const {
             return runnable != NULL;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h Mon Mar  2 21:21:40 2009
@@ -20,10 +20,14 @@
 
 #include <activemq/transport/Transport.h>
 #include <decaf/net/URI.h>
+#include <decaf/util/List.h>
 
 namespace activemq {
 namespace transport {
 
+    using decaf::util::List;
+    using decaf::net::URI;
+
     /**
      * A Composite Transport is a Transport implementation that is composed of several
      * Transports.  The composition could be such that only one Transport exists for
@@ -44,7 +48,7 @@
          * @param uri
          *        The new URI to add to the set this composite maintains.
          */
-        virtual void addURI( const decaf::net::URI& uri ) = 0;
+        virtual void addURI( const List<URI>& uris ) = 0;
 
         /**
          * Remove a URI from the set of URI's that represents the set of Transports
@@ -55,7 +59,7 @@
          * @param uri
          *        The new URI to remove to the set this composite maintains.
          */
-        virtual void removeURI( const decaf::net::URI& uri ) = 0;
+        virtual void removeURI( const List<URI>& uris ) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h Mon Mar  2 21:21:40 2009
@@ -47,11 +47,9 @@
         /**
          * Event handler for an exception from a command transport.
          *
-         * @param source The source of the exception
          * @param ex The exception.
          */
-        virtual void onTransportException( Transport* source AMQCPP_UNUSED,
-                                           const decaf::lang::Exception& ex AMQCPP_UNUSED ) {}
+        virtual void onException( const decaf::lang::Exception& ex AMQCPP_UNUSED ) {}
 
         /**
          * The transport has suffered an interruption from which it hopes to recover

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp Mon Mar  2 21:21:40 2009
@@ -70,7 +70,7 @@
     if( this->listener != NULL && !this->closed ){
 
         try{
-            this->listener->onTransportException( this, ex );
+            this->listener->onException( ex );
         }catch( ... ){}
     }
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Mon Mar  2 21:21:40 2009
@@ -252,6 +252,21 @@
             return this->closed;
         }
 
+        /**
+         * @return the remote address for this connection
+         */
+        virtual std::string getRemoteAddress() const {
+            return "";
+        }
+
+        /**
+         * reconnect to another location
+         * @param uri
+         * @throws IOException on failure or if not supported
+         */
+        virtual void reconnect( const decaf::net::URI& uri )
+            throw( decaf::io::IOException ) {}
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Mon Mar  2 21:21:40 2009
@@ -20,6 +20,7 @@
 
 #include <decaf/io/InputStream.h>
 #include <decaf/io/OutputStream.h>
+#include <decaf/net/URI.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 #include <activemq/util/Config.h>
@@ -144,6 +145,19 @@
          */
         virtual bool isClosed() const = 0;
 
+        /**
+         * @return the remote address for this connection
+         */
+        virtual std::string getRemoteAddress() const = 0;
+
+        /**
+         * reconnect to another location
+         * @param uri
+         * @throws IOException on failure or if not supported
+         */
+        virtual void reconnect( const decaf::net::URI& uri )
+            throw( decaf::io::IOException ) = 0;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp Mon Mar  2 21:21:40 2009
@@ -16,11 +16,12 @@
  */
 
 #include "TransportFilter.h"
-#include <activemq/util/Config.h>
+#include <decaf/io/IOException.h>
 
 using namespace activemq;
 using namespace activemq::transport;
 using namespace decaf::lang;
+using namespace decaf::io;
 
 ////////////////////////////////////////////////////////////////////////////////
 TransportFilter::TransportFilter( const Pointer<Transport>& next ) :
@@ -31,8 +32,19 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::onTransportException( Transport* source AMQCPP_UNUSED,
-                                            const decaf::lang::Exception& ex ) {
+void TransportFilter::onException( const decaf::lang::Exception& ex ) {
 
     fire( ex );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void TransportFilter::reconnect( const decaf::net::URI& uri )
+    throw( decaf::io::IOException ) {
+
+    try{
+        next->reconnect( uri );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Mon Mar  2 21:21:40 2009
@@ -62,7 +62,7 @@
 
             if( listener != NULL ){
                 try{
-                    listener->onTransportException( this, ex );
+                    listener->onException( ex );
                 }catch( ... ){}
             }
         }
@@ -103,8 +103,7 @@
          * @param source The source of the exception
          * @param ex The exception.
          */
-        virtual void onTransportException( Transport* source,
-                                           const decaf::lang::Exception& ex );
+        virtual void onException( const decaf::lang::Exception& ex );
 
         /**
          * The transport has suffered an interruption from which it hopes to recover
@@ -250,6 +249,21 @@
             return next->isClosed();
         }
 
+        /**
+         * @return the remote address for this connection
+         */
+        virtual std::string getRemoteAddress() const {
+            return next->getRemoteAddress();
+        }
+
+        /**
+         * reconnect to another location
+         * @param uri
+         * @throws IOException on failure or if not supported
+         */
+        virtual void reconnect( const decaf::net::URI& uri )
+            throw( decaf::io::IOException );
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h Mon Mar  2 21:21:40 2009
@@ -56,8 +56,7 @@
          * @param source The source of the exception
          * @param ex The exception.
          */
-        virtual void onTransportException( Transport* source,
-                                           const decaf::lang::Exception& ex ) = 0;
+        virtual void onException( const decaf::lang::Exception& ex ) = 0;
 
         /**
          * The transport has suffered an interruption from which it hopes to recover

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h Mon Mar  2 21:21:40 2009
@@ -109,6 +109,8 @@
          */
         std::vector<std::string> getTransportNames() const;
 
+    public:  // Static methods
+
         /**
          * Gets the single instance of the TransportRegistry
          * @return reference to the single instance of this Registry

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h Mon Mar  2 21:21:40 2009
@@ -59,14 +59,14 @@
          * Gets the URI assigned to this Backup
          * @return the assigned URI
          */
-        decaf::net::URI getURI() const {
+        decaf::net::URI getUri() const {
             return this->uri;
         }
 
         /**
          * Sets the URI assigned to this Transport.
          */
-        void setURI( const decaf::net::URI& uri ) {
+        void setUri( const decaf::net::URI& uri ) {
             this->uri = uri;
         }
 
@@ -110,6 +110,14 @@
             return this->closed;
         }
 
+        /**
+         * Sets the closed flag on this Transport.
+         * @param value - true for closed.
+         */
+        void setClosed( bool value ) {
+            this->closed = value;
+        }
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Mar  2 21:21:40 2009
@@ -17,9 +17,27 @@
 
 #include "FailoverTransport.h"
 
+#include <activemq/commands/ConnectionControl.h>
+#include <activemq/commands/ShutdownInfo.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/transport/TransportRegistry.h>
+#include <decaf/util/Random.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/Integer.h>
+
+using namespace std;
 using namespace activemq;
+using namespace activemq::state;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
 using namespace activemq::transport;
 using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::io;
+using namespace decaf::net;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
 FailoverTransport::FailoverTransport() {
@@ -39,26 +57,126 @@
     this->backupPoolSize = 1;
     this->trackMessages = false;
     this->maxCacheSize = 128 * 1024;
+
+    this->myTransportListener.reset( new FailoverTransportListener( this ) );
+    this->reconnectTask.reset( new ReconnectTask( this ) );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 FailoverTransport::~FailoverTransport() {
+    try{
+        close();
+    }
+    AMQ_CATCH_NOTHROW( Exception )
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isShutdownCommand( const Pointer<Command>& command ) const {
+
+    if( command != NULL ) {
+
+        if( command->isShutdownInfo() ) {
+            return true;
+        }
+
+        try{
+            Pointer<RemoveInfo> remove =
+                command.dynamicCast<RemoveInfo, Pointer<RemoveInfo>::CounterType >();
+
+            return true;
+        } AMQ_CATCHALL_NOTHROW()
+    }
+
+    return false;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::addURI( const decaf::net::URI& uri ) {
+void FailoverTransport::add( const std::string& uri ) {
+
+    try {
+        URI newUri( uri );
+        if( !uris.contains( newUri ) ) {
+            uris.add( newUri );
+        }
+
+        reconnect();
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::addURI( const List<URI>& uris ) {
 
     synchronized( &this->uris ) {
-        this->uris.add( uri );
+        std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+
+        while( iter->hasNext() ) {
+            this->uris.add( iter->next() );
+        }
     }
+
+    reconnect();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::removeURI( const decaf::net::URI& uri ) {
+void FailoverTransport::removeURI( const List<URI>& uris ) {
 
     synchronized( &this->uris ) {
-        this->uris.remove( uri );
+        std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+
+        while( iter->hasNext() ) {
+            this->uris.remove( iter->next() );
+        }
     }
+
+    reconnect();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StlList<URI> FailoverTransport::getConnectList() const {
+
+    StlList<URI> result( uris );
+    bool removed = false;
+
+    if( failedConnectTransportURI != NULL ) {
+        removed = result.remove( *failedConnectTransportURI );
+    }
+
+    if( randomize ) {
+        // Shuffle the list by swapping each element with a randomly chosen one
+        Random rand;
+        rand.setSeed( decaf::lang::System::currentTimeMillis() );
+
+        for( std::size_t i = 0; i < result.size(); i++ ) {
+            int p = rand.nextInt( result.size() );
+            URI temp = result.get( p );
+            result.set( p, result.get( i ) );
+            result.set( i, temp );
+        }
+    }
+
+    if( removed ) {
+        result.add( *failedConnectTransportURI );
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setTransportListener( TransportListener* listener ) {
+    synchronized( &listenerMutex ) {
+        this->transportListener = listener;
+        listenerMutex.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string FailoverTransport::getRemoteAddress() const {
+    if( connectedTransport != NULL ) {
+        return connectedTransport->getRemoteAddress();
+    }
+    return "";
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -66,6 +184,117 @@
     throw( CommandIOException,
            decaf::lang::exceptions::UnsupportedOperationException ) {
 
+    Pointer<Exception> error;
+
+    try {
+
+        synchronized( &reconnectMutex ) {
+
+            if( isShutdownCommand( command ) && connectedTransport == NULL ) {
+
+                if( command->isShutdownInfo() ) {
+                    // Skipping send of ShutdownInfo command when not connected.
+                    return;
+                }
+
+                if( command->isRemoveInfo() ) {
+                    // Simulate response to RemoveInfo command
+                    Pointer<Response> response( new Response() );
+                    response->setCorrelationId( command->getCommandId() );
+                    myTransportListener->onCommand( response );
+                    return;
+                }
+            }
+
+            // Keep trying until the message is sent.
+            for( int i = 0; !closed; i++ ) {
+                try {
+
+                    // Wait for transport to be connected.
+                    Pointer<Transport> transport = connectedTransport;
+                    long long start = System::currentTimeMillis();
+                    bool timedout = false;
+
+                    while( transport == NULL && !closed && connectionFailure == NULL ) {
+                        long long end = System::currentTimeMillis();
+                        if( timeout > 0 && ( end - start > timeout ) ) {
+                            timedout = true;
+                            break;
+                        }
+
+                        reconnectMutex.wait( 100 );
+                        transport = connectedTransport;
+                    }
+
+                    if( transport == NULL ) {
+                        // Previous loop may have exited due to us being disposed.
+                        if( closed ) {
+                            error.reset( new IOException(
+                                __FILE__, __LINE__, "Transport disposed.") );
+                        } else if( connectionFailure != NULL ) {
+                            error = connectionFailure;
+                        } else if( timedout == true ) {
+                            error.reset( new IOException(
+                                __FILE__, __LINE__,
+                                "Failover timeout of %d ms reached.", timedout ) );
+                        } else {
+                            error.reset( new IOException(
+                                __FILE__, __LINE__, "Unexpected failure.") );
+                        }
+
+                        break;
+                    }
+
+                    // If it was a request and it was not being tracked by
+                    // the state tracker,
+                    // then hold it in the requestMap so that we can replay
+                    // it later.
+                    Pointer<Tracked> tracked = stateTracker.track( command );
+                    synchronized( &requestMap ) {
+                        if( tracked != NULL && tracked->isWaitingForResponse() ) {
+                            requestMap.put( command->getCommandId(), tracked );
+                        } else if( tracked == NULL && command->isResponseRequired() ) {
+                            requestMap.put( command->getCommandId(), command );
+                        }
+                    }
+
+                    // Send the message.
+                    try {
+                        transport->oneway( command );
+                        stateTracker.trackBack( command );
+                    } catch( IOException& e ) {
+
+                        // If the command was not tracked.. we will retry in
+                        // this method
+                        if( tracked == NULL ) {
+
+                            // since we will retry in this method.. take it out of the
+                            // request map so that it is not sent 2 times on recovery
+                            if( command->isResponseRequired() ) {
+                                requestMap.remove( command->getCommandId() );
+                            }
+
+                            // Rethrow the exception so it will handled by
+                            // the outer catch
+                            throw e;
+                        }
+                    }
+
+                    return;
+                } catch( IOException& e ) {
+                    handleTransportFailure( e );
+                }
+            }
+        }
+    }
+    AMQ_CATCH_NOTHROW( Exception )
+    AMQ_CATCHALL_NOTHROW()
+
+    if( !closed ) {
+        if( error != NULL ) {
+            throw IOException( *error );
+        }
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -90,14 +319,361 @@
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::start() throw( cms::CMSException ) {
 
+    try{
+
+        synchronized( &reconnectMutex ) {
+
+            // Only the first call does any work; repeat calls fall straight through.
+            if( !this->started ) {
+                this->started = true;
+                // Hand the configured tracking options to the state tracker.
+                stateTracker.setMaxCacheSize( this->getMaxCacheSize() );
+                stateTracker.setTrackMessages( this->isTrackMessages() );
+
+                if( connectedTransport.get() == NULL ) {
+                    // Nothing is connected yet, so ask the reconnect task to connect.
+                    reconnect();
+                } else {
+                    stateTracker.restore( connectedTransport );
+                }
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::close() throw( cms::CMSException ) {
 
+    Pointer<Transport> transportToStop;  // closed at the end, after the lock is released
+    // Shutdown sequence: flip the flags, drop the backups, wake waiters, stop the task.
+    synchronized( &reconnectMutex ) {
+        if (!started) {
+            return;
+        }
+        // doReconnect() tests 'closed' and stops attempting once it is set.
+        started = false;
+        closed = true;
+        connected = false;
+        // Mark every pooled backup as closed before the pool is emptied.
+        std::auto_ptr< Iterator< Pointer<BackupTransport> > > iter( backups.iterator() );
+        while( iter->hasNext() ) {
+            iter->next()->setClosed( true );
+        }
+
+        backups.clear();
+        // Detach the live transport here; it is closed below, outside the lock.
+        if( connectedTransport != NULL ) {
+            transportToStop = connectedTransport;
+            connectedTransport.reset( NULL );
+        }
+
+        reconnectMutex.notifyAll();
+    }
+    // doReconnect() waits on sleepMutex between attempts; cut that wait short.
+    synchronized( &sleepMutex ) {
+        sleepMutex.notifyAll();
+    }
+
+    reconnectTask->shutdown();
+
+    if( transportToStop != NULL ) {
+        transportToStop->close();
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::reconnect() {
 
+    synchronized( &reconnectMutex  ) {
+        if( started ) {  // a transport that has not been started ignores the request
+            reconnectTask->wakeup();  // flags the reconnect task as having pending work
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::restoreTransport( const Pointer<Transport>& transport )
+    throw( IOException ) {
+    // Starts the transport, announces fault tolerance, then replays state and pending requests.
+    try{
+        // NOTE(review): doReconnect() has already called start() on transports it just created.
+        transport->start();
+
+        // Tell the broker that this client is fault tolerant.
+        Pointer<ConnectionControl> cc( new ConnectionControl() );
+        cc->setFaultTolerant( true );
+        transport->oneway( cc );
+        // Replay the tracked connection state, then snapshot the request map under its lock.
+        stateTracker.restore( transport );
+        std::vector< Pointer<Command> > commands;
+        synchronized( &requestMap ) {
+            commands = requestMap.values();
+        }
+        // Resend every command still held in the request map, outside the lock.
+        std::vector< Pointer<Command> >::const_iterator iter = commands.begin();
+        for( ; iter != commands.end(); ++iter ) {
+            transport->oneway( *iter );
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::handleTransportFailure( const decaf::lang::Exception& error )
+    throw( decaf::lang::Exception ) {
+    // Takes the failed transport out of service; 'error' itself is not inspected here.
+    Pointer<Transport> transport;
+    connectedTransport.swap( transport );
+    // A NULL result means nothing was connected, so there is nothing to tear down.
+    if( transport != NULL ) {
+        // Re-point the old transport's events at disposedListener before closing it.
+        if( this->disposedListener != NULL ) {
+            transport->setTransportListener( disposedListener.get() );
+        }
+        transport->close();
+        // Record which URI failed and wake the reconnect task if still started.
+        synchronized( &reconnectMutex ) {
+            bool reconnectOk = started;
+
+            initialized = false;
+            failedConnectTransportURI = connectedTransportURI;
+            connectedTransportURI.reset( NULL );
+            connected = false;
+            if( reconnectOk ) {
+                reconnectTask->wakeup();
+            }
+        }
+        // Let the registered listener know that the connection was interrupted.
+        if( transportListener != NULL ) {
+            transportListener->transportInterrupted();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::doReconnect() {
+    // One attempt.  Returns false if no retry is wanted, else sleeps and returns !closed.
+    Pointer<Exception> failure;  // last error seen during this attempt
+
+    synchronized( &reconnectMutex ) {
+        // Wake threads waiting on reconnectMutex once the transport is closed or has given up.
+        if( closed || connectionFailure != NULL ) {
+            reconnectMutex.notifyAll();
+        }
+        // Already connected, closed or permanently failed: nothing to attempt.
+        if( connectedTransport != NULL || closed || connectionFailure != NULL ) {
+            return false;
+        } else {
+            StlList<URI> connectList = getConnectList();
+            if( connectList.isEmpty() ) {
+                failure.reset( new IOException(
+                    __FILE__, __LINE__, "No uris available to connect to." ) );
+            } else {
+                // Without exponential back-off every attempt starts from the initial delay.
+                if( !useExponentialBackOff ) {
+                    reconnectDelay = initialReconnectDelay;
+                }
+                // First choice: promote a backup transport, when backups are enabled.
+                synchronized( &backupMutex ) {
+
+                    if( backup && !backups.isEmpty() ) {
+                        // The local 'backup' declared next shadows the 'backup' flag tested above.
+                        Pointer<BackupTransport> backup = backups.remove( 0 );
+                        Pointer<Transport> transport = backup->getTransport();
+                        URI uri = backup->getUri();
+                        transport->setTransportListener( myTransportListener.get() );
+
+                        try {
+
+                            if( started ) {
+                                restoreTransport( transport );
+                            }
+                            // NOTE(review): unlike the URI loop below, this path never sets 'connected'.
+                            reconnectDelay = initialReconnectDelay;
+                            failedConnectTransportURI.reset( NULL );
+                            connectedTransportURI.reset( new URI( uri ) );
+                            connectedTransport = transport;
+                            reconnectMutex.notifyAll();
+                            connectFailures = 0;
+
+                            return false;
+                        }
+                        AMQ_CATCH_NOTHROW( Exception )
+                        AMQ_CATCHALL_NOTHROW()
+                    }
+                }
+                // No usable backup: walk the URI list until one of them connects.
+                std::auto_ptr< Iterator<URI> > iter( connectList.iterator() );
+
+                while( iter->hasNext() && connectedTransport == NULL && !closed ) {
+
+                    URI uri = iter->next();
+                    try {
+
+                        Pointer<Transport> transport = createTransport( uri );
+                        transport->setTransportListener( myTransportListener.get() );
+                        transport->start();
+                        // NOTE(review): restoreTransport() calls start() on the transport again.
+                        if( started ) {
+                            restoreTransport( transport );
+                        }
+                        // Success: reset the delay and publish the new transport.
+                        reconnectDelay = initialReconnectDelay;
+                        connectedTransportURI.reset( new URI( uri ) );
+                        connectedTransport = transport;
+                        reconnectMutex.notifyAll();
+                        connectFailures = 0;
+
+                        // Make sure on initial startup, that the transportListener
+                        // has been initialized for this instance.
+                        synchronized( &listenerMutex ) {
+                            if( transportListener == NULL ) {
+                                // if it isn't set after 2secs - it
+                                // probably never will be
+                                listenerMutex.wait( 2000 );
+                            }
+                        }
+
+                        if( transportListener != NULL ) {
+                            transportListener->transportResumed();
+                        }
+
+                        if( firstConnection ) {
+                            firstConnection = false;
+                        }
+
+                        connected = true;
+                        return false;
+                    } catch( Exception& e ) {
+                        failure.reset( e.clone() );
+                    }
+                }
+            }
+        }
+        // This attempt failed; give up for good once maxReconnectAttempts is reached.
+        if( maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts ) {
+            connectionFailure = failure;
+
+            // Make sure on initial startup, that the transportListener has been initialized
+            // for this instance.
+            synchronized( &listenerMutex ) {
+                if( transportListener == NULL ) {
+                    listenerMutex.wait( 2000 );
+                }
+            }
+
+            if( transportListener != NULL ) {
+                // Report an IOException as is; any other exception type is wrapped in one.
+                Pointer<IOException> ioException;
+                try{
+                    ioException = connectionFailure.dynamicCast<
+                        IOException, Pointer<IOException>::CounterType >();
+                }
+                AMQ_CATCH_NOTHROW( ClassCastException )
+
+                if( ioException != NULL ) {
+                    transportListener->onException( *connectionFailure );
+                } else {
+                    transportListener->onException( IOException( *connectionFailure ) );
+                }
+            }
+
+            reconnectMutex.notifyAll();
+            return false;
+        }
+    }
+    // Retries remain: wait out the delay (close() cuts it short), then grow it if backing off.
+    if( !closed ) {
+
+        synchronized( &sleepMutex ) {
+            sleepMutex.wait( reconnectDelay );
+        }
+
+        if( useExponentialBackOff ) {
+            // Exponential increment of reconnect delay.
+            reconnectDelay *= backOffMultiplier;
+            if( reconnectDelay > maxReconnectDelay ) {
+                reconnectDelay = maxReconnectDelay;
+            }
+        }
+    }
+
+    return !closed;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::buildBackups() {
+    // Tops up the pool of already-started backup transports; always returns false.
+    synchronized( &backupMutex ) {
+        // Only runs while open, with backups enabled and the pool below backupPoolSize.
+        if( !closed && backup && (int)backups.size() < backupPoolSize ) {
+
+            StlList<URI> connectList = getConnectList();
+
+            // Remove backups that have been closed.
+            StlList< Pointer<BackupTransport> > disposedList;
+            std::auto_ptr< Iterator<Pointer<BackupTransport> > > iter( backups.iterator() );
+            while( iter->hasNext() ) {
+                Pointer<BackupTransport> backup = iter->next();
+                if( backup->isClosed() ) {
+                    disposedList.add( backup );
+                }
+            }
+
+            backups.removeAll( disposedList );
+            disposedList.clear();
+            // Connect to URIs other than the active one until the pool is full.
+            std::auto_ptr< Iterator<URI> > uriIter( connectList.iterator() );
+            // While connectedTransportURI is NULL the test below skips every URI.
+            while( uriIter->hasNext() && (int)backups.size() < backupPoolSize ) {
+                URI uri = uriIter->next();
+                if( connectedTransportURI != NULL && !connectedTransportURI->equals( uri ) ) {
+                    try {
+                        Pointer<BackupTransport> backup( new BackupTransport( this ) );
+                        backup->setUri( uri );
+                        // NOTE(review): contains() gets a brand-new Pointer; confirm it compares by URI.
+                        if( !backups.contains( backup ) ) {
+                            Pointer<Transport> transport = createTransport( uri );
+                            transport->setTransportListener( backup.get() );
+                            transport->start();
+                            backup->setTransport( transport );
+                            backups.add( backup );
+                        }
+                    }
+                    AMQ_CATCH_NOTHROW( Exception )
+                    AMQ_CATCHALL_NOTHROW()
+                }
+            }
+        }
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Transport> FailoverTransport::createTransport( const URI& location ) const
+    throw ( decaf::io::IOException ) {
+    // Builds a composite Transport for 'location' via the factory registered for its scheme.
+    try{
+
+        TransportFactory* factory =
+            TransportRegistry::getInstance().findFactory( location.getScheme() );
+        // Throw by value so the IOException handler below sees it and nothing is leaked.
+        if( factory == NULL ) {
+            throw IOException(
+                __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
+        }
+
+        Pointer<Transport> transport( factory->createComposite( location ) );
+
+        return transport;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Mon Mar  2 21:21:40 2009
@@ -24,35 +24,38 @@
 #include <activemq/state/ConnectionStateTracker.h>
 #include <activemq/transport/CompositeTransport.h>
 #include <activemq/transport/failover/BackupTransport.h>
+#include <activemq/transport/failover/ReconnectTask.h>
+#include <activemq/transport/failover/FailoverTransportListener.h>
 
-#include <decaf/util/StlSet.h>
+#include <decaf/util/StlList.h>
 #include <decaf/util/StlMap.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/atomic/AtomicReference.h>
 #include <decaf/net/URI.h>
+#include <decaf/io/IOException.h>
 
 namespace activemq {
 namespace transport {
 namespace failover {
 
-    using decaf::lang::Pointer;
+    using namespace decaf::lang;
+    using decaf::net::URI;
+    using namespace decaf::util;
     using activemq::commands::Command;
     using activemq::commands::Response;
 
     class AMQCPP_API FailoverTransport : public CompositeTransport {
     private:
 
+        friend class FailoverTransportListener;
+        friend class ReconnectTask;
+
         bool closed;
         bool connected;
         bool started;
 
-        decaf::net::URI connectedTransportURI;
-        decaf::net::URI failedConnectTransportURI;
-        decaf::util::concurrent::atomic::AtomicReference<Transport> connectedTransport;
-        //TaskRunner reconnectTask;
-
-        decaf::util::StlSet<decaf::net::URI> uris;
+        decaf::util::StlList<URI> uris;
 
         long long timeout;
         long long initialReconnectDelay;
@@ -70,8 +73,7 @@
         bool trackMessages;
         int maxCacheSize;
 
-        decaf::util::StlSet< Pointer<BackupTransport> > backups;
-        decaf::lang::Exception connectionFailure;
+        decaf::util::StlList< Pointer<BackupTransport> > backups;
 
         state::ConnectionStateTracker stateTracker;
         decaf::util::concurrent::Mutex reconnectMutex;
@@ -80,8 +82,14 @@
         decaf::util::concurrent::Mutex listenerMutex;
         decaf::util::StlMap<int, Pointer<Command> > requestMap;
 
+        Pointer<URI> connectedTransportURI;
+        Pointer<URI> failedConnectTransportURI;
+        Pointer<Transport> connectedTransport;
+        Pointer<Exception> connectionFailure;
+        Pointer<ReconnectTask> reconnectTask;
         Pointer<TransportListener> disposedListener;
-        Pointer<TransportListener> myTansportListener;
+        Pointer<TransportListener> myTransportListener;
+        TransportListener* transportListener;
 
     public:
 
@@ -95,16 +103,23 @@
          */
         void reconnect();
 
+        /**
+         * Adds a New URI to the List of URIs this transport can Connect to.
+         * @param uri
+         *        A String version of a URI to add to the URIs to failover to.
+         */
+        void add( const std::string& uri );
+
     public: // CompositeTransport methods
 
         /**
          * Add a URI to the list of URI's that will represent the set of Transports
          * that this Transport is a composite of.
          *
-         * @param uri
-         *        The new URI to add to the set this composite maintains.
+         * @param uris
+         *        The new URIs to add to the set this composite maintains.
          */
-        virtual void addURI( const decaf::net::URI& uri );
+        virtual void addURI( const List<URI>& uris );
 
         /**
          * Remove a URI from the set of URI's that represents the set of Transports
@@ -112,10 +127,10 @@
          * has created a connected Transport should result in that Transport being
          * disposed of.
          *
-         * @param uri
-         *        The new URI to remove to the set this composite maintains.
+         * @param uris
+         *        The URIs to remove from the set this composite maintains.
          */
-        virtual void removeURI( const decaf::net::URI& uri );
+        virtual void removeURI( const List<URI>& uris );
 
     public: // Transport Members
 
@@ -188,7 +203,7 @@
          * Sets the observer of asynchronous events from this transport.
          * @param listener the listener of transport events.
          */
-        virtual void setTransportListener( TransportListener* listener ) {}
+        virtual void setTransportListener( TransportListener* listener );
 
         /**
          * Is this Transport fault tolerant, meaning that it will reconnect to
@@ -235,6 +250,19 @@
             return NULL;
         }
 
+        /**
+         * @return the remote address for this connection
+         */
+        virtual std::string getRemoteAddress() const;
+
+        /**
+         * reconnect to another location
+         * @param uri
+         * @throws IOException on failure or if not supported
+         */
+        virtual void reconnect( const decaf::net::URI& uri )
+            throw( decaf::io::IOException ) {}
+
     public: // FailoverTransport Property Getters / Setters
 
         long long getTimeout() const {
@@ -333,6 +361,64 @@
             this->maxCacheSize = value;
         }
 
+    protected:
+
+        /**
+         * Given a Transport restore the state of the Client's connection to the Broker
+         * using the data accumulated in the State Tracker.
+         *
+         * @param transport
+         *        The new Transport connected to the Broker.
+         *
+         * @throw IOException if an errors occurs while restoring the old state.
+         */
+        void restoreTransport( const Pointer<Transport>& transport )
+            throw( decaf::io::IOException );
+
+        /**
+         * Called when this class' TransportListener is notified of a Failure.
+         * @param error - The CMS Exception that was thrown.
+         * @throw Exception if an error occurs.
+         */
+        void handleTransportFailure( const decaf::lang::Exception& error )
+            throw( decaf::lang::Exception );
+
+    private:
+
+        /**
+         * Returns a set of URIs that this Transport is to connect to, applying a
+         * random swapping from the class stored list of URIs if the randomize flag
+         * is enabled, otherwise just return the original list.
+         *
+         * @returns a list of URI objects that this Transport iterates over to connect.
+         */
+        decaf::util::StlList<URI> getConnectList() const;
+
+        /**
+         * @return Returns true if the command is one sent when a connection
+         * is being closed.
+         */
+        bool isShutdownCommand( const Pointer<Command>& command ) const;
+
+        /**
+         * Performs the actual Reconnect operation.
+         */
+        bool doReconnect();
+
+        /**
+         * Builds a set of Backup Transports for fast Failover.
+         */
+        bool buildBackups();
+
+        /**
+         * Looks up the correct Factory and create a new Composite version of the
+         * Transport requested.
+         *
+         * @param uri - The URI to connect to
+         */
+        Pointer<Transport> createTransport( const URI& location ) const
+            throw ( decaf::io::IOException );
+
     };
 
 }}}

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp Mon Mar  2 21:21:40 2009
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FailoverTransportListener.h"
+#include "FailoverTransport.h"
+
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <activemq/commands/Response.h>
+#include <activemq/state/Tracked.h>
+
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::state;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+FailoverTransportListener::FailoverTransportListener( FailoverTransport* parent ) :
+    parent( parent ) {
+    // Every handler in this class works through parent, so a NULL parent is rejected.
+    if( this->parent == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Pointer to Parent Transport was NULL" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+FailoverTransportListener::~FailoverTransportListener() {
+}  // nothing to release; 'parent' is not deleted here
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::onCommand( const Pointer<Command>& command ) {
+    // Handles Response and BrokerInfo commands locally, then forwards the command onward.
+    if( command == NULL ) {
+        return;
+    }
+
+    if( command->isResponse() ) {
+        // A response takes the entry with the matching correlation id out of the request map.
+        Pointer<Response> response =
+            command.dynamicCast<Response, Pointer<Response>::CounterType >();
+        Pointer<Command> object;
+
+        synchronized( &( parent->requestMap ) ) {
+            object = parent->requestMap.remove( response->getCorrelationId() );
+        }
+        // Only Tracked entries get a callback; a failed cast for other entries is swallowed.
+        if( object != NULL ) {
+            try{
+                Pointer<Tracked> tracked =
+                    object.dynamicCast<Tracked, Pointer<Tracked>::CounterType >();
+                tracked->onResponse();
+            }
+            AMQ_CATCH_NOTHROW( ClassCastException )
+        }
+    }
+    // While the parent is not initialized, a BrokerInfo adds the broker's peer URLs to it.
+    if( !parent->initialized && command->isBrokerInfo() ) {
+
+        Pointer<BrokerInfo> info =
+            command.dynamicCast<BrokerInfo, Pointer<BrokerInfo>::CounterType >();
+        std::vector< Pointer<BrokerInfo> >& peers = info->getPeerBrokerInfos();
+        for( std::size_t i = 0; i < peers.size(); ++i ) {
+            std::string brokerString = peers[i]->getBrokerURL();
+            parent->add( brokerString );
+        }
+        parent->initialized = true;
+    }
+    // Every command, handled above or not, is passed on when a listener is set.
+    if( parent->transportListener != NULL ) {
+        parent->transportListener->onCommand( command );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::onException( const decaf::lang::Exception& ex ) {
+    try {
+        parent->handleTransportFailure( ex );  // closes the failed transport, may wake the reconnect task
+    } catch( Exception& e ) {
+        if( parent->transportListener != NULL ) {  // errors raised while handling go to the listener
+            parent->transportListener->onException( e );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::transportInterrupted() {
+    // Relay the interruption to the parent's registered listener, when there is one.
+    if( parent->transportListener == NULL ) return;
+    parent->transportListener->transportInterrupted();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::transportResumed() {
+    // Relay the resumption to the parent's registered listener, when there is one.
+    if( parent->transportListener == NULL ) return;
+    parent->transportListener->transportResumed();
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h Mon Mar  2 21:21:40 2009
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FAILOVERTRANSPORTLISTENER_H_
+#define FAILOVERTRANSPORTLISTENER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/transport/TransportListener.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+    class FailoverTransport;
+
+    /**
+     * Utility class used by the Transport to perform the work of responding to events
+     * from the active Transport.
+     *
+     * @since 3.0
+     */
+    class AMQCPP_API FailoverTransportListener : public TransportListener {
+    private:
+
+        // The FailoverTransport events are relayed to; checked non-NULL by the constructor.
+        FailoverTransport* parent;
+
+    public:
+        // Throws decaf::lang::exceptions::NullPointerException if parent is NULL.
+        FailoverTransportListener( FailoverTransport* parent );
+
+        virtual ~FailoverTransportListener();
+
+        /**
+         * Event handler for the receipt of a command.  A Response removes the
+         * matching entry from the parent's request map, a BrokerInfo may add the
+         * broker's peers to the parent's URI list, and the command is then
+         * forwarded to the parent's TransportListener when one is set.
+         *
+         * @param command the received command object.
+         */
+        virtual void onCommand( const Pointer<Command>& command );
+
+        /**
+         * Event handler for an exception from a command transport; the failure
+         * is handed to the parent's handleTransportFailure().
+         * @param ex The exception.
+         */
+        virtual void onException( const decaf::lang::Exception& ex );
+
+        /**
+         * The transport has suffered an interruption from which it hopes to recover
+         */
+        virtual void transportInterrupted();
+
+        /**
+         * The transport has resumed after an interruption
+         */
+        virtual void transportResumed();
+
+    };
+
+}}}
+
+#endif /* FAILOVERTRANSPORTLISTENER_H_ */

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp Mon Mar  2 21:21:40 2009
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReconnectTask.h"
+#include "FailoverTransport.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ReconnectTask::ReconnectTask( FailoverTransport* parent ) : parent( parent ) {
+    // iterate() works through parent, so a NULL parent is rejected up front.
+    if( this->parent == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Parent FailoverTransport passed was null" );
+    }
+
+    this->threadTerminated = false;
+    this->pending = false;
+    this->shutDown = false;
+    // start() comes from the decaf::lang::Thread base class and is called last.
+    this->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ReconnectTask::~ReconnectTask() {
+    try{
+        this->shutdown();  // forwards to shutdown( 0 )
+    }
+    AMQ_CATCHALL_NOTHROW()  // keep exceptions from escaping the destructor
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ReconnectTask::shutdown( unsigned int timeout ) {
+    // Marks the task shut down, notifies waiters, then waits on mutex if not yet terminated.
+    synchronized( &mutex ) {
+        shutDown = true;
+        pending = true;
+        mutex.notifyAll();
+        // NOTE(review): only threadTerminated is tested; the calling thread is never compared.
+        // Wait till the thread stops ( no need to wait if shutdown
+        // is called from thread that is shutting down)
+        if( !threadTerminated ) {
+            mutex.wait( timeout );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ReconnectTask::wakeup() {
+
+    // Flag pending work and notify the waiters on mutex; ignored once shut down.
+    synchronized( &mutex ) {
+        if( !shutDown ) {
+            pending = true;
+            mutex.notifyAll();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReconnectTask::iterate() {
+    // One pass of the task: reconnect when disconnected, otherwise top up the backups.
+    bool result = false;
+    bool buildBackup = true;
+
+    // doReconnect() is invoked with the parent's backupMutex held.
+    synchronized( &( parent->backupMutex ) ) {
+        if( parent->connectedTransport == NULL && !parent->closed ) {
+            result = parent->doReconnect();
+            buildBackup = false;
+        }
+    }
+
+    if( buildBackup ) {
+        parent->buildBackups();
+    } else {
+        // build backups on the next iteration
+        result = true;
+        this->wakeup();
+    }
+
+    return result;
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h Mon Mar  2 21:21:40 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FAILOVER_RECONNECTTASK_H_
+#define _ACTIVEMQ_TRANSPORT_FAILOVER_RECONNECTTASK_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Mutex.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+    class FailoverTransport;
+
+    class AMQCPP_API ReconnectTask : public decaf::lang::Thread {
+    private:
+        // Lock taken by shutdown() and wakeup(); also their wait / notifyAll target.
+        decaf::util::concurrent::Mutex mutex;
+
+        bool threadTerminated;  // when true, shutdown() skips its wait
+        bool pending;           // set to true by wakeup() and by shutdown()
+        bool shutDown;          // set by shutdown(); turns wakeup() into a no-op
+        // Transport that iterate() reconnects or builds backups for.
+        FailoverTransport* parent;
+
+    public:
+        // NOTE(review): ownership of parent is not visible here - confirm.
+        ReconnectTask( FailoverTransport* parent );
+
+        virtual ~ReconnectTask();
+        // Flags shutdown, notifies waiters, then waits on the mutex with timeout.
+        void shutdown( unsigned int timeout );
+        // Convenience overload that forwards a timeout of 0.
+        void shutdown() {
+            this->shutdown( 0 );
+        }
+        // Marks work pending and notifies waiters unless already shut down.
+        void wakeup();
+
+    protected:
+        // Runs one reconnect-or-build-backups pass against parent.
+        bool iterate();
+        // Thread body; presumably overrides decaf::lang::Thread::run - confirm.
+        virtual void run();
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TRANSPORT_FAILOVER_RECONNECTTASK_H_ */

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h Mon Mar  2 21:21:40 2009
@@ -257,7 +257,7 @@
          */
         virtual void fireException( const exceptions::ActiveMQException& ex ){
             if( listener != NULL ){
-                listener->onTransportException( this, ex );
+                listener->onException( ex );
             }
         }
 
@@ -309,6 +309,22 @@
         virtual bool isClosed() const {
             return false;
         }
+
+        /**
+         * @return the remote address for this connection
+         */
+        virtual std::string getRemoteAddress() const {
+            return "";
+        }
+
+        /**
+         * reconnect to another location
+         * @param uri
+         * @throws IOException on failure or if not supported
+         */
+        virtual void reconnect( const decaf::net::URI& uri )
+            throw( decaf::io::IOException ) {}
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp Mon Mar  2 21:21:40 2009
@@ -156,11 +156,11 @@
 
 public:
 
-    Transport* transport;
     decaf::util::concurrent::Mutex mutex;
+    bool caughtOne;
 
-    MyTransportListener() : latch(1) { this->transport = NULL; }
-    MyTransportListener( unsigned int num ) : latch( num ) { this->transport = NULL; }
+    MyTransportListener() : latch(1), caughtOne( false ) {}
+    MyTransportListener( unsigned int num ) : latch( num ), caughtOne( false ) {}
     virtual ~MyTransportListener(){}
 
     virtual void await() {
@@ -174,10 +174,9 @@
         latch.countDown();
     }
 
-    virtual void onTransportException( Transport* source,
-                const decaf::lang::Exception& ex AMQCPP_UNUSED){
-        transport = source;
+    virtual void onException( const decaf::lang::Exception& ex AMQCPP_UNUSED){
 
+        this->caughtOne = true;
         synchronized( &mutex )
         {
            mutex.notify();
@@ -356,16 +355,12 @@
 
     transport.start();
 
-    synchronized(&listener.mutex)
-    {
-       if(listener.transport != &transport)
-       {
-          listener.mutex.wait(1000);
-       }
+    synchronized(&listener.mutex) {
+        if( !listener.caughtOne ) {
+            listener.mutex.wait(1000);
+         }
     }
 
-    CPPUNIT_ASSERT( listener.transport == &transport );
-
     transport.close();
 }
 

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Mon Mar  2 21:21:40 2009
@@ -184,13 +184,13 @@
                 }
             }catch( exceptions::ActiveMQException& ex ){
                 if( listener ){
-                    listener->onTransportException( this, ex );
+                    listener->onException( ex );
                 }
             }
             catch( ... ){
                 if( listener ){
                     exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
-                    listener->onTransportException( this, ex );
+                    listener->onException( ex );
                 }
             }
         }
@@ -215,6 +215,13 @@
             return false;
         }
 
+        virtual std::string getRemoteAddress() const {
+            return "";
+        }
+
+        virtual void reconnect( const decaf::net::URI& uri )
+            throw( decaf::io::IOException ) {}
+
     };
 
     class MyBrokenTransport : public MyTransport{
@@ -251,8 +258,7 @@
             }
         }
 
-        virtual void onTransportException(
-            Transport* source AMQCPP_UNUSED,
+        virtual void onException(
             const decaf::lang::Exception& ex AMQCPP_UNUSED)
         {
             synchronized( &mutex ){