You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/26 23:13:43 UTC
svn commit: r979445 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/transport/ main/activemq/transport/failover/
main/activemq/transport/mock/ test/activemq/transport/correlator/
Author: tabish
Date: Mon Jul 26 21:13:42 2010
New Revision: 979445
URL: http://svn.apache.org/viewvc?rev=979445&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQCPP-307
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h Mon Jul 26 21:13:42 2010
@@ -46,10 +46,12 @@ namespace transport {
* Add a URI to the list of URI's that will represent the set of Transports
* that this Transport is a composite of.
*
+ * @param rebalance
+ * Indicates if the addition should cause a forced reconnect or not.
* @param uris
- * The new URI set to add to the set this composite maintains.
+ * The new URI set to add to the set this composite maintains.
*/
- virtual void addURI( const List<URI>& uris ) = 0;
+ virtual void addURI( bool rebalance, const List<URI>& uris ) = 0;
/**
* Remove a URI from the set of URI's that represents the set of Transports
@@ -57,10 +59,12 @@ namespace transport {
* has created a connected Transport should result in that Transport being
* disposed of.
*
+ * @param rebalance
+ * Indicates if the removal should cause a forced reconnect or not.
* @param uris
* The new URI set to remove to the set this composite maintains.
*/
- virtual void removeURI( const List<URI>& uris ) = 0;
+ virtual void removeURI( bool rebalance, const List<URI>& uris ) = 0;
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Mon Jul 26 21:13:42 2010
@@ -201,6 +201,19 @@ namespace transport{
return "";
}
+ virtual bool isReconnectSupported() const {
+ return false;
+ }
+
+ virtual bool isUpdateURIsSupported() const {
+ return false;
+ }
+
+ virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
+ const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+ throw decaf::io::IOException();
+ }
+
/**
* {@inheritDoc}
*
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h Mon Jul 26 21:13:42 2010
@@ -22,6 +22,7 @@
#include <decaf/io/OutputStream.h>
#include <decaf/io/IOException.h>
#include <decaf/io/Closeable.h>
+#include <decaf/util/List.h>
#include <decaf/net/URI.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
@@ -168,6 +169,16 @@ namespace transport{
virtual bool isClosed() const = 0;
/**
+ * @return true if reconnect is supported.
+ */
+ virtual bool isReconnectSupported() const = 0;
+
+ /**
+ * @return true if updating uris is supported.
+ */
+ virtual bool isUpdateURIsSupported() const = 0;
+
+ /**
* @return the remote address for this connection
*/
virtual std::string getRemoteAddress() const = 0;
@@ -182,6 +193,19 @@ namespace transport{
*/
virtual void reconnect( const decaf::net::URI& uri ) = 0;
+ /**
+ * Updates the set of URIs the Transport can connect to. If the Transport
+ * doesn't support updating its URIs then an IOException is thrown.
+ *
+ * @param rebalance
+ * Indicates if a forced reconnection should be performed as a result of the update.
+ * @param uris
+ * The new list of URIs that can be used for connection.
+ *
+ * @throws IOException if an error occurs or updates aren't supported.
+ */
+ virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris ) = 0;
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h Mon Jul 26 21:13:42 2010
@@ -149,6 +149,14 @@ namespace transport{
return next->isConnected();
}
+ virtual bool isReconnectSupported() const {
+ return next->isReconnectSupported();
+ }
+
+ virtual bool isUpdateURIsSupported() const {
+ return next->isUpdateURIsSupported();
+ }
+
virtual bool isClosed() const {
return next->isClosed();
}
@@ -159,6 +167,10 @@ namespace transport{
virtual void reconnect( const decaf::net::URI& uri );
+ virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris ) {
+ next->updateURIs( rebalance, uris );
+ }
+
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Jul 26 21:13:42 2010
@@ -24,6 +24,7 @@
#include <activemq/threads/DedicatedTaskRunner.h>
#include <activemq/threads/CompositeTaskRunner.h>
#include <decaf/util/Random.h>
+#include <decaf/util/StringTokenizer.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Integer.h>
@@ -64,6 +65,9 @@ FailoverTransport::FailoverTransport() {
this->connectionInterruptProcessingComplete = false;
this->firstConnection = true;
+ this->updateURIsSupported = true;
+ this->reconnectSupported = true;
+
this->transportListener = NULL;
this->uris.reset( new URIPool() );
this->stateTracker.setTrackTransactions( true );
@@ -104,13 +108,13 @@ void FailoverTransport::add( const std::
try {
uris->addURI( URI( uri ) );
- reconnect();
+ reconnect( false );
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::addURI( const List<URI>& uris ) {
+void FailoverTransport::addURI( bool rebalance, const List<URI>& uris ) {
std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
@@ -118,11 +122,11 @@ void FailoverTransport::addURI( const Li
this->uris->addURI( iter->next() );
}
- reconnect();
+ reconnect( rebalance );
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::removeURI( const List<URI>& uris ) {
+void FailoverTransport::removeURI( bool rebalance, const List<URI>& uris ) {
std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
@@ -130,7 +134,7 @@ void FailoverTransport::removeURI( const
this->uris->removeURI( iter->next() );
}
- reconnect();
+ reconnect( rebalance );
}
////////////////////////////////////////////////////////////////////////////////
@@ -140,7 +144,7 @@ void FailoverTransport::reconnect( const
this->uris->addURI( uri );
- reconnect();
+ reconnect( true );
}
AMQ_CATCH_RETHROW( IOException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -335,7 +339,7 @@ void FailoverTransport::start() {
if( connectedTransport != NULL ) {
stateTracker.restore( connectedTransport );
} else {
- reconnect();
+ reconnect( false );
}
}
}
@@ -396,10 +400,28 @@ void FailoverTransport::close() {
}
////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::reconnect() {
+void FailoverTransport::reconnect( bool rebalance ) {
+
+ Pointer<Transport> transport;
synchronized( &reconnectMutex ) {
if( started ) {
+
+ if( rebalance ) {
+
+ transport.swap( this->connectedTransport );
+
+ if( transport != NULL ) {
+
+ if( this->disposedListener != NULL ) {
+ transport->setTransportListener( disposedListener.get() );
+ }
+
+ // Hand off to the close task so it gets done in a different thread.
+ closeTask->add( transport );
+ }
+ }
+
taskRunner->wakeup();
}
}
@@ -443,8 +465,6 @@ void FailoverTransport::handleTransportF
if( transport != NULL ) {
- //std::cout << "Failover: Connection to has been unexpectedly terminated." << std::endl;
-
if( this->disposedListener != NULL ) {
transport->setTransportListener( disposedListener.get() );
}
@@ -476,6 +496,110 @@ void FailoverTransport::handleTransportF
}
////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::handleConnectionControl( const Pointer<Command>& control ) {
+
+ try {
+
+ Pointer<ConnectionControl> ctrlCommand = control.dynamicCast<ConnectionControl>();
+
+ std::string reconnectStr = ctrlCommand->getReconnectTo();
+ if( !reconnectStr.empty() ) {
+
+ std::remove(reconnectStr.begin(), reconnectStr.end(), ' ');
+
+ if( reconnectStr.length() > 0 ) {
+ try {
+ if( isReconnectSupported() ) {
+ reconnect( URI( reconnectStr ) );
+ }
+ } catch( Exception e ) {
+ }
+ }
+ }
+
+ processNewTransports( ctrlCommand->isRebalanceConnection(), ctrlCommand->getConnectedBrokers() );
+ }
+ AMQ_CATCH_RETHROW( Exception )
+ AMQ_CATCHALL_THROW( Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::processNewTransports( bool rebalance, std::string newTransports ) {
+
+ if( !newTransports.empty() ) {
+
+ std::remove( newTransports.begin(), newTransports.end(), ' ' );
+
+ if( newTransports.length() > 0 && isUpdateURIsSupported() ) {
+
+ StlList<URI> list;
+ StringTokenizer tokenizer( newTransports, "," );
+
+ while( tokenizer.hasMoreTokens() ) {
+ std::string str = tokenizer.nextToken();
+ try {
+ URI uri( str );
+ list.add( uri );
+ } catch( Exception e ) {
+ //LOG.error( "Failed to parse broker address: " + str, e );
+ }
+ }
+
+ if( !list.isEmpty() ) {
+ try {
+ updateURIs( rebalance, list );
+ } catch( IOException e ) {
+ //LOG.error( "Failed to update transport URI's from: " + newTransports, e );
+ }
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& updatedURIs ) {
+
+ if( isUpdateURIsSupported() ) {
+
+ StlList<URI> copy( this->updated );
+ StlList<URI> add;
+
+ if( !updatedURIs.isEmpty() ) {
+
+ StlSet<URI> set;
+
+ for( std::size_t i = 0; i < updatedURIs.size(); i++ ) {
+ set.add( updatedURIs.get(i) );
+ }
+
+ Pointer< Iterator<URI> > setIter( set.iterator() );
+ while( setIter->hasNext() ) {
+ URI value = setIter->next();
+ if( copy.remove( value ) ) {
+ add.add( value );
+ }
+ }
+
+ synchronized( &reconnectMutex ) {
+
+ this->updated.clear();
+ Pointer< Iterator<URI> > listIter1( add.iterator() );
+ while( listIter1->hasNext() ) {
+ this->updated.add( listIter1->next() );
+ }
+
+ Pointer< Iterator<URI> > listIter2( copy.iterator() );
+ while( listIter2->hasNext() ) {
+ this->uris->removeURI( listIter2->next() );
+ }
+
+ this->addURI( rebalance, add );
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isPending() const {
bool result = false;
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Mon Jul 26 21:13:42 2010
@@ -76,6 +76,8 @@ namespace failover {
int maxCacheSize;
bool connectionInterruptProcessingComplete;
bool firstConnection;
+ bool updateURIsSupported;
+ bool reconnectSupported;
mutable decaf::util::concurrent::Mutex reconnectMutex;
mutable decaf::util::concurrent::Mutex sleepMutex;
@@ -85,6 +87,7 @@ namespace failover {
decaf::util::StlMap<int, Pointer<Command> > requestMap;
Pointer<URIPool> uris;
+ decaf::util::StlList<URI> updated;
Pointer<URI> connectedTransportURI;
Pointer<Transport> connectedTransport;
Pointer<Exception> connectionFailure;
@@ -109,8 +112,11 @@ namespace failover {
/**
* Indicates that the Transport needs to reconnect to another URI in its
* list.
+ *
+ * @param rebalance
+ * Indicates if the current connection should be broken and reconnected.
*/
- void reconnect();
+ void reconnect( bool rebalance );
/**
* Adds a New URI to the List of URIs this transport can Connect to.
@@ -121,25 +127,9 @@ namespace failover {
public: // CompositeTransport methods
- /**
- * Add a URI to the list of URI's that will represent the set of Transports
- * that this Transport is a composite of.
- *
- * @param uris
- * The new URIs to add to the set this composite maintains.
- */
- virtual void addURI( const List<URI>& uris );
+ virtual void addURI( bool rebalance, const List<URI>& uris );
- /**
- * Remove a URI from the set of URI's that represents the set of Transports
- * that this Transport is composed of, removing a URI for which the composite
- * has created a connected Transport should result in that Transport being
- * disposed of.
- *
- * @param uris
- * The new URIs to remove to the set this composite maintains.
- */
- virtual void removeURI( const List<URI>& uris );
+ virtual void removeURI( bool rebalance, const List<URI>& uris );
public: // Transport Members
@@ -197,6 +187,8 @@ namespace failover {
virtual void reconnect( const decaf::net::URI& uri );
+ virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris );
+
public: // CompositeTask Methods.
/**
@@ -328,6 +320,22 @@ namespace failover {
this->maxCacheSize = value;
}
+ bool isReconnectSupported() const {
+ return this->reconnectSupported;
+ }
+
+ void setReconnectSupported( bool value ) {
+ this->reconnectSupported = value;
+ }
+
+ bool isUpdateURIsSupported() const {
+ return this->updateURIsSupported;
+ }
+
+ void setUpdateURIsSupported( bool value ) {
+ this->updateURIsSupported = value;
+ }
+
void setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId );
protected:
@@ -350,6 +358,16 @@ namespace failover {
*/
void handleTransportFailure( const decaf::lang::Exception& error );
+ /**
+ * Called when the Broker sends a ConnectionControl command which could
+ * signal that this Client needs to reconnect in order to rebalance the
+ * connections on a Broker or the set of Known brokers has changed.
+ *
+ * @param control
+ * The ConnectionControl command sent from the Broker.
+ */
+ void handleConnectionControl( const Pointer<Command>& control );
+
private:
/**
@@ -371,6 +389,8 @@ namespace failover {
*/
Pointer<Transport> createTransport( const URI& location ) const;
+ void processNewTransports( bool rebalance, std::string newTransports );
+
};
}}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Mon Jul 26 21:13:42 2010
@@ -107,7 +107,7 @@ Pointer<Transport> FailoverTransportFact
transport->setMaxCacheSize(
Integer::parseInt( properties.getProperty( "maxCacheSize", "131072" ) ) );
- transport->addURI( data.getComponents() );
+ transport->addURI( false, data.getComponents() );
return transport;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp Mon Jul 26 21:13:42 2010
@@ -86,6 +86,10 @@ void FailoverTransportListener::onComman
parent->setInitialized( true );
}
+ if( command->isConnectionControl() ) {
+ parent->handleConnectionControl( command );
+ }
+
if( parent->transportListener != NULL ) {
parent->transportListener->onCommand( command );
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h Mon Jul 26 21:13:42 2010
@@ -297,6 +297,19 @@ namespace mock{
this->failOnClose = value;
}
+ virtual bool isReconnectSupported() const {
+ return false;
+ }
+
+ virtual bool isUpdateURIsSupported() const {
+ return false;
+ }
+
+ virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
+ const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+ throw decaf::io::IOException();
+ }
+
};
}}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Mon Jul 26 21:13:42 2010
@@ -230,6 +230,19 @@ namespace correlator{
virtual void reconnect( const decaf::net::URI& uri )
throw( decaf::io::IOException ) {}
+ virtual bool isReconnectSupported() const {
+ return false;
+ }
+
+ virtual bool isUpdateURIsSupported() const {
+ return false;
+ }
+
+ virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
+ const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+ throw decaf::io::IOException();
+ }
+
};
class MyBrokenTransport : public MyTransport{