You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/18 17:40:33 UTC
svn commit: r1447382 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover:
BackupTransportPool.cpp BackupTransportPool.h FailoverTransport.cpp
FailoverTransport.h
Author: tabish
Date: Mon Feb 18 16:40:33 2013
New Revision: 1447382
URL: http://svn.apache.org/r1447382
Log:
https://issues.apache.org/jira/browse/AMQCPP-463
Support priority backups and better support broker led re-balancing via ConnectionControl messages
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp Mon Feb 18 16:40:33 2013
@@ -22,6 +22,7 @@
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/transport/TransportFactory.h>
#include <activemq/transport/TransportRegistry.h>
+#include <activemq/transport/failover/FailoverTransport.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/IllegalStateException.h>
@@ -65,11 +66,13 @@ namespace failover {
}}}
////////////////////////////////////////////////////////////////////////////////
-BackupTransportPool::BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
+BackupTransportPool::BackupTransportPool(FailoverTransport* parent,
+ const Pointer<CompositeTaskRunner> taskRunner,
const Pointer<CloseTransportsTask> closeTask,
const Pointer<URIPool> uriPool,
const Pointer<URIPool> updates,
- const Pointer<URIPool> priorityUriPool) : impl(new BackupTransportPoolImpl),
+ const Pointer<URIPool> priorityUriPool) : impl(NULL),
+ parent(parent),
taskRunner(taskRunner),
closeTask(closeTask),
uriPool(uriPool),
@@ -78,6 +81,10 @@ BackupTransportPool::BackupTransportPool
backupPoolSize(1),
enabled(false) {
+ if (parent == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Parent transport passed is NULL");
+ }
+
if (taskRunner == NULL) {
throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
}
@@ -94,18 +101,22 @@ BackupTransportPool::BackupTransportPool
throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
}
+ this->impl = new BackupTransportPoolImpl();
+
// Add this instance as a Task so that we can create backups when nothing else is
// going on.
this->taskRunner->addTask(this);
}
////////////////////////////////////////////////////////////////////////////////
-BackupTransportPool::BackupTransportPool(int backupPoolSize,
+BackupTransportPool::BackupTransportPool(FailoverTransport* parent,
+ int backupPoolSize,
const Pointer<CompositeTaskRunner> taskRunner,
const Pointer<CloseTransportsTask> closeTask,
const Pointer<URIPool> uriPool,
const Pointer<URIPool> updates,
- const Pointer<URIPool> priorityUriPool) : impl(new BackupTransportPoolImpl),
+ const Pointer<URIPool> priorityUriPool) : impl(NULL),
+ parent(parent),
taskRunner(taskRunner),
closeTask(closeTask),
uriPool(uriPool),
@@ -114,6 +125,10 @@ BackupTransportPool::BackupTransportPool
backupPoolSize(backupPoolSize),
enabled(false) {
+ if (parent == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Parent transport passed is NULL");
+ }
+
if (taskRunner == NULL) {
throw NullPointerException(__FILE__, __LINE__, "TaskRunner passed is NULL");
}
@@ -130,6 +145,8 @@ BackupTransportPool::BackupTransportPool
throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL");
}
+ this->impl = new BackupTransportPoolImpl();
+
// Add this instance as a Task so that we can create backups when nothing else is
// going on.
this->taskRunner->addTask(this);
@@ -223,6 +240,8 @@ bool BackupTransportPool::iterate() {
uriPool = updates;
}
+ bool wakeupParent = false;
+
while (isEnabled() && (int) this->impl->backups.size() < backupPoolSize) {
URI connectTo;
@@ -230,6 +249,7 @@ bool BackupTransportPool::iterate() {
// Try for a URI, if one isn't free return and indicate this task
// is done for now, the next time a backup is requested this task
// will become pending again and we will attempt to fill the pool.
+ // This will break the loop once we've tried all possible URIs.
try {
connectTo = uriPool->getURI();
} catch (NoSuchElementException& ex) {
@@ -241,6 +261,10 @@ bool BackupTransportPool::iterate() {
if (priorityUriPool->contains(connectTo)) {
backup->setPriority(true);
+
+ if (!parent->isConnectedToPriority()) {
+ wakeupParent = true;
+ }
}
try {
@@ -263,14 +287,20 @@ bool BackupTransportPool::iterate() {
// return those to the pool.
failures.add(connectTo);
}
+
+ // We connected to a priority backup and the parent isn't already using one
+ // so wake it up and quit the backups process for now.
+ if (wakeupParent) {
+ this->parent->reconnect(true);
+ break;
+ }
}
// return all failures to the URI Pool, we can try again later.
uriPool->addURIs(failures);
+ this->impl->pending = false;
}
- this->impl->pending = false;
-
return false;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h Mon Feb 18 16:40:33 2013
@@ -38,6 +38,7 @@ namespace failover {
using activemq::threads::CompositeTaskRunner;
class BackupTransportPoolImpl;
+ class FailoverTransport;
class AMQCPP_API BackupTransportPool : public activemq::threads::CompositeTask {
private:
@@ -46,6 +47,7 @@ namespace failover {
BackupTransportPoolImpl* impl;
+ FailoverTransport* parent;
Pointer<CompositeTaskRunner> taskRunner;
Pointer<CloseTransportsTask> closeTask;
Pointer<URIPool> uriPool;
@@ -57,13 +59,15 @@ namespace failover {
public:
- BackupTransportPool(const Pointer<CompositeTaskRunner> taskRunner,
+ BackupTransportPool(FailoverTransport* parent,
+ const Pointer<CompositeTaskRunner> taskRunner,
const Pointer<CloseTransportsTask> closeTask,
const Pointer<URIPool> uriPool,
const Pointer<URIPool> updates,
const Pointer<URIPool> priorityUriPool);
- BackupTransportPool(int backupPoolSize,
+ BackupTransportPool(FailoverTransport* parent,
+ int backupPoolSize,
const Pointer<CompositeTaskRunner> taskRunner,
const Pointer<CloseTransportsTask> closeTask,
const Pointer<URIPool> uriPool,
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Feb 18 16:40:33 2013
@@ -160,7 +160,8 @@ namespace failover {
myTransportListener(new FailoverTransportListener(parent)),
transportListener(NULL) {
- this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris, updated, priorityUris));
+ this->backups.reset(
+ new BackupTransportPool(parent, taskRunner, closeTask, uris, updated, priorityUris));
this->taskRunner->addTask(parent);
this->taskRunner->addTask(this->closeTask.get());
@@ -711,6 +712,7 @@ void FailoverTransport::handleTransportF
this->impl->uris->addURI(failedUri);
this->impl->connectedTransportURI.reset(NULL);
this->impl->connected = false;
+ this->impl->connectedToPrioirty = false;
// Place the State Tracker into a reconnection state.
this->stateTracker.transportInterrupted();
@@ -936,6 +938,9 @@ bool FailoverTransport::iterate() {
this->impl->reconnectMutex.notifyAll();
this->impl->connectFailures = 0;
+ this->impl->connectedToPrioirty =
+ connectList->getPriorityURI().equals(uri) || this->impl->connectedToPrioirty;
+
// Make sure on initial startup, that the transportListener
// has been initialized for this instance.
synchronized(&this->impl->listenerMutex) {
@@ -1291,6 +1296,11 @@ void FailoverTransport::setPriorityBacku
}
////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isConnectedToPriority() const {
+ return this->impl->connectedToPrioirty;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setPriorityURIs(const std::string& priorityURIs AMQCPP_UNUSED) {
StringTokenizer tokenizer(priorityURIs, ",");
while (tokenizer.hasMoreTokens()) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1447382&r1=1447381&r2=1447382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Mon Feb 18 16:40:33 2013
@@ -231,6 +231,8 @@ namespace failover {
void setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId);
+ bool isConnectedToPriority() const;
+
protected:
/**