You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/07/26 23:13:43 UTC

svn commit: r979445 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/transport/ main/activemq/transport/failover/ main/activemq/transport/mock/ test/activemq/transport/correlator/

Author: tabish
Date: Mon Jul 26 21:13:42 2010
New Revision: 979445

URL: http://svn.apache.org/viewvc?rev=979445&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQCPP-307

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h Mon Jul 26 21:13:42 2010
@@ -46,10 +46,12 @@ namespace transport {
          * Add a URI to the list of URI's that will represent the set of Transports
          * that this Transport is a composite of.
          *
+         * @param rebalance
+         *      Indicates if the addition should cause a forced reconnect or not.
          * @param uris
-         *        The new URI set to add to the set this composite maintains.
+         *      The new URI set to add to the set this composite maintains.
          */
-        virtual void addURI( const List<URI>& uris ) = 0;
+        virtual void addURI( bool rebalance, const List<URI>& uris ) = 0;
 
         /**
          * Remove a URI from the set of URI's that represents the set of Transports
@@ -57,10 +59,12 @@ namespace transport {
          * has created a connected Transport should result in that Transport being
          * disposed of.
          *
+         * @param rebalance
+         *      Indicates if the removal should cause a forced reconnect or not.
          * @param uris
          *        The new URI set to remove to the set this composite maintains.
          */
-        virtual void removeURI( const List<URI>& uris ) = 0;
+        virtual void removeURI( bool rebalance, const List<URI>& uris ) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Mon Jul 26 21:13:42 2010
@@ -201,6 +201,19 @@ namespace transport{
             return "";
         }
 
+        virtual bool isReconnectSupported() const {
+            return false;
+        }
+
+        virtual bool isUpdateURIsSupported() const {
+            return false;
+        }
+
+        virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
+                                 const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+            throw decaf::io::IOException();
+        }
+
         /**
          * {@inheritDoc}
          *

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h Mon Jul 26 21:13:42 2010
@@ -22,6 +22,7 @@
 #include <decaf/io/OutputStream.h>
 #include <decaf/io/IOException.h>
 #include <decaf/io/Closeable.h>
+#include <decaf/util/List.h>
 #include <decaf/net/URI.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
@@ -168,6 +169,16 @@ namespace transport{
         virtual bool isClosed() const = 0;
 
         /**
+         * @return true if reconnect is supported.
+         */
+        virtual bool isReconnectSupported() const = 0;
+
+        /**
+         * @return true if updating uris is supported.
+         */
+        virtual bool isUpdateURIsSupported() const = 0;
+
+        /**
          * @return the remote address for this connection
          */
         virtual std::string getRemoteAddress() const = 0;
@@ -182,6 +193,19 @@ namespace transport{
          */
         virtual void reconnect( const decaf::net::URI& uri ) = 0;
 
+        /**
+         * Updates the set of URIs the Transport can connect to.  If the Transport
+         * doesn't support updating its URIs then an IOException is thrown.
+         *
+         * @param rebalance
+         *      Indicates if a forced reconnection should be performed as a result of the update.
+         * @param uris
+         *      The new list of URIs that can be used for connection.
+         *
+         * @throws IOException if an error occurs or updates aren't supported.
+         */
+        virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris ) = 0;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h Mon Jul 26 21:13:42 2010
@@ -149,6 +149,14 @@ namespace transport{
             return next->isConnected();
         }
 
+        virtual bool isReconnectSupported() const {
+            return next->isReconnectSupported();
+        }
+
+        virtual bool isUpdateURIsSupported() const {
+            return next->isUpdateURIsSupported();
+        }
+
         virtual bool isClosed() const {
             return next->isClosed();
         }
@@ -159,6 +167,10 @@ namespace transport{
 
         virtual void reconnect( const decaf::net::URI& uri );
 
+        virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris ) {
+            next->updateURIs( rebalance, uris );
+        }
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Jul 26 21:13:42 2010
@@ -24,6 +24,7 @@
 #include <activemq/threads/DedicatedTaskRunner.h>
 #include <activemq/threads/CompositeTaskRunner.h>
 #include <decaf/util/Random.h>
