You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/18 17:40:33 UTC

svn commit: r1447382 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover: BackupTransportPool.cpp BackupTransportPool.h FailoverTransport.cpp FailoverTransport.h

Author: tabish
Date: Mon Feb 18 16:40:33 2013
New Revision: 1447382

URL: http://svn.apache.org/r1447382
Log:
https://issues.apache.org/jira/browse/AMQCPP-463

Support priority backups and better support broker-led re-balancing via ConnectionControl messages

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp Mon Feb 18 16:40:33 2013
@@ -22,6 +22,7 @@
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/transport/TransportFactory.h>
 #include <activemq/transport/TransportRegistry.h>
+#include <activemq/transport/failover/FailoverTransport.h>
 
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/IllegalStateException.h>
@@ -65,11 +66,13 @@ namespace failover {
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
-BackupTransportPool::BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
+BackupTransportPool::BackupTransportPool(FailoverTransport* parent,
+                                         const Pointer<CompositeTaskRunner> taskRunner,
                                          const Pointer<CloseTransportsTask> closeTask,
                                          const Pointer<URIPool> uriPool,
                                          const Pointer<URIPool> updates,
-                                         const Pointer<URIPool> priorityUriPool) : impl(new BackupTransportPoolImpl),
+                                         const Pointer<URIPool> priorityUriPool) : impl(NULL),
+                                                                                   parent(parent),
                                                                                    taskRunner(taskRunner),
                                                                                    closeTask(closeTask),
                                                                                    uriPool(uriPool),
@@ -78,6 +81,10 @@ BackupTransportPool::BackupTransportPool
                                                                                    backupPoolSize(1),
                                                                                    enabled(false) {
 
+    if (parent == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Parent transport passed is NULL");
+    }
+
     if (taskRunner == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
     }
@@ -94,18 +101,22 @@ BackupTransportPool::BackupTransportPool
         throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
     }
 
+    this->impl = new BackupTransportPoolImpl();
+
     // Add this instance as a Task so that we can create backups when nothing else is
     // going on.
     this->taskRunner->addTask(this);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-BackupTransportPool::BackupTransportPool(int backupPoolSize,
+BackupTransportPool::BackupTransportPool(FailoverTransport* parent,
+                                         int backupPoolSize,
                                          const Pointer<CompositeTaskRunner> taskRunner,
                                          const Pointer<CloseTransportsTask> closeTask,
                                          const Pointer<URIPool> uriPool,
                                          const Pointer<URIPool> updates,
-                                         const Pointer<URIPool> priorityUriPool) : impl(new BackupTransportPoolImpl),
+                                         const Pointer<URIPool> priorityUriPool) : impl(NULL),
+                                                                                   parent(parent),
                                                                                    taskRunner(taskRunner),
                                                                                    closeTask(closeTask),
                                                                                    uriPool(uriPool),
@@ -114,6 +125,10 @@ BackupTransportPool::BackupTransportPool
                                                                                    backupPoolSize(backupPoolSize),
                                                                                    enabled(false) {
 
+    if (parent == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Parent transport passed is NULL");
+    }
+
     if (taskRunner == NULL) {
         throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
     }
@@ -130,6 +145,8 @@ BackupTransportPool::BackupTransportPool
         throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
     }
 
+    this->impl = new BackupTransportPoolImpl();
+
     // Add this instance as a Task so that we can create backups when nothing else is
     // going on.
     this->taskRunner->addTask(this);
@@ -223,6 +240,8 @@ bool BackupTransportPool::iterate() {
             uriPool = updates;
         }
 
