You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/13 22:15:56 UTC

svn commit: r1445930 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover: FailoverTransport.cpp FailoverTransport.h URIPool.cpp URIPool.h

Author: tabish
Date: Wed Feb 13 21:15:56 2013
New Revision: 1445930

URL: http://svn.apache.org/r1445930
Log:
https://issues.apache.org/jira/browse/AMQCPP-463

Initial work to add this feature. 

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Wed Feb 13 21:15:56 2013
@@ -43,48 +43,117 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-FailoverTransport::FailoverTransport() : closed(false),
-                                         connected(false),
-                                         started(false),
-                                         timeout(-1),
-                                         initialReconnectDelay(10),
-                                         maxReconnectDelay(1000*30),
-                                         backOffMultiplier(2),
-                                         useExponentialBackOff(true),
-                                         initialized(false),
-                                         maxReconnectAttempts(0),
-                                         startupMaxReconnectAttempts(0),
-                                         connectFailures(0),
-                                         reconnectDelay(10),
-                                         trackMessages(false),
-                                         trackTransactionProducers(true),
-                                         maxCacheSize(128*1024),
-                                         connectionInterruptProcessingComplete(false),
-                                         firstConnection(true),
-                                         updateURIsSupported(true),
-                                         reconnectSupported(true),
-                                         reconnectMutex(),
-                                         sleepMutex(),
-                                         listenerMutex(),
-                                         stateTracker(),
-                                         requestMap(),
-                                         uris(new URIPool()),
-                                         updated(),
-                                         connectedTransportURI(),
-                                         connectedTransport(),
-                                         connectionFailure(),
-                                         backups(),
-                                         closeTask(new CloseTransportsTask()),
-                                         taskRunner(new CompositeTaskRunner()),
-                                         disposedListener(),
-                                         myTransportListener(new FailoverTransportListener(this)),
-                                         transportListener(NULL) {
+namespace activemq {
+namespace transport {
+namespace failover {
+
+    class FailoverTransportImpl {  // Holds the state FailoverTransport previously kept as direct members; reached through FailoverTransport::impl.
+    private:
+
+        FailoverTransportImpl(const FailoverTransportImpl&);  // declared private (no definition visible here) so instances cannot be copied
+        FailoverTransportImpl& operator= (const FailoverTransportImpl&);
+
+    public:
+
+        bool closed;  // set by close(); ends the retry loops in oneway() and iterate()
+        bool connected;
+        bool started;  // set by start(), cleared by close()
+
+        long long timeout;  // compared against elapsed milliseconds in oneway(); only enforced when > 0
+        long long initialReconnectDelay;  // presumably milliseconds - TODO confirm
+        long long maxReconnectDelay;  // presumably milliseconds - TODO confirm
+        long long backOffMultiplier;
+        bool useExponentialBackOff;  // when false, iterate() resets reconnectDelay to initialReconnectDelay
+        bool initialized;
+        int maxReconnectAttempts;  // attempt limit is only checked when the effective value is > 0
+        int startupMaxReconnectAttempts;  // used instead of maxReconnectAttempts while firstConnection is true and this is non-zero
+        int connectFailures;  // incremented per failed iterate() pass, reset to 0 on a successful connect
+        long long reconnectDelay;
+        bool trackMessages;
+        bool trackTransactionProducers;
+        int maxCacheSize;
+        bool connectionInterruptProcessingComplete;
+        bool firstConnection;  // cleared after the first successful connect in iterate()
+        bool updateURIsSupported;
+        bool reconnectSupported;
+        bool rebalanceUpdateURIs;
+        bool priorityBackup;
+
+        mutable decaf::util::concurrent::Mutex reconnectMutex;  // held around connectedTransport access; also waited on / notified when the connection state changes
+        mutable decaf::util::concurrent::Mutex sleepMutex;
+        mutable decaf::util::concurrent::Mutex listenerMutex;  // held around FailoverTransport::transportListener access; notified when a listener is set
+
+        decaf::util::StlMap<int, Pointer<Command> > requestMap;  // commands awaiting a response, keyed by command id; replayed in restoreTransport()
+
+        Pointer<URIPool> uris;  // pool that iterate() draws connection URIs from
+        Pointer<URIPool> priorityUris;  // consulted by isPriority()
+        decaf::util::LinkedList<URI> updated;
+        Pointer<URI> connectedTransportURI;  // URI of connectedTransport; returned to uris when the transport is dropped
+        Pointer<Transport> connectedTransport;  // NULL while disconnected
+        Pointer<Exception> connectionFailure;  // set by iterate() once the reconnect attempt limit is reached
+        Pointer<BackupTransportPool> backups;
+        Pointer<CloseTransportsTask> closeTask;  // transports are handed here so they are closed on a different thread
+        Pointer<CompositeTaskRunner> taskRunner;
+        Pointer<TransportListener> disposedListener;  // if set, installed on a transport before it is handed to closeTask
+        Pointer<TransportListener> myTransportListener;  // installed on every transport this failover transport adopts
+
+        FailoverTransportImpl(FailoverTransport* parent) :  // parent is registered as a task and wrapped by myTransportListener
+            closed(false),
+            connected(false),
+            started(false),
+            timeout(-1),
+            initialReconnectDelay(10),
+            maxReconnectDelay(1000*30),
+            backOffMultiplier(2),
+            useExponentialBackOff(true),
+            initialized(false),
+            maxReconnectAttempts(0),
+            startupMaxReconnectAttempts(0),
+            connectFailures(0),
+            reconnectDelay(10),
+            trackMessages(false),
+            trackTransactionProducers(true),
+            maxCacheSize(128*1024),
+            connectionInterruptProcessingComplete(false),
+            firstConnection(true),
+            updateURIsSupported(true),
+            reconnectSupported(true),
+            rebalanceUpdateURIs(true),
+            priorityBackup(false),
+            reconnectMutex(),
+            sleepMutex(),
+            listenerMutex(),
+            requestMap(),
+            uris(new URIPool()),
+            priorityUris(new URIPool()),
+            updated(),
+            connectedTransportURI(),
+            connectedTransport(),
+            connectionFailure(),
+            backups(),
+            closeTask(new CloseTransportsTask()),
+            taskRunner(new CompositeTaskRunner()),
+            disposedListener(),
+            myTransportListener(new FailoverTransportListener(parent)) {
+
+            this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris));  // backup pool shares this object's task runner, close task and URI pool
+
+            this->taskRunner->addTask(parent);  // the parent's iterate() performs the reconnect work
+            this->taskRunner->addTask(this->closeTask.get());
+        }
+
+        bool isPriority(const decaf::net::URI& uri) {  // true when uri is in priorityUris or the main pool reports it as priority
+            return priorityUris->contains(uri) || uris->isPriority(uri);
+        }
 
-    this->stateTracker.setTrackTransactions(true);
-    this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris));
+    };
 