+#include <decaf/util/StringTokenizer.h>
 #include <decaf/lang/System.h>
 #include <decaf/lang/Integer.h>
 
@@ -64,6 +65,9 @@ FailoverTransport::FailoverTransport() {
     this->connectionInterruptProcessingComplete = false;
     this->firstConnection = true;
 
+    this->updateURIsSupported = true;
+    this->reconnectSupported = true;
+
     this->transportListener = NULL;
     this->uris.reset( new URIPool() );
     this->stateTracker.setTrackTransactions( true );
@@ -104,13 +108,13 @@ void FailoverTransport::add( const std::
     try {
         uris->addURI( URI( uri ) );
 
-        reconnect();
+        reconnect( false );
     }
     AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::addURI( const List<URI>& uris ) {
+void FailoverTransport::addURI( bool rebalance, const List<URI>& uris ) {
 
     std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
 
@@ -118,11 +122,11 @@ void FailoverTransport::addURI( const Li
         this->uris->addURI( iter->next() );
     }
 
-    reconnect();
+    reconnect( rebalance );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::removeURI( const List<URI>& uris ) {
+void FailoverTransport::removeURI( bool rebalance, const List<URI>& uris ) {
 
     std::auto_ptr< Iterator<URI> > iter( uris.iterator() );
 
@@ -130,7 +134,7 @@ void FailoverTransport::removeURI( const
         this->uris->removeURI( iter->next() );
     }
 
-    reconnect();
+    reconnect( rebalance );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -140,7 +144,7 @@ void FailoverTransport::reconnect( const
 
         this->uris->addURI( uri );
 
-        reconnect();
+        reconnect( true );
     }
     AMQ_CATCH_RETHROW( IOException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
@@ -335,7 +339,7 @@ void FailoverTransport::start() {
             if( connectedTransport != NULL ) {
                 stateTracker.restore( connectedTransport );
             } else {
-                reconnect();
+                reconnect( false );
             }
         }
     }
@@ -396,10 +400,28 @@ void FailoverTransport::close() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void FailoverTransport::reconnect() {
+void FailoverTransport::reconnect( bool rebalance ) {
+
+    Pointer<Transport> transport;
 
     synchronized( &reconnectMutex  ) {
         if( started ) {
+
+            if( rebalance ) {
+
+                transport.swap( this->connectedTransport );
+
+                if( transport != NULL ) {
+
+                    if( this->disposedListener != NULL ) {
+                        transport->setTransportListener( disposedListener.get() );
+                    }
+
+                    // Hand off to the close task so it gets done in a different thread.
+                    closeTask->add( transport );
+                }
+            }
+
             taskRunner->wakeup();
         }
     }
@@ -443,8 +465,6 @@ void FailoverTransport::handleTransportF
 
     if( transport != NULL ) {
 
-        //std::cout << "Failover: Connection to has been unexpectedly terminated." << std::endl;
-
         if( this->disposedListener != NULL ) {
             transport->setTransportListener( disposedListener.get() );
         }
@@ -476,6 +496,110 @@ void FailoverTransport::handleTransportF
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::handleConnectionControl( const Pointer<Command>& control ) {
+
+    try {
+
+        Pointer<ConnectionControl> ctrlCommand = control.dynamicCast<ConnectionControl>();
+
+        std::string reconnectStr = ctrlCommand->getReconnectTo();
+        if( !reconnectStr.empty() ) {
+
+            std::remove(reconnectStr.begin(), reconnectStr.end(), ' ');
+
+            if( reconnectStr.length() > 0 ) {
+                try {
+                    if( isReconnectSupported() ) {
+                        reconnect( URI( reconnectStr ) );
+                    }
+                } catch( Exception e ) {
+                }
+            }
+        }
+
+        processNewTransports( ctrlCommand->isRebalanceConnection(), ctrlCommand->getConnectedBrokers() );
+    }
+    AMQ_CATCH_RETHROW( Exception )
+    AMQ_CATCHALL_THROW( Exception )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::processNewTransports( bool rebalance, std::string newTransports ) {
+
+    if( !newTransports.empty() ) {
+
+        std::remove( newTransports.begin(), newTransports.end(), ' ' );
+
+        if( newTransports.length() > 0 && isUpdateURIsSupported() ) {
+
+            StlList<URI> list;
+            StringTokenizer tokenizer( newTransports, "," );
+
+            while( tokenizer.hasMoreTokens() ) {
+                std::string str = tokenizer.nextToken();
+                try {
+                    URI uri( str );
+                    list.add( uri );
+                } catch( Exception e ) {
+                    //LOG.error( "Failed to parse broker address: " + str, e );
+                }
+            }
+
+            if( !list.isEmpty() ) {
+                try {
+                    updateURIs( rebalance, list );
+                } catch( IOException e ) {
+                    //LOG.error( "Failed to update transport URI's from: " + newTransports, e );
+                }
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& updatedURIs ) {
+
+    if( isUpdateURIsSupported() ) {
+
+        StlList<URI> copy( this->updated );
+        StlList<URI> add;
+
+        if( !updatedURIs.isEmpty() ) {
+
+            StlSet<URI> set;
+
+            for( std::size_t i = 0; i < updatedURIs.size(); i++ ) {
+                set.add( updatedURIs.get(i) );
+            }
+
+            Pointer< Iterator<URI> > setIter( set.iterator() );
+            while( setIter->hasNext() ) {
+                URI value = setIter->next();
+                if( copy.remove( value ) ) {
+                    add.add( value );
+                }
+            }
+
+            synchronized( &reconnectMutex ) {
+
+                this->updated.clear();
+                Pointer< Iterator<URI> > listIter1( add.iterator() );
+                while( listIter1->hasNext() ) {
+                    this->updated.add( listIter1->next() );
+                }
+
+                Pointer< Iterator<URI> > listIter2( copy.iterator() );
+                while( listIter2->hasNext() ) {
+                    this->uris->removeURI( listIter2->next() );
+                }
+
+                this->addURI( rebalance, add );
+            }
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isPending() const {
     bool result = false;
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Mon Jul 26 21:13:42 2010
@@ -76,6 +76,8 @@ namespace failover {
         int maxCacheSize;
         bool connectionInterruptProcessingComplete;
         bool firstConnection;
+        bool updateURIsSupported;
+        bool reconnectSupported;
 
         mutable decaf::util::concurrent::Mutex reconnectMutex;
         mutable decaf::util::concurrent::Mutex sleepMutex;
@@ -85,6 +87,7 @@ namespace failover {
         decaf::util::StlMap<int, Pointer<Command> > requestMap;
 
         Pointer<URIPool> uris;
+        decaf::util::StlList<URI> updated;
         Pointer<URI> connectedTransportURI;
         Pointer<Transport> connectedTransport;
         Pointer<Exception> connectionFailure;
@@ -109,8 +112,11 @@ namespace failover {
         /**
          * Indicates that the Transport needs to reconnect to another URI in its
          * list.
+         *
+         * @param rebalance
+         *      Indicates if the current connection should be broken and reconnected.
          */
-        void reconnect();
+        void reconnect( bool rebalance );
 
         /**
          * Adds a New URI to the List of URIs this transport can Connect to.
@@ -121,25 +127,9 @@ namespace failover {
 
     public: // CompositeTransport methods
 
-        /**
-         * Add a URI to the list of URI's that will represent the set of Transports
-         * that this Transport is a composite of.
-         *
-         * @param uris
-         *        The new URIs to add to the set this composite maintains.
-         */
-        virtual void addURI( const List<URI>& uris );
+        virtual void addURI( bool rebalance, const List<URI>& uris );
 
-        /**
-         * Remove a URI from the set of URI's that represents the set of Transports
-         * that this Transport is composed of, removing a URI for which the composite
-         * has created a connected Transport should result in that Transport being
-         * disposed of.
-         *
-         * @param uris
-         *        The new URIs to remove to the set this composite maintains.
-         */
-        virtual void removeURI( const List<URI>& uris );
+        virtual void removeURI( bool rebalance, const List<URI>& uris );
 
     public: // Transport Members
 
@@ -197,6 +187,8 @@ namespace failover {
 
         virtual void reconnect( const decaf::net::URI& uri );
 
+        virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris );
+
     public:  // CompositeTask Methods.
 
         /**
@@ -328,6 +320,22 @@ namespace failover {
             this->maxCacheSize = value;
         }
 
+        bool isReconnectSupported() const {
+            return this->reconnectSupported;
+        }
+
+        void setReconnectSupported( bool value ) {
+            this->reconnectSupported = value;
+        }
+
+        bool isUpdateURIsSupported() const {
+            return this->updateURIsSupported;
+        }
+
+        void setUpdateURIsSupported( bool value ) {
+            this->updateURIsSupported = value;
+        }
+
         void setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId );
 
     protected:
@@ -350,6 +358,16 @@ namespace failover {
          */
         void handleTransportFailure( const decaf::lang::Exception& error );
 
+        /**
+         * Called when the Broker sends a ConnectionControl command which could
+         * signal that this Client needs to reconnect in order to rebalance the
+         * connections on a Broker or the set of Known brokers has changed.
+         *
+         * @param control
+         *      The ConnectionControl command sent from the Broker.
+         */
+        void handleConnectionControl( const Pointer<Command>& control );
+
     private:
 
         /**
@@ -371,6 +389,8 @@ namespace failover {
          */
         Pointer<Transport> createTransport( const URI& location ) const;
 
+        void processNewTransports( bool rebalance, std::string newTransports );
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Mon Jul 26 21:13:42 2010
@@ -107,7 +107,7 @@ Pointer<Transport> FailoverTransportFact
         transport->setMaxCacheSize(
             Integer::parseInt( properties.getProperty( "maxCacheSize", "131072" ) ) );
 
-        transport->addURI( data.getComponents() );
+        transport->addURI( false, data.getComponents() );
 
         return transport;
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp Mon Jul 26 21:13:42 2010
@@ -86,6 +86,10 @@ void FailoverTransportListener::onComman
         parent->setInitialized( true );
     }
 
+    if( command->isConnectionControl() ) {
+        parent->handleConnectionControl( command );
+    }
+
     if( parent->transportListener != NULL ) {
         parent->transportListener->onCommand( command );
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h Mon Jul 26 21:13:42 2010
@@ -297,6 +297,19 @@ namespace mock{
             this->failOnClose = value;
         }
 
+        virtual bool isReconnectSupported() const {
+            return false;
+        }
+
+        virtual bool isUpdateURIsSupported() const {
+            return false;
+        }
+
+        virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
+                                 const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+            throw decaf::io::IOException();
+        }
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=979445&r1=979444&r2=979445&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Mon Jul 26 21:13:42 2010
@@ -230,6 +230,19 @@ namespace correlator{
         virtual void reconnect( const decaf::net::URI& uri )
             throw( decaf::io::IOException ) {}
 
+        virtual bool isReconnectSupported() const {
+            return false;
+        }
+
+        virtual bool isUpdateURIsSupported() const {
+            return false;
+        }
+
+        virtual void updateURIs( bool rebalance AMQCPP_UNUSED,
+                                 const decaf::util::List<decaf::net::URI>& uris AMQCPP_UNUSED ) {
+            throw decaf::io::IOException();
+        }
+
     };
 
     class MyBrokenTransport : public MyTransport{