+        bool wakeupParent = false;
+
         while (isEnabled() && (int) this->impl->backups.size() < backupPoolSize) {
 
             URI connectTo;
@@ -230,6 +249,7 @@ bool BackupTransportPool::iterate() {
             // Try for a URI, if one isn't free return and indicate this task
             // is done for now, the next time a backup is requested this task
             // will become pending again and we will attempt to fill the pool.
+            // This will break the loop once we've tried all possible URIs.
             try {
                 connectTo = uriPool->getURI();
             } catch (NoSuchElementException& ex) {
@@ -241,6 +261,10 @@ bool BackupTransportPool::iterate() {
 
             if (priorityUriPool->contains(connectTo)) {
                 backup->setPriority(true);
+
+                if (!parent->isConnectedToPriority()) {
+                    wakeupParent = true;
+                }
             }
 
             try {
@@ -263,14 +287,20 @@ bool BackupTransportPool::iterate() {
                 // return those to the pool.
                 failures.add(connectTo);
             }
+
+            // We connected to a priority backup and the parent isn't already using one
+            // so wake it up and quit the backup process for now.
+            if (wakeupParent) {
+                this->parent->reconnect(true);
+                break;
+            }
         }
 
         // return all failures to the URI Pool, we can try again later.
         uriPool->addURIs(failures);
+        this->impl->pending = false;
     }
 
-    this->impl->pending = false;
-
     return false;
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h Mon Feb 18 16:40:33 2013
@@ -38,6 +38,7 @@ namespace failover {
     using activemq::threads::CompositeTaskRunner;
 
     class BackupTransportPoolImpl;
+    class FailoverTransport;
 
     class AMQCPP_API BackupTransportPool : public activemq::threads::CompositeTask {
     private:
@@ -46,6 +47,7 @@ namespace failover {
 
         BackupTransportPoolImpl* impl;
 
+        FailoverTransport* parent;
         Pointer<CompositeTaskRunner> taskRunner;
         Pointer<CloseTransportsTask> closeTask;
         Pointer<URIPool> uriPool;
@@ -57,13 +59,15 @@ namespace failover {
 
     public:
 
-        BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
+        BackupTransportPool(FailoverTransport* parent,
+                            const Pointer<CompositeTaskRunner> taskRunner,
                             const Pointer<CloseTransportsTask> closeTask,
                             const Pointer<URIPool> uriPool,
                             const Pointer<URIPool> updates,
                             const Pointer<URIPool> priorityUriPool);
 
-        BackupTransportPool(int backupPoolSize,
+        BackupTransportPool(FailoverTransport* parent,
+                            int backupPoolSize,
                             const Pointer<CompositeTaskRunner> taskRunner,
                             const Pointer<CloseTransportsTask> closeTask,
                             const Pointer<URIPool> uriPool,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Feb 18 16:40:33 2013
@@ -160,7 +160,8 @@ namespace failover {
             myTransportListener(new FailoverTransportListener(parent)),
             transportListener(NULL) {
 
-            this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris, updated, priorityUris));
+            this->backups.reset(
+                new BackupTransportPool(parent, taskRunner, closeTask, uris, updated, priorityUris));
 
             this->taskRunner->addTask(parent);
             this->taskRunner->addTask(this->closeTask.get());
@@ -711,6 +712,7 @@ void FailoverTransport::handleTransportF
             this->impl->uris->addURI(failedUri);
             this->impl->connectedTransportURI.reset(NULL);
             this->impl->connected = false;
+            this->impl->connectedToPrioirty = false;
 
             // Place the State Tracker into a reconnection state.
             this->stateTracker.transportInterrupted();
@@ -936,6 +938,9 @@ bool FailoverTransport::iterate() {
                         this->impl->reconnectMutex.notifyAll();
                         this->impl->connectFailures = 0;
 
+                        this->impl->connectedToPrioirty =
+                            connectList->getPriorityURI().equals(uri) || this->impl->connectedToPrioirty;
+
                         // Make sure on initial startup, that the transportListener
                         // has been initialized for this instance.
                         synchronized(&this->impl->listenerMutex) {
@@ -1291,6 +1296,11 @@ void FailoverTransport::setPriorityBacku
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isConnectedToPriority() const {
+    return this->impl->connectedToPrioirty;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void FailoverTransport::setPriorityURIs(const std::string& priorityURIs AMQCPP_UNUSED) {
     StringTokenizer tokenizer(priorityURIs, ",");
     while (tokenizer.hasMoreTokens()) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Mon Feb 18 16:40:33 2013
@@ -231,6 +231,8 @@ namespace failover {
 
         void setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId);
 
+        bool isConnectedToPriority() const;
+
     protected:
 
         /**