You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2009/03/02 22:21:41 UTC
svn commit: r749440 - in /activemq/activemq-cpp/trunk/src: main/
main/activemq/core/ main/activemq/state/ main/activemq/transport/
main/activemq/transport/failover/ main/activemq/transport/mock/
test/activemq/transport/ test/activemq/transport/correlator/
Author: tabish
Date: Mon Mar 2 21:21:40 2009
New Revision: 749440
URL: http://svn.apache.org/viewvc?rev=749440&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100
A mostly finished implementation of the Failover Transport, lacking only a Factory that can parse the composite URI to build the list of Brokers to attempt to connect to. The Transport is not tested yet.
Added:
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp (with props)
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h (with props)
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp (with props)
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h (with props)
Modified:
activemq/activemq-cpp/trunk/src/main/Makefile.am
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp
activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h
activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Mon Mar 2 21:21:40 2009
@@ -51,7 +51,9 @@
activemq/state/TransactionState.cpp \
activemq/state/Tracked.cpp \
activemq/transport/failover/BackupTransport.cpp \
+ activemq/transport/failover/ReconnectTask.cpp \
activemq/transport/failover/FailoverTransport.cpp \
+ activemq/transport/failover/FailoverTransportListener.cpp \
activemq/transport/failover/FailoverTransportFactory.cpp \
activemq/transport/TransportFilter.cpp \
activemq/transport/TransportRegistry.cpp \
@@ -206,8 +208,10 @@
activemq/transport/TransportFactory.h \
activemq/transport/TransportRegistry.h \
activemq/transport/failover/BackupTransport.h \
+ activemq/transport/failover/ReconnectTask.h \
activemq/transport/failover/FailoverTransport.h \
activemq/transport/failover/FailoverTransportFactory.h \
+ activemq/transport/failover/FailoverTransportListener.h \
activemq/transport/mock/MockTransport.h \
activemq/transport/mock/MockTransportFactory.h \
activemq/transport/correlator/FutureResponse.h \
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Mon Mar 2 21:21:40 2009
@@ -521,8 +521,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onTransportException( transport::Transport* source AMQCPP_UNUSED,
- const decaf::lang::Exception& ex ) {
+void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) {
try {
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Mon Mar 2 21:21:40 2009
@@ -331,8 +331,7 @@
* @param source The source of the exception
* @param ex The exception.
*/
- virtual void onTransportException( transport::Transport* source,
- const decaf::lang::Exception& ex );
+ virtual void onException( const decaf::lang::Exception& ex );
public:
Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp Mon Mar 2 21:21:40 2009
@@ -30,7 +30,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void Tracked::onResponses() {
+void Tracked::onResponse() {
try {
Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h Mon Mar 2 21:21:40 2009
@@ -40,7 +40,7 @@
virtual ~Tracked() {}
- void onResponses();
+ void onResponse();
bool isWaitingForResponse() const {
return runnable != NULL;
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/CompositeTransport.h Mon Mar 2 21:21:40 2009
@@ -20,10 +20,14 @@
#include <activemq/transport/Transport.h>
#include <decaf/net/URI.h>
+#include <decaf/util/List.h>
namespace activemq {
namespace transport {
+ using decaf::util::List;
+ using decaf::net::URI;
+
/**
* A Composite Transport is a Transport implementation that is composed of several
* Transports. The composition could be such that only one Transport exists for
@@ -44,7 +48,7 @@
* @param uri
* The new URI to add to the set this composite maintains.
*/
- virtual void addURI( const decaf::net::URI& uri ) = 0;
+ virtual void addURI( const List<URI>& uris ) = 0;
/**
* Remove a URI from the set of URI's that represents the set of Transports
@@ -55,7 +59,7 @@
* @param uri
* The new URI to remove to the set this composite maintains.
*/
- virtual void removeURI( const decaf::net::URI& uri ) = 0;
+ virtual void removeURI( const List<URI>& uris ) = 0;
};
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h Mon Mar 2 21:21:40 2009
@@ -47,11 +47,9 @@
/**
* Event handler for an exception from a command transport.
*
- * @param source The source of the exception
* @param ex The exception.
*/
- virtual void onTransportException( Transport* source AMQCPP_UNUSED,
- const decaf::lang::Exception& ex AMQCPP_UNUSED ) {}
+ virtual void onException( const decaf::lang::Exception& ex AMQCPP_UNUSED ) {}
/**
* The transport has suffered an interruption from which it hopes to recover
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp Mon Mar 2 21:21:40 2009
@@ -70,7 +70,7 @@
if( this->listener != NULL && !this->closed ){
try{
- this->listener->onTransportException( this, ex );
+ this->listener->onException( ex );
}catch( ... ){}
}
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Mon Mar 2 21:21:40 2009
@@ -252,6 +252,21 @@
return this->closed;
}
+ /**
+ * @return the remote address for this connection
+ */
+ virtual std::string getRemoteAddress() const {
+ return "";
+ }
+
+ /**
+ * reconnect to another location
+ * @param uri
+ * @throws IOException on failure or if not supported
+ */
+ virtual void reconnect( const decaf::net::URI& uri )
+ throw( decaf::io::IOException ) {}
+
};
}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Mon Mar 2 21:21:40 2009
@@ -20,6 +20,7 @@
#include <decaf/io/InputStream.h>
#include <decaf/io/OutputStream.h>
+#include <decaf/net/URI.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
#include <activemq/util/Config.h>
@@ -144,6 +145,19 @@
*/
virtual bool isClosed() const = 0;
+ /**
+ * @return the remote address for this connection
+ */
+ virtual std::string getRemoteAddress() const = 0;
+
+ /**
+ * reconnect to another location
+ * @param uri
+ * @throws IOException on failure or if not supported
+ */
+ virtual void reconnect( const decaf::net::URI& uri )
+ throw( decaf::io::IOException ) = 0;
+
};
}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp Mon Mar 2 21:21:40 2009
@@ -16,11 +16,12 @@
*/
#include "TransportFilter.h"
-#include <activemq/util/Config.h>
+#include <decaf/io/IOException.h>
using namespace activemq;
using namespace activemq::transport;
using namespace decaf::lang;
+using namespace decaf::io;
////////////////////////////////////////////////////////////////////////////////
TransportFilter::TransportFilter( const Pointer<Transport>& next ) :
@@ -31,8 +32,19 @@
}
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::onTransportException( Transport* source AMQCPP_UNUSED,
- const decaf::lang::Exception& ex ) {
+void TransportFilter::onException( const decaf::lang::Exception& ex ) {
fire( ex );
}
+
+////////////////////////////////////////////////////////////////////////////////
+void TransportFilter::reconnect( const decaf::net::URI& uri ) // Forwards a reconnect request for 'uri' to the wrapped 'next' transport.
+ throw( decaf::io::IOException ) {
+
+ try{
+ next->reconnect( uri ); // pure delegation; this filter adds no reconnect logic of its own
+ }
+ AMQ_CATCH_RETHROW( IOException ) // NOTE(review): macros are defined elsewhere; presumably IOException is rethrown as-is -- confirm
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException ) // presumably other Exceptions are converted to IOException -- confirm
+ AMQ_CATCHALL_THROW( IOException ) // presumably anything else becomes an IOException -- confirm
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Mon Mar 2 21:21:40 2009
@@ -62,7 +62,7 @@
if( listener != NULL ){
try{
- listener->onTransportException( this, ex );
+ listener->onException( ex );
}catch( ... ){}
}
}
@@ -103,8 +103,7 @@
* @param source The source of the exception
* @param ex The exception.
*/
- virtual void onTransportException( Transport* source,
- const decaf::lang::Exception& ex );
+ virtual void onException( const decaf::lang::Exception& ex );
/**
* The transport has suffered an interruption from which it hopes to recover
@@ -250,6 +249,21 @@
return next->isClosed();
}
+ /**
+ * @return the remote address for this connection
+ */
+ virtual std::string getRemoteAddress() const {
+ return next->getRemoteAddress();
+ }
+
+ /**
+ * reconnect to another location
+ * @param uri
+ * @throws IOException on failure or if not supported
+ */
+ virtual void reconnect( const decaf::net::URI& uri )
+ throw( decaf::io::IOException );
+
};
}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h Mon Mar 2 21:21:40 2009
@@ -56,8 +56,7 @@
* @param source The source of the exception
* @param ex The exception.
*/
- virtual void onTransportException( Transport* source,
- const decaf::lang::Exception& ex ) = 0;
+ virtual void onException( const decaf::lang::Exception& ex ) = 0;
/**
* The transport has suffered an interruption from which it hopes to recover
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportRegistry.h Mon Mar 2 21:21:40 2009
@@ -109,6 +109,8 @@
*/
std::vector<std::string> getTransportNames() const;
+ public: // Static methods
+
/**
* Gets the single instance of the TransportRegistry
* @return reference to the single instance of this Registry
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h Mon Mar 2 21:21:40 2009
@@ -59,14 +59,14 @@
* Gets the URI assigned to this Backup
* @return the assigned URI
*/
- decaf::net::URI getURI() const {
+ decaf::net::URI getUri() const {
return this->uri;
}
/**
* Sets the URI assigned to this Transport.
*/
- void setURI( const decaf::net::URI& uri ) {
+ void setUri( const decaf::net::URI& uri ) {
this->uri = uri;
}
@@ -110,6 +110,14 @@
return this->closed;
}
+ /**
+ * Sets the closed flag on this Transport.
+ * @param value - true for closed.
+ */
+ void setClosed( bool value ) {
+ this->closed = value;
+ }
+
};
}}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Mar 2 21:21:40 2009
@@ -17,9 +17,27 @@
#include "FailoverTransport.h"
+#include <activemq/commands/ConnectionControl.h>
+#include <activemq/commands/ShutdownInfo.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/transport/TransportRegistry.h>
+#include <decaf/util/Random.h>
+#include <decaf/lang/System.h>
+#include <decaf/lang/Integer.h>
+
+using namespace std;
using namespace activemq;
+using namespace activemq::state;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
using namespace activemq::transport;
using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::io;
+using namespace decaf::net;
+using namespace decaf::util;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
FailoverTransport::FailoverTransport() {
@@ -39,26 +57,126 @@
this->backupPoolSize = 1;
this->trackMessages = false;
this->maxCacheSize = 128 * 1024;
+
+ this->myTransportListener.reset( new FailoverTransportListener( this ) );
+ this->reconnectTask.reset( new ReconnectTask( this ) );
}
////////////////////////////////////////////////////////////////////////////////
FailoverTransport::~FailoverTransport() { // Closes the transport on destruction.
+ try{
+ close(); // close() returns immediately when 'started' is false
+ }
+ AMQ_CATCH_NOTHROW( Exception ) // NOTE(review): macros defined elsewhere; presumably they keep exceptions from leaving the destructor -- confirm
+ AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reports whether the given command is one of the two commands that
+ * oneway() handles specially while no transport is connected: a
+ * ShutdownInfo or a RemoveInfo.
+ *
+ * @param command
+ *      The command to inspect, may be NULL.
+ *
+ * @return true if command is non-NULL and is a ShutdownInfo or a RemoveInfo,
+ *         false otherwise.
+ */
+bool FailoverTransport::isShutdownCommand( const Pointer<Command>& command ) const {
+
+    if( command == NULL ) {
+        return false;
+    }
+
+    // Ask the command for its type through the same predicates oneway()
+    // uses. The previous implementation attempted a dynamicCast to
+    // RemoveInfo and returned true afterwards, which is only correct if the
+    // cast throws on a mismatch, and which pays for an exception throw and
+    // catch on every ordinary command that is sent.
+    return command->isShutdownInfo() || command->isRemoveInfo();
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::addURI( const decaf::net::URI& uri ) {
+/**
+ * Builds a URI from the given string, appends it to the list of URIs this
+ * transport may connect to if it is not already in the list, and then
+ * requests a reconnect.
+ *
+ * This is a best-effort operation: any exception raised while building the
+ * URI, updating the list or requesting the reconnect is swallowed.
+ *
+ * @param uri
+ *      String form of the URI to add.
+ */
+void FailoverTransport::add( const std::string& uri ) {
+
+    try {
+        URI newUri( uri );
+
+        // Hold the same lock that addURI() and removeURI() take on the URI
+        // list, so this contains-then-add pair cannot interleave with them.
+        synchronized( &this->uris ) {
+            if( !uris.contains( newUri ) ) {
+                uris.add( newUri );
+            }
+        }
+
+        // As in addURI()/removeURI(), the reconnect request is issued after
+        // the list lock has been released.
+        reconnect();
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::addURI( const List<URI>& uris ) {
synchronized( &this->uris ) {
- this->uris.add( uri );
+ std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+
+ while( iter->hasNext() ) {
+ this->uris.add( iter->next() );
+ }
}
+
+ reconnect();
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::removeURI( const decaf::net::URI& uri ) {
+void FailoverTransport::removeURI( const List<URI>& uris ) {
synchronized( &this->uris ) {
- this->uris.remove( uri );
+ std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
+
+ while( iter->hasNext() ) {
+ this->uris.remove( iter->next() );
+ }
}
+
+ reconnect();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds the ordered list of URIs that a connection attempt should try.
+ *
+ * The result starts as a copy of the configured URI list. When the
+ * randomize option is set the order of the copy is shuffled. If the URI of
+ * the last failed connection is present in the list it is taken out before
+ * the shuffle and appended again afterwards, so that it is always the last
+ * candidate.
+ *
+ * @return a new list holding the candidate URIs in the order to try them.
+ */
+StlList<URI> FailoverTransport::getConnectList() const {
+
+    StlList<URI> result( uris );
+    bool removed = false;
+
+    if( failedConnectTransportURI != NULL ) {
+        removed = result.remove( *failedConnectTransportURI );
+    }
+
+    if( randomize ) {
+        Random rand;
+        rand.setSeed( decaf::lang::System::currentTimeMillis() );
+
+        // Fisher-Yates shuffle: position i is swapped with a position picked
+        // from [i, count). Picking the partner from the whole range on every
+        // step, as was done before, does not produce all orderings with
+        // equal probability. The last position needs no swap, which also
+        // keeps the bound handed to nextInt() at 2 or more.
+        const std::size_t count = result.size();
+        for( std::size_t i = 0; i + 1 < count; i++ ) {
+            std::size_t p = i + (std::size_t)rand.nextInt( (int)( count - i ) );
+            URI temp = result.get( p );
+            result.set( p, result.get( i ) );
+            result.set( i, temp );
+        }
+    }
+
+    // Put the previously failed URI back at the end so it is tried last.
+    if( removed ) {
+        result.add( *failedConnectTransportURI );
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Stores the listener that is told about interruptions, resumptions and
+ * failures of this transport.
+ *
+ * The assignment happens while holding listenerMutex, and every thread
+ * waiting on that mutex is notified afterwards; doReconnect() waits on it
+ * for up to two seconds when no listener has been set yet.
+ *
+ * @param listener
+ *      The listener to store, stored as given.
+ */
+void FailoverTransport::setTransportListener( TransportListener* listener ) {
+    synchronized( &listenerMutex ) {
+        transportListener = listener;
+        listenerMutex.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * @return the remote address reported by the currently connected transport,
+ *         or an empty string when no transport is connected.
+ */
+std::string FailoverTransport::getRemoteAddress() const {
+
+    // Nothing connected, nothing to report.
+    if( connectedTransport == NULL ) {
+        return "";
+    }
+
+    return connectedTransport->getRemoteAddress();
}
////////////////////////////////////////////////////////////////////////////////
@@ -66,6 +184,117 @@
throw( CommandIOException,
decaf::lang::exceptions::UnsupportedOperationException ) {
+ Pointer<Exception> error;
+
+ try {
+
+ synchronized( &reconnectMutex ) {
+
+ if( isShutdownCommand( command ) && connectedTransport == NULL ) {
+
+ if( command->isShutdownInfo() ) {
+ // Skipping send of ShutdownInfo command when not connected.
+ return;
+ }
+
+ if( command->isRemoveInfo() ) {
+ // Simulate response to RemoveInfo command
+ Pointer<Response> response( new Response() );
+ response->setCorrelationId( command->getCommandId() );
+ myTransportListener->onCommand( response );
+ return;
+ }
+ }
+
+ // Keep trying until the message is sent.
+ for( int i = 0; !closed; i++ ) {
+ try {
+
+ // Wait for transport to be connected.
+ Pointer<Transport> transport = connectedTransport;
+ long long start = System::currentTimeMillis();
+ bool timedout = false;
+
+ while( transport == NULL && !closed && connectionFailure == NULL ) {
+ long long end = System::currentTimeMillis();
+ if( timeout > 0 && ( end - start > timeout ) ) {
+ timedout = true;
+ break;
+ }
+
+ reconnectMutex.wait( 100 );
+ transport = connectedTransport;
+ }
+
+ if( transport == NULL ) {
+ // Previous loop may have exited due to us being disposed.
+ if( closed ) {
+ error.reset( new IOException(
+ __FILE__, __LINE__, "Transport disposed.") );
+ } else if( connectionFailure != NULL ) {
+ error = connectionFailure;
+ } else if( timedout == true ) {
+ error.reset( new IOException(
+ __FILE__, __LINE__,
+ "Failover timeout of %d ms reached.", timedout ) );
+ } else {
+ error.reset( new IOException(
+ __FILE__, __LINE__, "Unexpected failure.") );
+ }
+
+ break;
+ }
+
+ // If it was a request and it was not being tracked by
+ // the state tracker,
+ // then hold it in the requestMap so that we can replay
+ // it later.
+ Pointer<Tracked> tracked = stateTracker.track( command );
+ synchronized( &requestMap ) {
+ if( tracked != NULL && tracked->isWaitingForResponse() ) {
+ requestMap.put( command->getCommandId(), tracked );
+ } else if( tracked == NULL && command->isResponseRequired() ) {
+ requestMap.put( command->getCommandId(), command );
+ }
+ }
+
+ // Send the message.
+ try {
+ transport->oneway( command );
+ stateTracker.trackBack( command );
+ } catch( IOException& e ) {
+
+ // If the command was not tracked.. we will retry in
+ // this method
+ if( tracked == NULL ) {
+
+ // since we will retry in this method.. take it out of the
+ // request map so that it is not sent 2 times on recovery
+ if( command->isResponseRequired() ) {
+ requestMap.remove( command->getCommandId() );
+ }
+
+ // Rethrow the exception so it will handled by
+ // the outer catch
+ throw e;
+ }
+ }
+
+ return;
+ } catch( IOException& e ) {
+ handleTransportFailure( e );
+ }
+ }
+ }
+ }
+ AMQ_CATCH_NOTHROW( Exception )
+ AMQ_CATCHALL_NOTHROW()
+
+ if( !closed ) {
+ if( error != NULL ) {
+ throw IOException( *error );
+ }
+ }
}
////////////////////////////////////////////////////////////////////////////////
@@ -90,14 +319,361 @@
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::start() throw( cms::CMSException ) { // Marks the transport started; restores state onto an existing connection or else requests a reconnect.
+ try{
+
+ synchronized( &reconnectMutex ) {
+
+ if( this->started ) {
+ return; // already started: nothing to do
+ }
+
+ started = true; // set before reconnect() below, which only wakes the task when started is true
+
+ stateTracker.setMaxCacheSize( this->getMaxCacheSize() ); // copy the current settings into the state tracker
+ stateTracker.setTrackMessages( this->isTrackMessages() );
+
+ if( connectedTransport.get() != NULL ) {
+ stateTracker.restore( connectedTransport ); // a transport is already connected: hand it to the tracker's restore()
+ } else {
+ reconnect(); // no transport yet: wake the reconnect task
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException ) // NOTE(review): macros defined elsewhere; presumably everything leaves here as an ActiveMQException -- confirm
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::close() throw( cms::CMSException ) { // Flags the transport closed, drops the backups, wakes waiters, shuts down the reconnect task and closes the live transport.
+ Pointer<Transport> transportToStop; // closed at the end of the method, after reconnectMutex has been released
+
+ synchronized( &reconnectMutex ) {
+ if (!started) {
+ return; // not started (or already closed): none of the steps below run
+ }
+
+ started = false;
+ closed = true;
+ connected = false;
+
+ std::auto_ptr< Iterator< Pointer<BackupTransport> > > iter( backups.iterator() );
+ while( iter->hasNext() ) {
+ iter->next()->setClosed( true ); // mark every pooled backup as closed before the pool is emptied
+ }
+
+ backups.clear();
+
+ if( connectedTransport != NULL ) {
+ transportToStop = connectedTransport; // keep a reference so the transport can be closed outside the lock
+ connectedTransport.reset( NULL );
+ }
+
+ reconnectMutex.notifyAll(); // wake threads waiting on reconnectMutex (oneway() waits on it) so they see 'closed'
+ }
+
+ synchronized( &sleepMutex ) {
+ sleepMutex.notifyAll(); // cut short the back-off wait that doReconnect() performs on sleepMutex
+ }
+
+ reconnectTask->shutdown();
+
+ if( transportToStop != NULL ) {
+ transportToStop->close();
+ }
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::reconnect() { // Requests a reconnect by waking the reconnect task; does nothing unless the transport is started.
+ synchronized( &reconnectMutex ) {
+ if( started ) {
+ reconnectTask->wakeup(); // the task itself is defined in ReconnectTask, outside this file
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Prepares a newly obtained transport for use as the active connection.
+ *
+ * The transport is started, a ConnectionControl command flagged as fault
+ * tolerant is sent over it, the state tracker's restore() is run against
+ * it, and finally every command currently held in the request map is sent
+ * over it again.
+ *
+ * @param transport
+ *      The transport to prepare.
+ *
+ * @throws IOException if any of the steps fails.
+ */
+void FailoverTransport::restoreTransport( const Pointer<Transport>& transport )
+    throw( IOException ) {
+
+    try{
+
+        transport->start();
+
+        // Inform the broker that this client is a fault tolerant one.
+        Pointer<ConnectionControl> control( new ConnectionControl() );
+        control->setFaultTolerant( true );
+        transport->oneway( control );
+
+        stateTracker.restore( transport );
+
+        // Take a snapshot of the outstanding requests while holding the map
+        // lock, then resend them without holding it.
+        std::vector< Pointer<Command> > pending;
+        synchronized( &requestMap ) {
+            pending = requestMap.values();
+        }
+
+        for( std::size_t ix = 0; ix < pending.size(); ++ix ) {
+            transport->oneway( pending[ix] );
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::handleTransportFailure( const decaf::lang::Exception& error ) // Tears down the failed active transport and schedules a reconnect; 'error' is not read in this body.
+ throw( decaf::lang::Exception ) {
+
+ Pointer<Transport> transport;
+ connectedTransport.swap( transport ); // take the active transport out of the member; connectedTransport is NULL from here on
+
+ if( transport != NULL ) { // nothing to do when no transport was connected
+
+ if( this->disposedListener != NULL ) {
+ transport->setTransportListener( disposedListener.get() ); // re-point the dying transport's events at disposedListener before closing it
+ }
+ transport->close();
+
+ synchronized( &reconnectMutex ) {
+ bool reconnectOk = started; // only reconnect when the transport is currently started
+
+ initialized = false;
+ failedConnectTransportURI = connectedTransportURI; // remembered so getConnectList() can move this URI to the end
+ connectedTransportURI.reset( NULL );
+ connected = false;
+ if( reconnectOk ) {
+ reconnectTask->wakeup();
+ }
+ }
+
+ if( transportListener != NULL ) { // NOTE(review): read without holding listenerMutex, unlike setTransportListener() -- confirm this is intended
+ transportListener->transportInterrupted();
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::doReconnect() { // Runs one connection pass; returns !closed after a failed pass and false in every other case.
+
+ Pointer<Exception> failure; // last error recorded while trying to connect in this pass
+
+ synchronized( &reconnectMutex ) {
+
+ if( closed || connectionFailure != NULL ) {
+ reconnectMutex.notifyAll(); // wake threads waiting on reconnectMutex (oneway() waits on it) so they can see the terminal state
+ }
+
+ if( connectedTransport != NULL || closed || connectionFailure != NULL ) {
+ return false; // already connected, closed, or failed for good: nothing to attempt
+ } else {
+ StlList<URI> connectList = getConnectList();
+ if( connectList.isEmpty() ) {
+ failure.reset( new IOException(
+ __FILE__, __LINE__, "No uris available to connect to." ) );
+ } else {
+
+ if( !useExponentialBackOff ) {
+ reconnectDelay = initialReconnectDelay; // without back-off every pass waits the initial delay
+ }
+
+ synchronized( &backupMutex ) {
+
+ if( backup && !backups.isEmpty() ) { // first choice: promote a pooled backup transport
+
+ Pointer<BackupTransport> backup = backups.remove( 0 ); // NOTE: this local shadows the 'backup' flag tested on the line above
+ Pointer<Transport> transport = backup->getTransport();
+ URI uri = backup->getUri();
+ transport->setTransportListener( myTransportListener.get() );
+
+ try {
+
+ if( started ) {
+ restoreTransport( transport );
+ }
+
+ reconnectDelay = initialReconnectDelay;
+ failedConnectTransportURI.reset( NULL );
+ connectedTransportURI.reset( new URI( uri ) );
+ connectedTransport = transport;
+ reconnectMutex.notifyAll();
+ connectFailures = 0;
+
+ return false; // connected through the backup; NOTE(review): 'connected' and transportResumed() are not touched on this path -- confirm
+ }
+ AMQ_CATCH_NOTHROW( Exception ) // NOTE(review): macros defined elsewhere; presumably a failed backup is dropped silently and the URI list is tried next -- confirm
+ AMQ_CATCHALL_NOTHROW()
+ }
+ }
+
+ std::auto_ptr< Iterator<URI> > iter( connectList.iterator() );
+
+ while( iter->hasNext() && connectedTransport == NULL && !closed ) { // second choice: walk the candidate URIs in order
+
+ URI uri = iter->next();
+ try {
+
+ Pointer<Transport> transport = createTransport( uri );
+ transport->setTransportListener( myTransportListener.get() );
+ transport->start(); // NOTE(review): restoreTransport() below calls start() on the transport again -- confirm a second start is harmless
+
+ if( started ) {
+ restoreTransport( transport );
+ }
+
+ reconnectDelay = initialReconnectDelay;
+ connectedTransportURI.reset( new URI( uri ) );
+ connectedTransport = transport;
+ reconnectMutex.notifyAll();
+ connectFailures = 0;
+
+ // Make sure on initial startup, that the transportListener
+ // has been initialized for this instance.
+ synchronized( &listenerMutex ) {
+ if( transportListener == NULL ) {
+ // if it isn't set after 2secs - it
+ // probably never will be
+ listenerMutex.wait( 2000 ); // setTransportListener() notifies listenerMutex, which ends this wait early
+ }
+ }
+
+ if( transportListener != NULL ) {
+ transportListener->transportResumed();
+ }
+
+ if( firstConnection ) {
+ firstConnection = false;
+ }
+
+ connected = true;
+ return false; // connected: no further pass needed
+ } catch( Exception& e ) {
+ failure.reset( e.clone() ); // keep a copy of the error and move on to the next URI
+ }
+ }
+ }
+ }
+
+ if( maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts ) { // a limit of 0 or less means the failure count is never checked
+ connectionFailure = failure; // NOTE(review): 'failure' is still NULL if no attempt recorded an error (e.g. the loop was skipped) -- confirm the dereferences below are safe
+
+ // Make sure on initial startup, that the transportListener has been initialized
+ // for this instance.
+ synchronized( &listenerMutex ) {
+ if( transportListener == NULL ) {
+ listenerMutex.wait( 2000 );
+ }
+ }
+
+ if( transportListener != NULL ) {
+
+ Pointer<IOException> ioException;
+ try{
+ ioException = connectionFailure.dynamicCast<
+ IOException, Pointer<IOException>::CounterType >();
+ }
+ AMQ_CATCH_NOTHROW( ClassCastException ) // a failed cast leaves ioException NULL, selecting the wrapping branch below
+
+ if( ioException != NULL ) {
+ transportListener->onException( *connectionFailure ); // already an IOException: report it as it is
+ } else {
+ transportListener->onException( IOException( *connectionFailure ) ); // otherwise report an IOException built from it
+ }
+ }
+
+ reconnectMutex.notifyAll();
+ return false; // gave up: connectionFailure is now set, so later passes return at the top
+ }
+ }
+
+ if( !closed ) {
+
+ synchronized( &sleepMutex ) {
+ sleepMutex.wait( reconnectDelay ); // back-off pause, taken outside reconnectMutex; close() notifies sleepMutex to end it early
+ }
+
+ if( useExponentialBackOff ) {
+ // Exponential increment of reconnect delay.
+ reconnectDelay *= backOffMultiplier;
+ if( reconnectDelay > maxReconnectDelay ) {
+ reconnectDelay = maxReconnectDelay; // never wait longer than the configured maximum
+ }
+ }
+ }
+
+ return !closed;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::buildBackups() { // Prunes closed backups and tops the backup pool up to backupPoolSize; always returns false.
+
+ synchronized( &backupMutex ) {
+
+ if( !closed && backup && (int)backups.size() < backupPoolSize ) { // only when open, backups are enabled and the pool is not full
+
+ StlList<URI> connectList = getConnectList();
+
+ // Remove closed backups from the pool.
+ StlList< Pointer<BackupTransport> > disposedList; // collected first, removed after the iteration
+ std::auto_ptr< Iterator<Pointer<BackupTransport> > > iter( backups.iterator() );
+ while( iter->hasNext() ) {
+ Pointer<BackupTransport> backup = iter->next(); // NOTE: this local shadows the 'backup' flag tested above
+ if( backup->isClosed() ) {
+ disposedList.add( backup );
+ }
+ }
+
+ backups.removeAll( disposedList );
+ disposedList.clear();
+
+ std::auto_ptr< Iterator<URI> > uriIter( connectList.iterator() );
+
+ while( uriIter->hasNext() && (int)backups.size() < backupPoolSize ) {
+ URI uri = uriIter->next();
+ if( connectedTransportURI != NULL && !connectedTransportURI->equals( uri ) ) { // skip the URI in use; no backups are built while nothing is connected
+ try {
+ Pointer<BackupTransport> backup( new BackupTransport( this ) );
+ backup->setUri( uri );
+
+ if( !backups.contains( backup ) ) { // NOTE(review): 'backup' was just allocated; whether this can ever match depends on how Pointer<BackupTransport> equality is defined -- confirm
+ Pointer<Transport> transport = createTransport( uri );
+ transport->setTransportListener( backup.get() ); // the BackupTransport is installed as the listener of its transport
+ transport->start();
+ backup->setTransport( transport );
+ backups.add( backup );
+ }
+ }
+ AMQ_CATCH_NOTHROW( Exception ) // NOTE(review): macros defined elsewhere; presumably a URI that fails is skipped silently -- confirm
+ AMQ_CATCHALL_NOTHROW()
+ }
+ }
+ }
+ }
+
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a new Transport for the given location, using the
+ * TransportFactory that the TransportRegistry returns for the scheme of the
+ * URI.
+ *
+ * @param location
+ *      The URI to create a Transport for.
+ *
+ * @return a Pointer owning the Transport the factory created.
+ *
+ * @throws IOException if no factory is registered for the URI's scheme or
+ *         if creating the Transport fails.
+ */
+Pointer<Transport> FailoverTransport::createTransport( const URI& location ) const
+    throw ( decaf::io::IOException ) {
+
+    try{
+
+        TransportFactory* factory =
+            TransportRegistry::getInstance().findFactory( location.getScheme() );
+
+        if( factory == NULL ) {
+            // Throw by value. The earlier 'throw new IOException' threw a
+            // pointer, which does not match a handler for IOException, so
+            // the message was lost to the catch-all and the heap allocated
+            // exception object was never freed.
+            throw IOException(
+                __FILE__, __LINE__, "Invalid URI specified, no valid Factory Found.");
+        }
+
+        Pointer<Transport> transport( factory->createComposite( location ) );
+
+        return transport;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Mon Mar 2 21:21:40 2009
@@ -24,35 +24,38 @@
#include <activemq/state/ConnectionStateTracker.h>
#include <activemq/transport/CompositeTransport.h>
#include <activemq/transport/failover/BackupTransport.h>
+#include <activemq/transport/failover/ReconnectTask.h>
+#include <activemq/transport/failover/FailoverTransportListener.h>
-#include <decaf/util/StlSet.h>
+#include <decaf/util/StlList.h>
#include <decaf/util/StlMap.h>
#include <decaf/util/Properties.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/atomic/AtomicReference.h>
#include <decaf/net/URI.h>
+#include <decaf/io/IOException.h>
namespace activemq {
namespace transport {
namespace failover {
- using decaf::lang::Pointer;
+ using namespace decaf::lang;
+ using decaf::net::URI;
+ using namespace decaf::util;
using activemq::commands::Command;
using activemq::commands::Response;
class AMQCPP_API FailoverTransport : public CompositeTransport {
private:
+ friend class FailoverTransportListener;
+ friend class ReconnectTask;
+
bool closed;
bool connected;
bool started;
- decaf::net::URI connectedTransportURI;
- decaf::net::URI failedConnectTransportURI;
- decaf::util::concurrent::atomic::AtomicReference<Transport> connectedTransport;
- //TaskRunner reconnectTask;
-
- decaf::util::StlSet<decaf::net::URI> uris;
+ decaf::util::StlList<URI> uris;
long long timeout;
long long initialReconnectDelay;
@@ -70,8 +73,7 @@
bool trackMessages;
int maxCacheSize;
- decaf::util::StlSet< Pointer<BackupTransport> > backups;
- decaf::lang::Exception connectionFailure;
+ decaf::util::StlList< Pointer<BackupTransport> > backups;
state::ConnectionStateTracker stateTracker;
decaf::util::concurrent::Mutex reconnectMutex;
@@ -80,8 +82,14 @@
decaf::util::concurrent::Mutex listenerMutex;
decaf::util::StlMap<int, Pointer<Command> > requestMap;
+ Pointer<URI> connectedTransportURI;
+ Pointer<URI> failedConnectTransportURI;
+ Pointer<Transport> connectedTransport;
+ Pointer<Exception> connectionFailure;
+ Pointer<ReconnectTask> reconnectTask;
Pointer<TransportListener> disposedListener;
- Pointer<TransportListener> myTansportListener;
+ Pointer<TransportListener> myTransportListener;
+ TransportListener* transportListener;
public:
@@ -95,16 +103,23 @@
*/
void reconnect();
+ /**
+ * Adds a New URI to the List of URIs this transport can Connect to.
+ * @param uri
+ * A String version of a URI to add to the URIs to failover to.
+ */
+ void add( const std::string& uri );
+
public: // CompositeTransport methods
/**
* Add a URI to the list of URI's that will represent the set of Transports
* that this Transport is a composite of.
*
- * @param uri
- * The new URI to add to the set this composite maintains.
+ * @param uris
+ * The new URIs to add to the set this composite maintains.
*/
- virtual void addURI( const decaf::net::URI& uri );
+ virtual void addURI( const List<URI>& uris );
/**
* Remove a URI from the set of URI's that represents the set of Transports
@@ -112,10 +127,10 @@
* has created a connected Transport should result in that Transport being
* disposed of.
*
- * @param uri
- * The new URI to remove to the set this composite maintains.
+ * @param uris
+ * The URIs to remove from the set this composite maintains.
*/
- virtual void removeURI( const decaf::net::URI& uri );
+ virtual void removeURI( const List<URI>& uris );
public: // Transport Members
@@ -188,7 +203,7 @@
* Sets the observer of asynchronous events from this transport.
* @param listener the listener of transport events.
*/
- virtual void setTransportListener( TransportListener* listener ) {}
+ virtual void setTransportListener( TransportListener* listener );
/**
* Is this Transport fault tolerant, meaning that it will reconnect to
@@ -235,6 +250,19 @@
return NULL;
}
+ /**
+ * @return the remote address for this connection
+ */
+ virtual std::string getRemoteAddress() const;
+
+ /**
+ * reconnect to another location
+ * @param uri
+ * @throws IOException on failure or if not supported
+ */
+ virtual void reconnect( const decaf::net::URI& uri )
+ throw( decaf::io::IOException ) {}
+
public: // FailoverTransport Property Getters / Setters
long long getTimeout() const {
@@ -333,6 +361,64 @@
this->maxCacheSize = value;
}
+ protected:
+
+ /**
+ * Given a Transport restore the state of the Client's connection to the Broker
+ * using the data accumulated in the State Tracker.
+ *
+ * @param transport
+ * The new Transport connected to the Broker.
+ *
+ * @throw IOException if an error occurs while restoring the old state.
+ */
+ void restoreTransport( const Pointer<Transport>& transport )
+ throw( decaf::io::IOException );
+
+ /**
+ * Called when this class' TransportListener is notified of a Failure.
+ * @param error - The CMS Exception that was thrown.
+ * @throw Exception if an error occurs.
+ */
+ void handleTransportFailure( const decaf::lang::Exception& error )
+ throw( decaf::lang::Exception );
+
+ private:
+
+ /**
+ * Returns a set of URIs that this Transport is to connect to, applying a
+ * random swapping from the class stored list of URIs if the randomize flag
+ * is enabled, otherwise just return the original list.
+ *
+ * @returns a List of URI objects that this Transport iterates over to connect.
+ */
+ decaf::util::StlList<URI> getConnectList() const;
+
+ /**
+ * @return Returns true if the command is one sent when a connection
+ * is being closed.
+ */
+ bool isShutdownCommand( const Pointer<Command>& command ) const;
+
+ /**
+ * Performs the actual Reconnect operation.
+ */
+ bool doReconnect();
+
+ /**
+ * Builds a set of Backup Transports for fast Failover.
+ */
+ bool buildBackups();
+
+ /**
+ * Looks up the correct Factory and create a new Composite version of the
+ * Transport requested.
+ *
+ * @param location - The URI to connect to
+ */
+ Pointer<Transport> createTransport( const URI& location ) const
+ throw ( decaf::io::IOException );
+
};
}}}
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp Mon Mar 2 21:21:40 2009
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FailoverTransportListener.h"
+#include "FailoverTransport.h"
+
+#include <decaf/lang/exceptions/NullPointerException.h>
+#include <activemq/commands/Response.h>
+#include <activemq/state/Tracked.h>
+
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::state;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+FailoverTransportListener::FailoverTransportListener( FailoverTransport* parent ) :
+    parent( parent ) {
+
+    // Every callback of this listener dereferences the parent transport, so
+    // only a non-NULL parent lets construction complete.
+    if( this->parent != NULL ) {
+        return;
+    }
+
+    throw NullPointerException(
+        __FILE__, __LINE__, "Pointer to Parent Transport was NULL" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+FailoverTransportListener::~FailoverTransportListener() {
+    // Intentionally empty: the parent pointer is not deleted here.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::onCommand( const Pointer<Command>& command ) {
+
+    // Handles a Command received from the active Transport:
+    //  - a Response completes the matching entry of the parent's request map,
+    //  - the first BrokerInfo seen adds the advertised peer brokers to the
+    //    parent's failover URIs,
+    //  - the command is then forwarded to the parent's listener, if one is set.
+    if( command == NULL ) {
+        return;
+    }
+
+    if( command->isResponse() ) {
+
+        Pointer<Response> response =
+            command.dynamicCast<Response, Pointer<Response>::CounterType >();
+        Pointer<Command> object;
+
+        synchronized( &( parent->requestMap ) ) {
+            // Only remove an entry that is actually present; a Response whose
+            // request was never stored in the map has nothing to remove.
+            // NOTE(review): StlMap::remove is believed to throw
+            // NoSuchElementException for a missing key - confirm; the check is
+            // harmless either way.
+            if( parent->requestMap.containsKey( response->getCorrelationId() ) ) {
+                object = parent->requestMap.remove( response->getCorrelationId() );
+            }
+        }
+
+        if( object != NULL ) {
+            try{
+                // Entries that are not Tracked commands fail the cast and are
+                // ignored via the ClassCastException handler below.
+                Pointer<Tracked> tracked =
+                    object.dynamicCast<Tracked, Pointer<Tracked>::CounterType >();
+                tracked->onResponse();
+            }
+            AMQ_CATCH_NOTHROW( ClassCastException )
+        }
+    }
+
+    // The initialized flag limits peer discovery to the first BrokerInfo.
+    if( !parent->initialized && command->isBrokerInfo() ) {
+
+        Pointer<BrokerInfo> info =
+            command.dynamicCast<BrokerInfo, Pointer<BrokerInfo>::CounterType >();
+        std::vector< Pointer<BrokerInfo> >& peers = info->getPeerBrokerInfos();
+        for( std::size_t i = 0; i < peers.size(); ++i ) {
+            std::string brokerString = peers[i]->getBrokerURL();
+            parent->add( brokerString );
+        }
+        parent->initialized = true;
+    }
+
+    if( parent->transportListener != NULL ) {
+        parent->transportListener->onCommand( command );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::onException( const decaf::lang::Exception& ex ) {
+    // The parent gets the first chance to deal with the failure; only when
+    // that handling itself throws is the resulting exception passed on to the
+    // parent's listener, provided one is registered.
+    try {
+        parent->handleTransportFailure( ex );
+    } catch( Exception& failure ) {
+        if( parent->transportListener == NULL ) {
+            return;
+        }
+        parent->transportListener->onException( failure );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::transportInterrupted() {
+    // Relay the notification to the parent's listener; without one there is
+    // nobody to inform.
+    if( parent->transportListener == NULL ) {
+        return;
+    }
+
+    parent->transportListener->transportInterrupted();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportListener::transportResumed() {
+    // Relay the notification to the parent's listener; without one there is
+    // nobody to inform.
+    if( parent->transportListener == NULL ) {
+        return;
+    }
+
+    parent->transportListener->transportResumed();
+}
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h Mon Mar 2 21:21:40 2009
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FAILOVERTRANSPORTLISTENER_H_
+#define FAILOVERTRANSPORTLISTENER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/transport/TransportListener.h>
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+ class FailoverTransport;
+
+ /**
+ * Utility class used by the Transport to perform the work of responding to events
+ * from the active Transport.
+ *
+ * @since 3.0
+ */
+ class AMQCPP_API FailoverTransportListener : public TransportListener {
+ private:
+
+     // The Transport that created this listener, held as a raw pointer.
+     FailoverTransport* parent;
+
+ public:
+
+     /**
+      * Creates a listener that reports to the given FailoverTransport.
+      * Declared explicit so that a FailoverTransport pointer is never
+      * implicitly converted into a listener object.
+      *
+      * @param parent the transport that owns this listener, must not be NULL.
+      *
+      * @throws NullPointerException if parent is NULL.
+      */
+     explicit FailoverTransportListener( FailoverTransport* parent );
+
+     virtual ~FailoverTransportListener();
+
+     /**
+      * Event handler for the receipt of a command.  A Response removes the
+      * matching entry from the parent's request map and notifies it when it
+      * is a Tracked command; the first BrokerInfo received adds the URLs of
+      * the advertised peer brokers to the parent.  The command is then passed
+      * on to the parent's TransportListener when one is set.  A NULL command
+      * is ignored.
+      *
+      * @param command the received command object.
+      */
+     virtual void onCommand( const Pointer<Command>& command );
+
+     /**
+      * Event handler for an exception from a command transport.  The parent
+      * handles the failure; if that handling throws, the resulting exception
+      * is passed to the parent's TransportListener when one is set.
+      *
+      * @param ex The exception.
+      */
+     virtual void onException( const decaf::lang::Exception& ex );
+
+     /**
+      * The transport has suffered an interruption from which it hopes to recover
+      */
+     virtual void transportInterrupted();
+
+     /**
+      * The transport has resumed after an interruption
+      */
+     virtual void transportResumed();
+
+ };
+
+}}}
+
+#endif /* FAILOVERTRANSPORTLISTENER_H_ */
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportListener.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp Mon Mar 2 21:21:40 2009
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ReconnectTask.h"
+#include "FailoverTransport.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ReconnectTask::ReconnectTask( FailoverTransport* parent ) : parent( parent ) {
+
+    // The task works exclusively through its parent transport, so a missing
+    // parent is rejected before anything else happens.
+    if( parent == NULL ) {
+        throw NullPointerException(
+            __FILE__, __LINE__, "Parent FailoverTransport passed was null" );
+    }
+
+    // All state flags begin cleared.
+    shutDown = false;
+    pending = false;
+    threadTerminated = false;
+
+    // Start the thread only after every member has been initialised.
+    start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ReconnectTask::~ReconnectTask() {
+    // Request shutdown through the no-argument overload.
+    // NOTE(review): AMQ_CATCHALL_NOTHROW presumably swallows anything thrown
+    // so that nothing escapes the destructor - confirm against the macro.
+    try{
+        shutdown();
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ReconnectTask::shutdown( unsigned int timeout ) {
+
+    synchronized( &mutex ) {
+
+        // Flag the shutdown request and wake anything waiting on the mutex.
+        pending = true;
+        shutDown = true;
+        mutex.notifyAll();
+
+        // Wait only while the thread has not yet reported that it terminated;
+        // this also avoids waiting when the call comes from a thread that has
+        // already finished shutting down.
+        const bool stillRunning = !threadTerminated;
+        if( stillRunning ) {
+            mutex.wait( timeout );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ReconnectTask::wakeup() {
+
+    synchronized( &mutex ) {
+        // Once shutdown has been requested a wakeup is silently ignored.
+        if( !shutDown ) {
+            pending = true;
+            mutex.notifyAll();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ReconnectTask::iterate() {
+
+    // Runs one unit of work for the parent transport: a reconnect attempt
+    // when the parent has no connected transport and is not closed, otherwise
+    // a pass that builds the backup transports.
+    //
+    // Returns true when a reconnect was attempted and false when backups were
+    // built.  The value returned by doReconnect() is overwritten below.
+    //
+    // The original also declared a local 'doReconnect' that was never read;
+    // it has been removed.
+    bool result = false;
+    bool buildBackup = true;
+
+    synchronized( &( parent->backupMutex ) ) {
+        if( parent->connectedTransport == NULL && !parent->closed ) {
+            result = parent->doReconnect();
+            buildBackup = false;
+        }
+    }
+
+    if( buildBackup ) {
+        parent->buildBackups();
+    } else {
+        // build backups on the next iteration
+        result = true;
+        this->wakeup();
+    }
+
+    return result;
+}
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h?rev=749440&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h Mon Mar 2 21:21:40 2009
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_FAILOVER_RECONNECTTASK_H_
+#define _ACTIVEMQ_TRANSPORT_FAILOVER_RECONNECTTASK_H_
+
+#include <activemq/util/Config.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/util/concurrent/Mutex.h>
+
+namespace activemq {
+namespace transport {
+namespace failover {
+
+ class FailoverTransport;
+
+ /**
+  * Thread used by the FailoverTransport to perform reconnect attempts and
+  * to build its backup transports.
+  */
+ class AMQCPP_API ReconnectTask : public decaf::lang::Thread {
+ private:
+
+     // Guards the state flags below and is used to wait for and signal work.
+     decaf::util::concurrent::Mutex mutex;
+
+     // Checked by shutdown() to decide whether it still has to wait.
+     bool threadTerminated;
+
+     // Set by wakeup() and shutdown() before they notify waiters.
+     bool pending;
+
+     // Set once shutdown() has been called; wakeup() is ignored afterwards.
+     bool shutDown;
+
+     // The Transport this task works for, held as a raw pointer.
+     FailoverTransport* parent;
+
+ public:
+
+     /**
+      * Creates the task for the given transport and starts its thread.
+      * Declared explicit so that a FailoverTransport pointer is never
+      * implicitly converted into a task object.
+      *
+      * @param parent the transport this task works for, must not be NULL.
+      *
+      * @throws NullPointerException if parent is NULL.
+      */
+     explicit ReconnectTask( FailoverTransport* parent );
+
+     /**
+      * Calls shutdown() and lets no exception escape.
+      */
+     virtual ~ReconnectTask();
+
+     /**
+      * Marks the task as shut down, notifies all waiters and, if the thread
+      * has not yet terminated, waits on the internal mutex.
+      *
+      * @param timeout the value passed to the wait on the internal mutex;
+      *        presumably milliseconds - TODO confirm against Mutex::wait.
+      */
+     void shutdown( unsigned int timeout );
+
+     /**
+      * Same as shutdown( 0 ).
+      */
+     void shutdown() {
+         this->shutdown( 0 );
+     }
+
+     /**
+      * Marks work as pending and notifies all waiters; does nothing once the
+      * task has been shut down.
+      */
+     void wakeup();
+
+ protected:
+
+     /**
+      * Performs one unit of work: a reconnect attempt when the parent has
+      * no connected transport and is not closed, otherwise a pass that
+      * builds the backup transports.
+      *
+      * @return true if a reconnect was attempted, false if backups were built.
+      */
+     bool iterate();
+
+     /**
+      * Thread entry point.
+      */
+     virtual void run();
+
+ };
+
+}}}
+
+#endif /* _ACTIVEMQ_TRANSPORT_FAILOVER_RECONNECTTASK_H_ */
Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/ReconnectTask.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h Mon Mar 2 21:21:40 2009
@@ -257,7 +257,7 @@
*/
virtual void fireException( const exceptions::ActiveMQException& ex ){
if( listener != NULL ){
- listener->onTransportException( this, ex );
+ listener->onException( ex );
}
}
@@ -309,6 +309,22 @@
virtual bool isClosed() const {
return false;
}
+
+ /**
+ * @return the remote address for this connection
+ */
+ virtual std::string getRemoteAddress() const {
+ return "";
+ }
+
+ /**
+ * reconnect to another location
+ * @param uri
+ * @throws IOException on failure or if not supported
+ */
+ virtual void reconnect( const decaf::net::URI& uri )
+ throw( decaf::io::IOException ) {}
+
};
}}}
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp Mon Mar 2 21:21:40 2009
@@ -156,11 +156,11 @@
public:
- Transport* transport;
decaf::util::concurrent::Mutex mutex;
+ bool caughtOne;
- MyTransportListener() : latch(1) { this->transport = NULL; }
- MyTransportListener( unsigned int num ) : latch( num ) { this->transport = NULL; }
+ MyTransportListener() : latch(1), caughtOne( false ) {}
+ MyTransportListener( unsigned int num ) : latch( num ), caughtOne( false ) {}
virtual ~MyTransportListener(){}
virtual void await() {
@@ -174,10 +174,9 @@
latch.countDown();
}
- virtual void onTransportException( Transport* source,
- const decaf::lang::Exception& ex AMQCPP_UNUSED){
- transport = source;
+ virtual void onException( const decaf::lang::Exception& ex AMQCPP_UNUSED){
+ this->caughtOne = true;
synchronized( &mutex )
{
mutex.notify();
@@ -356,16 +355,12 @@
transport.start();
- synchronized(&listener.mutex)
- {
- if(listener.transport != &transport)
- {
- listener.mutex.wait(1000);
- }
+ synchronized(&listener.mutex) {
+ if( !listener.caughtOne ) {
+ listener.mutex.wait(1000);
+ }
}
- CPPUNIT_ASSERT( listener.transport == &transport );
-
transport.close();
}
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=749440&r1=749439&r2=749440&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Mon Mar 2 21:21:40 2009
@@ -184,13 +184,13 @@
}
}catch( exceptions::ActiveMQException& ex ){
if( listener ){
- listener->onTransportException( this, ex );
+ listener->onException( ex );
}
}
catch( ... ){
if( listener ){
exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" );
- listener->onTransportException( this, ex );
+ listener->onException( ex );
}
}
}
@@ -215,6 +215,13 @@
return false;
}
+ virtual std::string getRemoteAddress() const {
+ return "";
+ }
+
+ virtual void reconnect( const decaf::net::URI& uri )
+ throw( decaf::io::IOException ) {}
+
};
class MyBrokenTransport : public MyTransport{
@@ -251,8 +258,7 @@
}
}
- virtual void onTransportException(
- Transport* source AMQCPP_UNUSED,
+ virtual void onException(
const decaf::lang::Exception& ex AMQCPP_UNUSED)
{
synchronized( &mutex ){