-    this->taskRunner->addTask(this);
-    this->taskRunner->addTask(this->closeTask.get());
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+FailoverTransport::FailoverTransport() : stateTracker(), impl(NULL), transportListener(NULL) {  // default constructor: builds the private state object, then enables transaction tracking
+    this->impl = new FailoverTransportImpl(this);  // raw allocation; released by `delete this->impl` in ~FailoverTransport
+    this->stateTracker.setTrackTransactions(true);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -92,7 +161,11 @@ FailoverTransport::~FailoverTransport() 
     try {
         close();
     }
-    AMQ_CATCH_NOTHROW( Exception)
+    AMQ_CATCHALL_NOTHROW()
+
+    try {
+        delete this->impl;
+    }
     AMQ_CATCHALL_NOTHROW()
 }
 
@@ -100,7 +173,7 @@ FailoverTransport::~FailoverTransport() 
 void FailoverTransport::add(const std::string& uri) {
 
     try {
-        uris->addURI(URI(uri));
+        this->impl->uris->addURI(URI(uri));
         reconnect(false);
     }
     AMQ_CATCHALL_NOTHROW()
@@ -112,7 +185,7 @@ void FailoverTransport::addURI(bool reba
     std::auto_ptr<Iterator<URI> > iter(uris.iterator());
 
     while (iter->hasNext()) {
-        this->uris->addURI(iter->next());
+        this->impl->uris->addURI(iter->next());
     }
 
     reconnect(rebalance);
@@ -123,7 +196,7 @@ void FailoverTransport::removeURI(bool r
 
     std::auto_ptr<Iterator<URI> > iter(uris.iterator());
 
-    synchronized( &reconnectMutex ) {
+    synchronized( &this->impl->reconnectMutex ) {
 
         // We need to lock so that the reconnect doesn't get kicked off until
         // we have a chance to remove the URIs in case one of them was the one
@@ -132,7 +205,7 @@ void FailoverTransport::removeURI(bool r
         reconnect(rebalance);
 
         while (iter->hasNext()) {
-            this->uris->removeURI(iter->next());
+            this->impl->uris->removeURI(iter->next());
         }
     }
 }
@@ -141,9 +214,7 @@ void FailoverTransport::removeURI(bool r
 void FailoverTransport::reconnect(const decaf::net::URI& uri) {
 
     try {
-
-        this->uris->addURI(uri);
-
+        this->impl->uris->addURI(uri);
         reconnect(true);
     }
     AMQ_CATCH_RETHROW( IOException)
@@ -153,15 +224,15 @@ void FailoverTransport::reconnect(const 
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setTransportListener(TransportListener* listener) {
-    synchronized( &listenerMutex ) {
+    synchronized( &this->impl->listenerMutex ) {
         this->transportListener = listener;
-        listenerMutex.notifyAll();
+        this->impl->listenerMutex.notifyAll();
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 TransportListener* FailoverTransport::getTransportListener() const {
-    synchronized( &listenerMutex ) {
+    synchronized( &this->impl->listenerMutex ) {
         return this->transportListener;
     }
 
@@ -170,9 +241,9 @@ TransportListener* FailoverTransport::ge
 
 ////////////////////////////////////////////////////////////////////////////////
 std::string FailoverTransport::getRemoteAddress() const {
-    synchronized( &reconnectMutex ) {
-        if (connectedTransport != NULL) {
-            return connectedTransport->getRemoteAddress();
+    synchronized( &this->impl->reconnectMutex ) {
+        if (this->impl->connectedTransport != NULL) {
+            return this->impl->connectedTransport->getRemoteAddress();
         }
     }
     return "";
@@ -185,9 +256,9 @@ void FailoverTransport::oneway(const Poi
 
     try {
 
-        synchronized(&reconnectMutex) {
+        synchronized(&this->impl->reconnectMutex) {
 
-            if (command != NULL && connectedTransport == NULL) {
+            if (command != NULL && this->impl->connectedTransport == NULL) {
 
                 if (command->isShutdownInfo()) {
                     // Skipping send of ShutdownInfo command when not connected.
@@ -201,7 +272,7 @@ void FailoverTransport::oneway(const Poi
                     if (command->isResponseRequired()) {
                         Pointer<Response> response(new Response());
                         response->setCorrelationId(command->getCommandId());
-                        myTransportListener->onCommand(response);
+                        this->impl->myTransportListener->onCommand(response);
                     }
 
                     return;
@@ -209,33 +280,34 @@ void FailoverTransport::oneway(const Poi
             }
 
             // Keep trying until the message is sent.
-            for (int i = 0; !closed; i++) {
+            for (int i = 0; !this->impl->closed; i++) {
                 try {
 
                     // Wait for transport to be connected.
-                    Pointer<Transport> transport = connectedTransport;
+                    Pointer<Transport> transport = this->impl->connectedTransport;
                     long long start = System::currentTimeMillis();
                     bool timedout = false;
 
-                    while (transport == NULL && !closed && connectionFailure == NULL) {
+                    while (transport == NULL && !this->impl->closed && this->impl->connectionFailure == NULL) {
                         long long end = System::currentTimeMillis();
-                        if (timeout > 0 && (end - start > timeout)) {
+                        if (this->impl->timeout > 0 && (end - start > this->impl->timeout)) {
                             timedout = true;
                             break;
                         }
 
-                        reconnectMutex.wait(100);
-                        transport = connectedTransport;
+                        this->impl->reconnectMutex.wait(100);
+                        transport = this->impl->connectedTransport;
                     }
 
                     if (transport == NULL) {
                         // Previous loop may have exited due to us being disposed.
-                        if (closed) {
+                        if (this->impl->closed) {
                             error.reset(new IOException(__FILE__, __LINE__, "Transport disposed."));
-                        } else if (connectionFailure != NULL) {
-                            error = connectionFailure;
+                        } else if (this->impl->connectionFailure != NULL) {
+                            error = this->impl->connectionFailure;
                         } else if (timedout == true) {
-                            error.reset(new IOException(__FILE__, __LINE__, "Failover timeout of %d ms reached.", timeout));
+                            error.reset(new IOException(__FILE__, __LINE__,
+                                "Failover timeout of %d ms reached.", this->impl->timeout));
                         } else {
                             error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
                         }
@@ -249,11 +321,11 @@ void FailoverTransport::oneway(const Poi
                     Pointer<Tracked> tracked;
                     try {
                         tracked = stateTracker.track(command);
-                        synchronized( &requestMap ) {
+                        synchronized( &this->impl->requestMap ) {
                             if (tracked != NULL && tracked->isWaitingForResponse()) {
-                                requestMap.put(command->getCommandId(), tracked);
+                                this->impl->requestMap.put(command->getCommandId(), tracked);
                             } else if (tracked == NULL && command->isResponseRequired()) {
-                                requestMap.put(command->getCommandId(), command);
+                                this->impl->requestMap.put(command->getCommandId(), command);
                             }
                         }
                     } catch (Exception& ex) {
@@ -277,7 +349,7 @@ void FailoverTransport::oneway(const Poi
                             // since we will retry in this method.. take it out of the
                             // request map so that it is not sent 2 times on recovery
                             if (command->isResponseRequired()) {
-                                requestMap.remove(command->getCommandId());
+                                this->impl->requestMap.remove(command->getCommandId());
                             }
 
                             // Rethrow the exception so it will handled by
@@ -301,7 +373,7 @@ void FailoverTransport::oneway(const Poi
     AMQ_CATCH_NOTHROW( Exception)
     AMQ_CATCHALL_NOTHROW()
 
-    if (!closed) {
+    if (!this->impl->closed) {
         if (error != NULL) {
             throw IOException(*error);
         }
@@ -329,22 +401,22 @@ void FailoverTransport::start() {
 
     try {
 
-        synchronized(&reconnectMutex) {
+        synchronized(&this->impl->reconnectMutex) {
 
-            if (this->started) {
+            if (this->impl->started) {
                 return;
             }
 
-            started = true;
+            this->impl->started = true;
 
-            taskRunner->start();
+            this->impl->taskRunner->start();
 
             stateTracker.setMaxCacheSize(this->getMaxCacheSize());
             stateTracker.setTrackMessages(this->isTrackMessages());
             stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
 
-            if (connectedTransport != NULL) {
-                stateTracker.restore(connectedTransport);
+            if (this->impl->connectedTransport != NULL) {
+                stateTracker.restore(this->impl->connectedTransport);
             } else {
                 reconnect(false);
             }
@@ -372,30 +444,30 @@ void FailoverTransport::close() {
 
         Pointer<Transport> transportToStop;
 
-        synchronized( &reconnectMutex ) {
-            if (!started) {
+        synchronized( &this->impl->reconnectMutex ) {
+            if (!this->impl->started) {
                 return;
             }
 
-            started = false;
-            closed = true;
-            connected = false;
+            this->impl->started = false;
+            this->impl->closed = true;
+            this->impl->connected = false;
 
-            backups->setEnabled(false);
-            requestMap.clear();
+            this->impl->backups->setEnabled(false);
+            this->impl->requestMap.clear();
 
-            if (connectedTransport != NULL) {
-                transportToStop.swap(connectedTransport);
+            if (this->impl->connectedTransport != NULL) {
+                transportToStop.swap(this->impl->connectedTransport);
             }
 
-            reconnectMutex.notifyAll();
+            this->impl->reconnectMutex.notifyAll();
         }
 
-        synchronized( &sleepMutex ) {
-            sleepMutex.notifyAll();
+        synchronized( &this->impl->sleepMutex ) {
+            this->impl->sleepMutex.notifyAll();
         }
 
-        taskRunner->shutdown(2000);
+        this->impl->taskRunner->shutdown(2000);
 
         if (transportToStop != NULL) {
             transportToStop->close();
@@ -411,30 +483,30 @@ void FailoverTransport::reconnect(bool r
 
     Pointer<Transport> transport;
 
-    synchronized( &reconnectMutex ) {
-        if (started) {
+    synchronized( &this->impl->reconnectMutex ) {
+        if (this->impl->started) {
 
             if (rebalance) {
 
-                transport.swap(this->connectedTransport);
+                transport.swap(this->impl->connectedTransport);
 
                 if (transport != NULL) {
 
-                    if (this->disposedListener != NULL) {
-                        transport->setTransportListener(disposedListener.get());
+                    if (this->impl->disposedListener != NULL) {
+                        transport->setTransportListener(this->impl->disposedListener.get());
                     }
 
                     // Hand off to the close task so it gets done in a different thread.
-                    closeTask->add(transport);
+                    this->impl->closeTask->add(transport);
 
-                    if (this->connectedTransportURI != NULL) {
-                        this->uris->addURI(*this->connectedTransportURI);
-                        this->connectedTransportURI.reset(NULL);
+                    if (this->impl->connectedTransportURI != NULL) {
+                        this->impl->uris->addURI(*this->impl->connectedTransportURI);
+                        this->impl->connectedTransportURI.reset(NULL);
                     }
                 }
             }
 
-            taskRunner->wakeup();
+            this->impl->taskRunner->wakeup();
         }
     }
 }
@@ -454,8 +526,8 @@ void FailoverTransport::restoreTransport
         stateTracker.restore(transport);
 
         decaf::util::StlMap<int, Pointer<Command> > commands;
-        synchronized(&requestMap) {
-            commands.copy(requestMap);
+        synchronized(&this->impl->requestMap) {
+            commands.copy(this->impl->requestMap);
         }
 
         Pointer<Iterator<Pointer<Command> > > iter(commands.values().iterator());
@@ -472,25 +544,25 @@ void FailoverTransport::restoreTransport
 void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error AMQCPP_UNUSED) {
 
     Pointer<Transport> transport;
-    synchronized( &reconnectMutex ) {
-        connectedTransport.swap(transport);
+    synchronized(&this->impl->reconnectMutex) {
+        this->impl->connectedTransport.swap(transport);
     }
 
     if (transport != NULL) {
 
-        if (this->disposedListener != NULL) {
-            transport->setTransportListener(disposedListener.get());
+        if (this->impl->disposedListener != NULL) {
+            transport->setTransportListener(this->impl->disposedListener.get());
         }
 
         // Hand off to the close task so it gets done in a different thread.
-        closeTask->add(transport);
+        this->impl->closeTask->add(transport);
 
-        synchronized( &reconnectMutex ) {
+        synchronized(&this->impl->reconnectMutex) {
 
-            initialized = false;
-            uris->addURI(*connectedTransportURI);
-            connectedTransportURI.reset(NULL);
-            connected = false;
+            this->impl->initialized = false;
+            this->impl->uris->addURI(*this->impl->connectedTransportURI);
+            this->impl->connectedTransportURI.reset(NULL);
+            this->impl->connected = false;
 
             // Place the State Tracker into a reconnection state.
             this->stateTracker.transportInterrupted();
@@ -501,8 +573,8 @@ void FailoverTransport::handleTransportF
                 transportListener->transportInterrupted();
             }
 
-            if (started) {
-                taskRunner->wakeup();
+            if (this->impl->started) {
+                this->impl->taskRunner->wakeup();
             }
         }
     }
@@ -572,7 +644,7 @@ void FailoverTransport::updateURIs(bool 
 
     if (isUpdateURIsSupported()) {
 
-        LinkedList<URI> copy(this->updated);
+        LinkedList<URI> copy(this->impl->updated);
         LinkedList<URI> add;
 
         if (!updatedURIs.isEmpty()) {
@@ -591,17 +663,17 @@ void FailoverTransport::updateURIs(bool 
                 }
             }
 
-            synchronized( &reconnectMutex ) {
+            synchronized( &this->impl->reconnectMutex ) {
 
-                this->updated.clear();
+                this->impl->updated.clear();
                 Pointer<Iterator<URI> > listIter1(add.iterator());
                 while (listIter1->hasNext()) {
-                    this->updated.add(listIter1->next());
+                    this->impl->updated.add(listIter1->next());
                 }
 
                 Pointer<Iterator<URI> > listIter2(copy.iterator());
                 while (listIter2->hasNext()) {
-                    this->uris->removeURI(listIter2->next());
+                    this->impl->uris->removeURI(listIter2->next());
                 }
 
                 this->addURI(rebalance, add);
@@ -614,20 +686,20 @@ void FailoverTransport::updateURIs(bool 
 bool FailoverTransport::isPending() const {
     bool result = false;
 
-    synchronized( &reconnectMutex ) {
-        if (this->connectedTransport == NULL && !closed && started) {
+    synchronized(&this->impl->reconnectMutex) {
+        if (this->impl->connectedTransport == NULL && !this->impl->closed && this->impl->started) {
 
             int reconnectAttempts = 0;
-            if (firstConnection) {
-                if (startupMaxReconnectAttempts != 0) {
-                    reconnectAttempts = startupMaxReconnectAttempts;
+            if (this->impl->firstConnection) {
+                if (this->impl->startupMaxReconnectAttempts != 0) {
+                    reconnectAttempts = this->impl->startupMaxReconnectAttempts;
                 }
             }
             if (reconnectAttempts == 0) {
-                reconnectAttempts = maxReconnectAttempts;
+                reconnectAttempts = this->impl->maxReconnectAttempts;
             }
 
-            if (reconnectAttempts > 0 && connectFailures >= reconnectAttempts) {
+            if (reconnectAttempts > 0 && this->impl->connectFailures >= reconnectAttempts) {
                 result = false;
             } else {
                 result = true;
@@ -643,13 +715,13 @@ bool FailoverTransport::iterate() {
 
     Pointer<Exception> failure;
 
-    synchronized( &reconnectMutex ) {
+    synchronized( &this->impl->reconnectMutex ) {
 
-        if (closed || connectionFailure != NULL) {
-            reconnectMutex.notifyAll();
+        if (this->impl->closed || this->impl->connectionFailure != NULL) {
+            this->impl->reconnectMutex.notifyAll();
         }
 
-        if (connectedTransport != NULL || closed || connectionFailure != NULL) {
+        if (this->impl->connectedTransport != NULL || this->impl->closed || this->impl->connectionFailure != NULL) {
             return false;
         } else {
 
@@ -657,51 +729,51 @@ bool FailoverTransport::iterate() {
             Pointer<Transport> transport;
             URI uri;
 
-            if (!useExponentialBackOff) {
-                reconnectDelay = initialReconnectDelay;
+            if (!this->impl->useExponentialBackOff) {
+                this->impl->reconnectDelay = this->impl->initialReconnectDelay;
             }
 
-            if (backups->isEnabled()) {
+            if (this->impl->backups->isEnabled()) {
 
-                Pointer<BackupTransport> backupTransport = backups->getBackup();
+                Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup();
 
                 if (backupTransport != NULL) {
 
                     transport = backupTransport->getTransport();
                     uri = backupTransport->getUri();
-                    transport->setTransportListener(myTransportListener.get());
+                    transport->setTransportListener(this->impl->myTransportListener.get());
 
                     try {
 
-                        if (started) {
+                        if (this->impl->started) {
                             restoreTransport(transport);
                         }
 
                     } catch (Exception& e) {
 
                         if (transport != NULL) {
-                            if (this->disposedListener != NULL) {
-                                transport->setTransportListener(disposedListener.get());
+                            if (this->impl->disposedListener != NULL) {
+                                transport->setTransportListener(this->impl->disposedListener.get());
                             }
 
                             // Hand off to the close task so it gets done in a different thread
                             // this prevents a deadlock from occurring if the Transport happens
                             // to call back through our onException method or locks in some other
                             // way.
-                            closeTask->add(transport);
-                            taskRunner->wakeup();
+                            this->impl->closeTask->add(transport);
+                            this->impl->taskRunner->wakeup();
                             transport.reset(NULL);
                         }
 
-                        this->uris->addURI(uri);
+                        this->impl->uris->addURI(uri);
                     }
                 }
             }
 
-            while (transport == NULL && !closed) {
+            while (transport == NULL && !this->impl->closed) {
 
                 try {
-                    uri = uris->getURI();
+                    uri = this->impl->uris->getURI();
                 } catch (NoSuchElementException& ex) {
                     break;
                 }
@@ -709,10 +781,10 @@ bool FailoverTransport::iterate() {
                 try {
 
                     transport = createTransport(uri);
-                    transport->setTransportListener(myTransportListener.get());
+                    transport->setTransportListener(this->impl->myTransportListener.get());
                     transport->start();
 
-                    if (started) {
+                    if (this->impl->started) {
                         restoreTransport(transport);
                     }
 
@@ -720,8 +792,8 @@ bool FailoverTransport::iterate() {
                     e.setMark(__FILE__, __LINE__);
 
                     if (transport != NULL) {
-                        if (this->disposedListener != NULL) {
-                            transport->setTransportListener(disposedListener.get());
+                        if (this->impl->disposedListener != NULL) {
+                            transport->setTransportListener(this->impl->disposedListener.get());
                         }
 
                         try {
@@ -733,8 +805,8 @@ bool FailoverTransport::iterate() {
                         // this prevents a deadlock from occurring if the Transport happens
                         // to call back through our onException method or locks in some other
                         // way.
-                        closeTask->add(transport);
-                        taskRunner->wakeup();
+                        this->impl->closeTask->add(transport);
+                        this->impl->taskRunner->wakeup();
                         transport.reset(NULL);
                     }
 
@@ -744,22 +816,22 @@ bool FailoverTransport::iterate() {
             }
 
             // Return the failures to the pool, we will try again on the next iteration.
-            this->uris->addURIs(failures);
+            this->impl->uris->addURIs(failures);
 
             if (transport != NULL) {
-                reconnectDelay = initialReconnectDelay;
-                connectedTransportURI.reset(new URI(uri));
-                connectedTransport = transport;
-                reconnectMutex.notifyAll();
-                connectFailures = 0;
-                connected = true;
+                this->impl->reconnectDelay = this->impl->initialReconnectDelay;
+                this->impl->connectedTransportURI.reset(new URI(uri));
+                this->impl->connectedTransport = transport;
+                this->impl->reconnectMutex.notifyAll();
+                this->impl->connectFailures = 0;
+                this->impl->connected = true;
 
                 // Make sure on initial startup, that the transportListener
                 // has been initialized for this instance.
-                synchronized( &listenerMutex ) {
+                synchronized(&this->impl->listenerMutex) {
                     if (transportListener == NULL) {
                         // if it isn't set after 2secs - it probably never will be
-                        listenerMutex.wait(2000);
+                        this->impl->listenerMutex.wait(2000);
                     }
                 }
 
@@ -767,8 +839,8 @@ bool FailoverTransport::iterate() {
                     transportListener->transportResumed();
                 }
 
-                if (firstConnection) {
-                    firstConnection = false;
+                if (this->impl->firstConnection) {
+                    this->impl->firstConnection = false;
                 }
 
                 return false;
@@ -776,23 +848,23 @@ bool FailoverTransport::iterate() {
         }
 
         int reconnectAttempts = 0;
-        if (firstConnection) {
-            if (startupMaxReconnectAttempts != 0) {
-                reconnectAttempts = startupMaxReconnectAttempts;
+        if (this->impl->firstConnection) {
+            if (this->impl->startupMaxReconnectAttempts != 0) {
+                reconnectAttempts = this->impl->startupMaxReconnectAttempts;
             }
         }
         if (reconnectAttempts == 0) {
-            reconnectAttempts = maxReconnectAttempts;
+            reconnectAttempts = this->impl->maxReconnectAttempts;
         }
 
-        if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
-            connectionFailure = failure;
+        if (reconnectAttempts > 0 && ++this->impl->connectFailures >= reconnectAttempts) {
+            this->impl->connectionFailure = failure;
 
             // Make sure on initial startup, that the transportListener has been initialized
             // for this instance.
-            synchronized( &listenerMutex ) {
+            synchronized(&this->impl->listenerMutex) {
                 if (transportListener == NULL) {
-                    listenerMutex.wait(2000);
+                    this->impl->listenerMutex.wait(2000);
                 }
             }
 
@@ -800,38 +872,38 @@ bool FailoverTransport::iterate() {
 
                 Pointer<IOException> ioException;
                 try {
-                    ioException = connectionFailure.dynamicCast<IOException>();
+                    ioException = this->impl->connectionFailure.dynamicCast<IOException>();
                 }
                 AMQ_CATCH_NOTHROW( ClassCastException)
 
                 if (ioException != NULL) {
-                    transportListener->onException(*connectionFailure);
+                    transportListener->onException(*this->impl->connectionFailure);
                 } else {
-                    transportListener->onException(IOException(*connectionFailure));
+                    transportListener->onException(IOException(*this->impl->connectionFailure));
                 }
             }
 
-            reconnectMutex.notifyAll();
+            this->impl->reconnectMutex.notifyAll();
             return false;
         }
     }
 
-    if (!closed) {
+    if (!this->impl->closed) {
 
-        synchronized( &sleepMutex ) {
-            sleepMutex.wait((unsigned int) reconnectDelay);
+        synchronized(&this->impl->sleepMutex) {
+            this->impl->sleepMutex.wait((unsigned int) this->impl->reconnectDelay);
         }
 
-        if (useExponentialBackOff) {
+        if (this->impl->useExponentialBackOff) {
             // Exponential increment of reconnect delay.
-            reconnectDelay *= backOffMultiplier;
-            if (reconnectDelay > maxReconnectDelay) {
-                reconnectDelay = maxReconnectDelay;
+            this->impl->reconnectDelay *= this->impl->backOffMultiplier;
+            if (this->impl->reconnectDelay > this->impl->maxReconnectDelay) {
+                this->impl->reconnectDelay = this->impl->maxReconnectDelay;
             }
         }
     }
 
-    return !closed;
+    return !this->impl->closed;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -857,29 +929,29 @@ Pointer<Transport> FailoverTransport::cr
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId) {
 
-    synchronized( &reconnectMutex ) {
+    synchronized(&this->impl->reconnectMutex) {
         stateTracker.connectionInterruptProcessingComplete(this, connectionId);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isConnected() const {
-    return this->connected;
+    return this->impl->connected;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isClosed() const {
-    return this->closed;
+    return this->impl->closed;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isInitialized() const {
-    return this->initialized;
+    return this->impl->initialized;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setInitialized(bool value) {
-    this->initialized = value;
+    this->impl->initialized = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -889,8 +961,8 @@ Transport* FailoverTransport::narrow(con
         return this;
     }
 
-    if (this->connectedTransport != NULL) {
-        return this->connectedTransport->narrow(typeId);
+    if (this->impl->connectedTransport != NULL) {
+        return this->impl->connectedTransport->narrow(typeId);
     }
 
     return NULL;
@@ -901,9 +973,9 @@ void FailoverTransport::processResponse(
 
     Pointer<Command> object;
 
-    synchronized(&(this->requestMap)) {
+    synchronized(&(this->impl->requestMap)) {
         try {
-            object = this->requestMap.remove(response->getCorrelationId());
+            object = this->impl->requestMap.remove(response->getCorrelationId());
         } catch (NoSuchElementException& ex) {
             // Not tracking this request in our map, not an error.
         }
@@ -922,7 +994,7 @@ void FailoverTransport::processResponse(
 Pointer<wireformat::WireFormat> FailoverTransport::getWireFormat() const {
 
     Pointer<wireformat::WireFormat> result;
-    Pointer<Transport> transport = this->connectedTransport;
+    Pointer<Transport> transport = this->impl->connectedTransport;
 
     if (transport != NULL) {
         result = transport->getWireFormat();
@@ -933,160 +1005,194 @@ Pointer<wireformat::WireFormat> Failover
 
 ////////////////////////////////////////////////////////////////////////////////
 long long FailoverTransport::getTimeout() const {
-    return this->timeout;
+    return this->impl->timeout;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setTimeout(long long value) {
-    this->timeout = value;
+    this->impl->timeout = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 long long FailoverTransport::getInitialReconnectDelay() const {
-    return this->initialReconnectDelay;
+    return this->impl->initialReconnectDelay;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setInitialReconnectDelay(long long value) {
-    this->initialReconnectDelay = value;
+    this->impl->initialReconnectDelay = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 long long FailoverTransport::getMaxReconnectDelay() const {
-    return this->maxReconnectDelay;
+    return this->impl->maxReconnectDelay;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setMaxReconnectDelay(long long value) {
-    this->maxReconnectDelay = value;
+    this->impl->maxReconnectDelay = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 long long FailoverTransport::getBackOffMultiplier() const {
-    return this->backOffMultiplier;
+    return this->impl->backOffMultiplier;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setBackOffMultiplier(long long value) {
-    this->backOffMultiplier = value;
+    this->impl->backOffMultiplier = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isUseExponentialBackOff() const {
-    return this->useExponentialBackOff;
+    return this->impl->useExponentialBackOff;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setUseExponentialBackOff(bool value) {
-    this->useExponentialBackOff = value;
+    this->impl->useExponentialBackOff = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isRandomize() const {
-    return this->uris->isRandomize();
+    return this->impl->uris->isRandomize();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setRandomize(bool value) {
-    this->uris->setRandomize(value);
+    this->impl->uris->setRandomize(value);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int FailoverTransport::getMaxReconnectAttempts() const {
-    return this->maxReconnectAttempts;
+    return this->impl->maxReconnectAttempts;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setMaxReconnectAttempts(int value) {
-    this->maxReconnectAttempts = value;
+    this->impl->maxReconnectAttempts = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int FailoverTransport::getStartupMaxReconnectAttempts() const {
-    return this->startupMaxReconnectAttempts;
+    return this->impl->startupMaxReconnectAttempts;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setStartupMaxReconnectAttempts(int value) {
-    this->startupMaxReconnectAttempts = value;
+    this->impl->startupMaxReconnectAttempts = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 long long FailoverTransport::getReconnectDelay() const {
-    return this->reconnectDelay;
+    return this->impl->reconnectDelay;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setReconnectDelay(long long value) {
-    this->reconnectDelay = value;
+    this->impl->reconnectDelay = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isBackup() const {
-    return this->backups->isEnabled();
+    return this->impl->backups->isEnabled();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setBackup(bool value) {
-    this->backups->setEnabled(value);
+    this->impl->backups->setEnabled(value);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int FailoverTransport::getBackupPoolSize() const {
-    return this->backups->getBackupPoolSize();
+    return this->impl->backups->getBackupPoolSize();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setBackupPoolSize(int value) {
-    this->backups->setBackupPoolSize(value);
+    this->impl->backups->setBackupPoolSize(value);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isTrackMessages() const {
-    return this->trackMessages;
+    return this->impl->trackMessages;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setTrackMessages(bool value) {
-    this->trackMessages = value;
+    this->impl->trackMessages = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isTrackTransactionProducers() const {
-    return this->trackTransactionProducers;
+    return this->impl->trackTransactionProducers;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setTrackTransactionProducers(bool value) {
-    this->trackTransactionProducers = value;
+    this->impl->trackTransactionProducers = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int FailoverTransport::getMaxCacheSize() const {
-    return this->maxCacheSize;
+    return this->impl->maxCacheSize;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setMaxCacheSize(int value) {
-    this->maxCacheSize = value;
+    this->impl->maxCacheSize = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isReconnectSupported() const {
-    return this->reconnectSupported;
+    return this->impl->reconnectSupported;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setReconnectSupported(bool value) {
-    this->reconnectSupported = value;
+    this->impl->reconnectSupported = value;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool FailoverTransport::isUpdateURIsSupported() const {
-    return this->updateURIsSupported;
+    return this->impl->updateURIsSupported;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setUpdateURIsSupported(bool value) {
-    this->updateURIsSupported = value;
+    this->impl->updateURIsSupported = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isRebalanceUpdateURIs() const {
+    return this->impl->rebalanceUpdateURIs;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setRebalanceUpdateURIs(bool rebalanceUpdateURIs) {
+    this->impl->rebalanceUpdateURIs = rebalanceUpdateURIs;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isPriorityBackup() const {
+    return this->impl->priorityBackup;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setPriorityBackup(bool priorityBackup) {
+    this->impl->priorityBackup = priorityBackup;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setPriorityURIs(const std::string& priorityURIs AMQCPP_UNUSED) {
+//    StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
+//    while (tokenizer.hasMoreTokens()) {
+//        String str = tokenizer.nextToken();
+//        try {
+//            URI uri = new URI(str);
+//            priorityList.add(uri);
+//        } catch (Exception e) {
+//            LOG.error("Failed to parse broker address: " + str, e);
+//        }
+//    }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Wed Feb 13 21:15:56 2013
@@ -45,57 +45,21 @@ namespace transport {
 namespace failover {
 
     using namespace decaf::lang;
-    using decaf::net::URI;
-    using namespace decaf::util;
-    using namespace activemq::threads;
     using activemq::commands::Command;
     using activemq::commands::Response;
 
+    class FailoverTransportImpl;
+
     class AMQCPP_API FailoverTransport : public CompositeTransport,
                                          public activemq::threads::CompositeTask {
     private:
 
         friend class FailoverTransportListener;
 
-        bool closed;
-        bool connected;
-        bool started;
-
-        long long timeout;
-        long long initialReconnectDelay;
-        long long maxReconnectDelay;
-        long long backOffMultiplier;
-        bool useExponentialBackOff;
-        bool initialized;
-        int maxReconnectAttempts;
-        int startupMaxReconnectAttempts;
-        int connectFailures;
-        long long reconnectDelay;
-        bool trackMessages;
-        bool trackTransactionProducers;
-        int maxCacheSize;
-        bool connectionInterruptProcessingComplete;
-        bool firstConnection;
-        bool updateURIsSupported;
-        bool reconnectSupported;
-
-        mutable decaf::util::concurrent::Mutex reconnectMutex;
-        mutable decaf::util::concurrent::Mutex sleepMutex;
-        mutable decaf::util::concurrent::Mutex listenerMutex;
-
         state::ConnectionStateTracker stateTracker;
-        decaf::util::StlMap<int, Pointer<Command> > requestMap;
 
-        Pointer<URIPool> uris;
-        decaf::util::LinkedList<URI> updated;
-        Pointer<URI> connectedTransportURI;
-        Pointer<Transport> connectedTransport;
-        Pointer<Exception> connectionFailure;
-        Pointer<BackupTransportPool> backups;
-        Pointer<CloseTransportsTask> closeTask;
-        Pointer<CompositeTaskRunner> taskRunner;
-        Pointer<TransportListener> disposedListener;
-        Pointer<TransportListener> myTransportListener;
+        FailoverTransportImpl* impl;
+
         TransportListener* transportListener;
 
     private:
@@ -127,9 +91,9 @@ namespace failover {
 
     public: // CompositeTransport methods
 
-        virtual void addURI(bool rebalance, const List<URI>& uris);
+        virtual void addURI(bool rebalance, const List<decaf::net::URI>& uris);
 
-        virtual void removeURI(bool rebalance, const List<URI>& uris);
+        virtual void removeURI(bool rebalance, const List<decaf::net::URI>& uris);
 
     public:
 
@@ -259,6 +223,16 @@ namespace failover {
 
         void setUpdateURIsSupported(bool value);
 
+        bool isRebalanceUpdateURIs() const;
+
+        void setRebalanceUpdateURIs(bool rebalanceUpdateURIs);
+
+        bool isPriorityBackup() const;
+
+        void setPriorityBackup(bool priorityBackup);
+
+        void setPriorityURIs(const std::string& priorityURIs);
+
         void setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId);
 
     protected:
@@ -301,7 +275,7 @@ namespace failover {
          *
          * @throw IOException if an I/O error occurs while creating the new Transport.
          */
-        Pointer<Transport> createTransport(const URI& location) const;
+        Pointer<Transport> createTransport(const decaf::net::URI& location) const;
 
         void processNewTransports(bool rebalance, std::string newTransports);
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp Wed Feb 13 21:15:56 2013
@@ -101,3 +101,21 @@ void URIPool::removeURI(const URI& uri) 
         }
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool URIPool::contains(const decaf::net::URI& uri) const {
+    synchronized(&uriPool) {
+        return uriPool.contains(uri);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool URIPool::isPriority(const decaf::net::URI& uri) const {
+    synchronized(&uriPool) {
+        if (uriPool.isEmpty()) {
+            return false;
+        }
+
+        return uriPool.getFirst().equals(uri);
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h Wed Feb 13 21:15:56 2013
@@ -34,7 +34,7 @@ namespace failover {
     class AMQCPP_API URIPool {
     private:
 
-        LinkedList<URI> uriPool;
+        mutable LinkedList<URI> uriPool;
         bool randomize;
 
     public:
@@ -108,6 +108,21 @@ namespace failover {
             this->randomize = value;
         }
 
+        /**
+         * Returns true if the given URI is contained in this set of URIs.
+         *
+         * @returns true if the URI is in the list.
+         */
+        bool contains(const decaf::net::URI& uri) const;
+
+        /**
+         * Returns true if the given URI is the first entry in the list of
+         * URIs contained in this pool.
+         *
+         * @returns true if the URI is at index 0 in the URI list, false otherwise.
+         */
+        bool isPriority(const decaf::net::URI& uri) const;
+
     };
 
 }}}