You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/13 22:15:56 UTC
svn commit: r1445930 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover: FailoverTransport.cpp FailoverTransport.h URIPool.cpp URIPool.h
Author: tabish
Date: Wed Feb 13 21:15:56 2013
New Revision: 1445930
URL: http://svn.apache.org/r1445930
Log:
https://issues.apache.org/jira/browse/AMQCPP-463
Initial work to add the feature requested in AMQCPP-463.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Wed Feb 13 21:15:56 2013
@@ -43,48 +43,117 @@ using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-FailoverTransport::FailoverTransport() : closed(false),
- connected(false),
- started(false),
- timeout(-1),
- initialReconnectDelay(10),
- maxReconnectDelay(1000*30),
- backOffMultiplier(2),
- useExponentialBackOff(true),
- initialized(false),
- maxReconnectAttempts(0),
- startupMaxReconnectAttempts(0),
- connectFailures(0),
- reconnectDelay(10),
- trackMessages(false),
- trackTransactionProducers(true),
- maxCacheSize(128*1024),
- connectionInterruptProcessingComplete(false),
- firstConnection(true),
- updateURIsSupported(true),
- reconnectSupported(true),
- reconnectMutex(),
- sleepMutex(),
- listenerMutex(),
- stateTracker(),
- requestMap(),
- uris(new URIPool()),
- updated(),
- connectedTransportURI(),
- connectedTransport(),
- connectionFailure(),
- backups(),
- closeTask(new CloseTransportsTask()),
- taskRunner(new CompositeTaskRunner()),
- disposedListener(),
- myTransportListener(new FailoverTransportListener(this)),
- transportListener(NULL) {
+namespace activemq {
+namespace transport {
+namespace failover {
+
+ class FailoverTransportImpl {
+ private:
+
+ FailoverTransportImpl(const FailoverTransportImpl&);
+ FailoverTransportImpl& operator= (const FailoverTransportImpl&);
+
+ public:
+
+ bool closed;
+ bool connected;
+ bool started;
+
+ long long timeout;
+ long long initialReconnectDelay;
+ long long maxReconnectDelay;
+ long long backOffMultiplier;
+ bool useExponentialBackOff;
+ bool initialized;
+ int maxReconnectAttempts;
+ int startupMaxReconnectAttempts;
+ int connectFailures;
+ long long reconnectDelay;
+ bool trackMessages;
+ bool trackTransactionProducers;
+ int maxCacheSize;
+ bool connectionInterruptProcessingComplete;
+ bool firstConnection;
+ bool updateURIsSupported;
+ bool reconnectSupported;
+ bool rebalanceUpdateURIs;
+ bool priorityBackup;
+
+ mutable decaf::util::concurrent::Mutex reconnectMutex;
+ mutable decaf::util::concurrent::Mutex sleepMutex;
+ mutable decaf::util::concurrent::Mutex listenerMutex;
+
+ decaf::util::StlMap<int, Pointer<Command> > requestMap;
+
+ Pointer<URIPool> uris;
+ Pointer<URIPool> priorityUris;
+ decaf::util::LinkedList<URI> updated;
+ Pointer<URI> connectedTransportURI;
+ Pointer<Transport> connectedTransport;
+ Pointer<Exception> connectionFailure;
+ Pointer<BackupTransportPool> backups;
+ Pointer<CloseTransportsTask> closeTask;
+ Pointer<CompositeTaskRunner> taskRunner;
+ Pointer<TransportListener> disposedListener;
+ Pointer<TransportListener> myTransportListener;
+
+ FailoverTransportImpl(FailoverTransport* parent) :
+ closed(false),
+ connected(false),
+ started(false),
+ timeout(-1),
+ initialReconnectDelay(10),
+ maxReconnectDelay(1000*30),
+ backOffMultiplier(2),
+ useExponentialBackOff(true),
+ initialized(false),
+ maxReconnectAttempts(0),
+ startupMaxReconnectAttempts(0),
+ connectFailures(0),
+ reconnectDelay(10),
+ trackMessages(false),
+ trackTransactionProducers(true),
+ maxCacheSize(128*1024),
+ connectionInterruptProcessingComplete(false),
+ firstConnection(true),
+ updateURIsSupported(true),
+ reconnectSupported(true),
+ rebalanceUpdateURIs(true),
+ priorityBackup(false),
+ reconnectMutex(),
+ sleepMutex(),
+ listenerMutex(),
+ requestMap(),
+ uris(new URIPool()),
+ priorityUris(new URIPool()),
+ updated(),
+ connectedTransportURI(),
+ connectedTransport(),
+ connectionFailure(),
+ backups(),
+ closeTask(new CloseTransportsTask()),
+ taskRunner(new CompositeTaskRunner()),
+ disposedListener(),
+ myTransportListener(new FailoverTransportListener(parent)) {
+
+ this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris));
+
+ this->taskRunner->addTask(parent);
+ this->taskRunner->addTask(this->closeTask.get());
+ }
+
+ bool isPriority(const decaf::net::URI& uri) {
+ return priorityUris->contains(uri) || uris->isPriority(uri);
+ }
- this->stateTracker.setTrackTransactions(true);
- this->backups.reset(new BackupTransportPool(taskRunner, closeTask, uris));
+ };
- this->taskRunner->addTask(this);
- this->taskRunner->addTask(this->closeTask.get());
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+FailoverTransport::FailoverTransport() : stateTracker(), impl(NULL), transportListener(NULL) {
+ this->impl = new FailoverTransportImpl(this);
+ this->stateTracker.setTrackTransactions(true);
}
////////////////////////////////////////////////////////////////////////////////
@@ -92,7 +161,11 @@ FailoverTransport::~FailoverTransport()
try {
close();
}
- AMQ_CATCH_NOTHROW( Exception)
+ AMQ_CATCHALL_NOTHROW()
+
+ try {
+ delete this->impl;
+ }
AMQ_CATCHALL_NOTHROW()
}
@@ -100,7 +173,7 @@ FailoverTransport::~FailoverTransport()
void FailoverTransport::add(const std::string& uri) {
try {
- uris->addURI(URI(uri));
+ this->impl->uris->addURI(URI(uri));
reconnect(false);
}
AMQ_CATCHALL_NOTHROW()
@@ -112,7 +185,7 @@ void FailoverTransport::addURI(bool reba
std::auto_ptr<Iterator<URI> > iter(uris.iterator());
while (iter->hasNext()) {
- this->uris->addURI(iter->next());
+ this->impl->uris->addURI(iter->next());
}
reconnect(rebalance);
@@ -123,7 +196,7 @@ void FailoverTransport::removeURI(bool r
std::auto_ptr<Iterator<URI> > iter(uris.iterator());
- synchronized( &reconnectMutex ) {
+ synchronized( &this->impl->reconnectMutex ) {
// We need to lock so that the reconnect doesn't get kicked off until
// we have a chance to remove the URIs in case one of them was the one
@@ -132,7 +205,7 @@ void FailoverTransport::removeURI(bool r
reconnect(rebalance);
while (iter->hasNext()) {
- this->uris->removeURI(iter->next());
+ this->impl->uris->removeURI(iter->next());
}
}
}
@@ -141,9 +214,7 @@ void FailoverTransport::removeURI(bool r
void FailoverTransport::reconnect(const decaf::net::URI& uri) {
try {
-
- this->uris->addURI(uri);
-
+ this->impl->uris->addURI(uri);
reconnect(true);
}
AMQ_CATCH_RETHROW( IOException)
@@ -153,15 +224,15 @@ void FailoverTransport::reconnect(const
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTransportListener(TransportListener* listener) {
- synchronized( &listenerMutex ) {
+ synchronized( &this->impl->listenerMutex ) {
this->transportListener = listener;
- listenerMutex.notifyAll();
+ this->impl->listenerMutex.notifyAll();
}
}
////////////////////////////////////////////////////////////////////////////////
TransportListener* FailoverTransport::getTransportListener() const {
- synchronized( &listenerMutex ) {
+ synchronized( &this->impl->listenerMutex ) {
return this->transportListener;
}
@@ -170,9 +241,9 @@ TransportListener* FailoverTransport::ge
////////////////////////////////////////////////////////////////////////////////
std::string FailoverTransport::getRemoteAddress() const {
- synchronized( &reconnectMutex ) {
- if (connectedTransport != NULL) {
- return connectedTransport->getRemoteAddress();
+ synchronized( &this->impl->reconnectMutex ) {
+ if (this->impl->connectedTransport != NULL) {
+ return this->impl->connectedTransport->getRemoteAddress();
}
}
return "";
@@ -185,9 +256,9 @@ void FailoverTransport::oneway(const Poi
try {
- synchronized(&reconnectMutex) {
+ synchronized(&this->impl->reconnectMutex) {
- if (command != NULL && connectedTransport == NULL) {
+ if (command != NULL && this->impl->connectedTransport == NULL) {
if (command->isShutdownInfo()) {
// Skipping send of ShutdownInfo command when not connected.
@@ -201,7 +272,7 @@ void FailoverTransport::oneway(const Poi
if (command->isResponseRequired()) {
Pointer<Response> response(new Response());
response->setCorrelationId(command->getCommandId());
- myTransportListener->onCommand(response);
+ this->impl->myTransportListener->onCommand(response);
}
return;
@@ -209,33 +280,34 @@ void FailoverTransport::oneway(const Poi
}
// Keep trying until the message is sent.
- for (int i = 0; !closed; i++) {
+ for (int i = 0; !this->impl->closed; i++) {
try {
// Wait for transport to be connected.
- Pointer<Transport> transport = connectedTransport;
+ Pointer<Transport> transport = this->impl->connectedTransport;
long long start = System::currentTimeMillis();
bool timedout = false;
- while (transport == NULL && !closed && connectionFailure == NULL) {
+ while (transport == NULL && !this->impl->closed && this->impl->connectionFailure == NULL) {
long long end = System::currentTimeMillis();
- if (timeout > 0 && (end - start > timeout)) {
+ if (this->impl->timeout > 0 && (end - start > this->impl->timeout)) {
timedout = true;
break;
}
- reconnectMutex.wait(100);
- transport = connectedTransport;
+ this->impl->reconnectMutex.wait(100);
+ transport = this->impl->connectedTransport;
}
if (transport == NULL) {
// Previous loop may have exited due to us being disposed.
- if (closed) {
+ if (this->impl->closed) {
error.reset(new IOException(__FILE__, __LINE__, "Transport disposed."));
- } else if (connectionFailure != NULL) {
- error = connectionFailure;
+ } else if (this->impl->connectionFailure != NULL) {
+ error = this->impl->connectionFailure;
} else if (timedout == true) {
- error.reset(new IOException(__FILE__, __LINE__, "Failover timeout of %d ms reached.", timeout));
+ error.reset(new IOException(__FILE__, __LINE__,
+ "Failover timeout of %d ms reached.", this->impl->timeout));
} else {
error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure."));
}
@@ -249,11 +321,11 @@ void FailoverTransport::oneway(const Poi
Pointer<Tracked> tracked;
try {
tracked = stateTracker.track(command);
- synchronized( &requestMap ) {
+ synchronized( &this->impl->requestMap ) {
if (tracked != NULL && tracked->isWaitingForResponse()) {
- requestMap.put(command->getCommandId(), tracked);
+ this->impl->requestMap.put(command->getCommandId(), tracked);
} else if (tracked == NULL && command->isResponseRequired()) {
- requestMap.put(command->getCommandId(), command);
+ this->impl->requestMap.put(command->getCommandId(), command);
}
}
} catch (Exception& ex) {
@@ -277,7 +349,7 @@ void FailoverTransport::oneway(const Poi
// since we will retry in this method.. take it out of the
// request map so that it is not sent 2 times on recovery
if (command->isResponseRequired()) {
- requestMap.remove(command->getCommandId());
+ this->impl->requestMap.remove(command->getCommandId());
}
// Rethrow the exception so it will handled by
@@ -301,7 +373,7 @@ void FailoverTransport::oneway(const Poi
AMQ_CATCH_NOTHROW( Exception)
AMQ_CATCHALL_NOTHROW()
- if (!closed) {
+ if (!this->impl->closed) {
if (error != NULL) {
throw IOException(*error);
}
@@ -329,22 +401,22 @@ void FailoverTransport::start() {
try {
- synchronized(&reconnectMutex) {
+ synchronized(&this->impl->reconnectMutex) {
- if (this->started) {
+ if (this->impl->started) {
return;
}
- started = true;
+ this->impl->started = true;
- taskRunner->start();
+ this->impl->taskRunner->start();
stateTracker.setMaxCacheSize(this->getMaxCacheSize());
stateTracker.setTrackMessages(this->isTrackMessages());
stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
- if (connectedTransport != NULL) {
- stateTracker.restore(connectedTransport);
+ if (this->impl->connectedTransport != NULL) {
+ stateTracker.restore(this->impl->connectedTransport);
} else {
reconnect(false);
}
@@ -372,30 +444,30 @@ void FailoverTransport::close() {
Pointer<Transport> transportToStop;
- synchronized( &reconnectMutex ) {
- if (!started) {
+ synchronized( &this->impl->reconnectMutex ) {
+ if (!this->impl->started) {
return;
}
- started = false;
- closed = true;
- connected = false;
+ this->impl->started = false;
+ this->impl->closed = true;
+ this->impl->connected = false;
- backups->setEnabled(false);
- requestMap.clear();
+ this->impl->backups->setEnabled(false);
+ this->impl->requestMap.clear();
- if (connectedTransport != NULL) {
- transportToStop.swap(connectedTransport);
+ if (this->impl->connectedTransport != NULL) {
+ transportToStop.swap(this->impl->connectedTransport);
}
- reconnectMutex.notifyAll();
+ this->impl->reconnectMutex.notifyAll();
}
- synchronized( &sleepMutex ) {
- sleepMutex.notifyAll();
+ synchronized( &this->impl->sleepMutex ) {
+ this->impl->sleepMutex.notifyAll();
}
- taskRunner->shutdown(2000);
+ this->impl->taskRunner->shutdown(2000);
if (transportToStop != NULL) {
transportToStop->close();
@@ -411,30 +483,30 @@ void FailoverTransport::reconnect(bool r
Pointer<Transport> transport;
- synchronized( &reconnectMutex ) {
- if (started) {
+ synchronized( &this->impl->reconnectMutex ) {
+ if (this->impl->started) {
if (rebalance) {
- transport.swap(this->connectedTransport);
+ transport.swap(this->impl->connectedTransport);
if (transport != NULL) {
- if (this->disposedListener != NULL) {
- transport->setTransportListener(disposedListener.get());
+ if (this->impl->disposedListener != NULL) {
+ transport->setTransportListener(this->impl->disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread.
- closeTask->add(transport);
+ this->impl->closeTask->add(transport);
- if (this->connectedTransportURI != NULL) {
- this->uris->addURI(*this->connectedTransportURI);
- this->connectedTransportURI.reset(NULL);
+ if (this->impl->connectedTransportURI != NULL) {
+ this->impl->uris->addURI(*this->impl->connectedTransportURI);
+ this->impl->connectedTransportURI.reset(NULL);
}
}
}
- taskRunner->wakeup();
+ this->impl->taskRunner->wakeup();
}
}
}
@@ -454,8 +526,8 @@ void FailoverTransport::restoreTransport
stateTracker.restore(transport);
decaf::util::StlMap<int, Pointer<Command> > commands;
- synchronized(&requestMap) {
- commands.copy(requestMap);
+ synchronized(&this->impl->requestMap) {
+ commands.copy(this->impl->requestMap);
}
Pointer<Iterator<Pointer<Command> > > iter(commands.values().iterator());
@@ -472,25 +544,25 @@ void FailoverTransport::restoreTransport
void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error AMQCPP_UNUSED) {
Pointer<Transport> transport;
- synchronized( &reconnectMutex ) {
- connectedTransport.swap(transport);
+ synchronized(&this->impl->reconnectMutex) {
+ this->impl->connectedTransport.swap(transport);
}
if (transport != NULL) {
- if (this->disposedListener != NULL) {
- transport->setTransportListener(disposedListener.get());
+ if (this->impl->disposedListener != NULL) {
+ transport->setTransportListener(this->impl->disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread.
- closeTask->add(transport);
+ this->impl->closeTask->add(transport);
- synchronized( &reconnectMutex ) {
+ synchronized(&this->impl->reconnectMutex) {
- initialized = false;
- uris->addURI(*connectedTransportURI);
- connectedTransportURI.reset(NULL);
- connected = false;
+ this->impl->initialized = false;
+ this->impl->uris->addURI(*this->impl->connectedTransportURI);
+ this->impl->connectedTransportURI.reset(NULL);
+ this->impl->connected = false;
// Place the State Tracker into a reconnection state.
this->stateTracker.transportInterrupted();
@@ -501,8 +573,8 @@ void FailoverTransport::handleTransportF
transportListener->transportInterrupted();
}
- if (started) {
- taskRunner->wakeup();
+ if (this->impl->started) {
+ this->impl->taskRunner->wakeup();
}
}
}
@@ -572,7 +644,7 @@ void FailoverTransport::updateURIs(bool
if (isUpdateURIsSupported()) {
- LinkedList<URI> copy(this->updated);
+ LinkedList<URI> copy(this->impl->updated);
LinkedList<URI> add;
if (!updatedURIs.isEmpty()) {
@@ -591,17 +663,17 @@ void FailoverTransport::updateURIs(bool
}
}
- synchronized( &reconnectMutex ) {
+ synchronized( &this->impl->reconnectMutex ) {
- this->updated.clear();
+ this->impl->updated.clear();
Pointer<Iterator<URI> > listIter1(add.iterator());
while (listIter1->hasNext()) {
- this->updated.add(listIter1->next());
+ this->impl->updated.add(listIter1->next());
}
Pointer<Iterator<URI> > listIter2(copy.iterator());
while (listIter2->hasNext()) {
- this->uris->removeURI(listIter2->next());
+ this->impl->uris->removeURI(listIter2->next());
}
this->addURI(rebalance, add);
@@ -614,20 +686,20 @@ void FailoverTransport::updateURIs(bool
bool FailoverTransport::isPending() const {
bool result = false;
- synchronized( &reconnectMutex ) {
- if (this->connectedTransport == NULL && !closed && started) {
+ synchronized(&this->impl->reconnectMutex) {
+ if (this->impl->connectedTransport == NULL && !this->impl->closed && this->impl->started) {
int reconnectAttempts = 0;
- if (firstConnection) {
- if (startupMaxReconnectAttempts != 0) {
- reconnectAttempts = startupMaxReconnectAttempts;
+ if (this->impl->firstConnection) {
+ if (this->impl->startupMaxReconnectAttempts != 0) {
+ reconnectAttempts = this->impl->startupMaxReconnectAttempts;
}
}
if (reconnectAttempts == 0) {
- reconnectAttempts = maxReconnectAttempts;
+ reconnectAttempts = this->impl->maxReconnectAttempts;
}
- if (reconnectAttempts > 0 && connectFailures >= reconnectAttempts) {
+ if (reconnectAttempts > 0 && this->impl->connectFailures >= reconnectAttempts) {
result = false;
} else {
result = true;
@@ -643,13 +715,13 @@ bool FailoverTransport::iterate() {
Pointer<Exception> failure;
- synchronized( &reconnectMutex ) {
+ synchronized( &this->impl->reconnectMutex ) {
- if (closed || connectionFailure != NULL) {
- reconnectMutex.notifyAll();
+ if (this->impl->closed || this->impl->connectionFailure != NULL) {
+ this->impl->reconnectMutex.notifyAll();
}
- if (connectedTransport != NULL || closed || connectionFailure != NULL) {
+ if (this->impl->connectedTransport != NULL || this->impl->closed || this->impl->connectionFailure != NULL) {
return false;
} else {
@@ -657,51 +729,51 @@ bool FailoverTransport::iterate() {
Pointer<Transport> transport;
URI uri;
- if (!useExponentialBackOff) {
- reconnectDelay = initialReconnectDelay;
+ if (!this->impl->useExponentialBackOff) {
+ this->impl->reconnectDelay = this->impl->initialReconnectDelay;
}
- if (backups->isEnabled()) {
+ if (this->impl->backups->isEnabled()) {
- Pointer<BackupTransport> backupTransport = backups->getBackup();
+ Pointer<BackupTransport> backupTransport = this->impl->backups->getBackup();
if (backupTransport != NULL) {
transport = backupTransport->getTransport();
uri = backupTransport->getUri();
- transport->setTransportListener(myTransportListener.get());
+ transport->setTransportListener(this->impl->myTransportListener.get());
try {
- if (started) {
+ if (this->impl->started) {
restoreTransport(transport);
}
} catch (Exception& e) {
if (transport != NULL) {
- if (this->disposedListener != NULL) {
- transport->setTransportListener(disposedListener.get());
+ if (this->impl->disposedListener != NULL) {
+ transport->setTransportListener(this->impl->disposedListener.get());
}
// Hand off to the close task so it gets done in a different thread
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
// way.
- closeTask->add(transport);
- taskRunner->wakeup();
+ this->impl->closeTask->add(transport);
+ this->impl->taskRunner->wakeup();
transport.reset(NULL);
}
- this->uris->addURI(uri);
+ this->impl->uris->addURI(uri);
}
}
}
- while (transport == NULL && !closed) {
+ while (transport == NULL && !this->impl->closed) {
try {
- uri = uris->getURI();
+ uri = this->impl->uris->getURI();
} catch (NoSuchElementException& ex) {
break;
}
@@ -709,10 +781,10 @@ bool FailoverTransport::iterate() {
try {
transport = createTransport(uri);
- transport->setTransportListener(myTransportListener.get());
+ transport->setTransportListener(this->impl->myTransportListener.get());
transport->start();
- if (started) {
+ if (this->impl->started) {
restoreTransport(transport);
}
@@ -720,8 +792,8 @@ bool FailoverTransport::iterate() {
e.setMark(__FILE__, __LINE__);
if (transport != NULL) {
- if (this->disposedListener != NULL) {
- transport->setTransportListener(disposedListener.get());
+ if (this->impl->disposedListener != NULL) {
+ transport->setTransportListener(this->impl->disposedListener.get());
}
try {
@@ -733,8 +805,8 @@ bool FailoverTransport::iterate() {
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
// way.
- closeTask->add(transport);
- taskRunner->wakeup();
+ this->impl->closeTask->add(transport);
+ this->impl->taskRunner->wakeup();
transport.reset(NULL);
}
@@ -744,22 +816,22 @@ bool FailoverTransport::iterate() {
}
// Return the failures to the pool, we will try again on the next iteration.
- this->uris->addURIs(failures);
+ this->impl->uris->addURIs(failures);
if (transport != NULL) {
- reconnectDelay = initialReconnectDelay;
- connectedTransportURI.reset(new URI(uri));
- connectedTransport = transport;
- reconnectMutex.notifyAll();
- connectFailures = 0;
- connected = true;
+ this->impl->reconnectDelay = this->impl->initialReconnectDelay;
+ this->impl->connectedTransportURI.reset(new URI(uri));
+ this->impl->connectedTransport = transport;
+ this->impl->reconnectMutex.notifyAll();
+ this->impl->connectFailures = 0;
+ this->impl->connected = true;
// Make sure on initial startup, that the transportListener
// has been initialized for this instance.
- synchronized( &listenerMutex ) {
+ synchronized(&this->impl->listenerMutex) {
if (transportListener == NULL) {
// if it isn't set after 2secs - it probably never will be
- listenerMutex.wait(2000);
+ this->impl->listenerMutex.wait(2000);
}
}
@@ -767,8 +839,8 @@ bool FailoverTransport::iterate() {
transportListener->transportResumed();
}
- if (firstConnection) {
- firstConnection = false;
+ if (this->impl->firstConnection) {
+ this->impl->firstConnection = false;
}
return false;
@@ -776,23 +848,23 @@ bool FailoverTransport::iterate() {
}
int reconnectAttempts = 0;
- if (firstConnection) {
- if (startupMaxReconnectAttempts != 0) {
- reconnectAttempts = startupMaxReconnectAttempts;
+ if (this->impl->firstConnection) {
+ if (this->impl->startupMaxReconnectAttempts != 0) {
+ reconnectAttempts = this->impl->startupMaxReconnectAttempts;
}
}
if (reconnectAttempts == 0) {
- reconnectAttempts = maxReconnectAttempts;
+ reconnectAttempts = this->impl->maxReconnectAttempts;
}
- if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
- connectionFailure = failure;
+ if (reconnectAttempts > 0 && ++this->impl->connectFailures >= reconnectAttempts) {
+ this->impl->connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been initialized
// for this instance.
- synchronized( &listenerMutex ) {
+ synchronized(&this->impl->listenerMutex) {
if (transportListener == NULL) {
- listenerMutex.wait(2000);
+ this->impl->listenerMutex.wait(2000);
}
}
@@ -800,38 +872,38 @@ bool FailoverTransport::iterate() {
Pointer<IOException> ioException;
try {
- ioException = connectionFailure.dynamicCast<IOException>();
+ ioException = this->impl->connectionFailure.dynamicCast<IOException>();
}
AMQ_CATCH_NOTHROW( ClassCastException)
if (ioException != NULL) {
- transportListener->onException(*connectionFailure);
+ transportListener->onException(*this->impl->connectionFailure);
} else {
- transportListener->onException(IOException(*connectionFailure));
+ transportListener->onException(IOException(*this->impl->connectionFailure));
}
}
- reconnectMutex.notifyAll();
+ this->impl->reconnectMutex.notifyAll();
return false;
}
}
- if (!closed) {
+ if (!this->impl->closed) {
- synchronized( &sleepMutex ) {
- sleepMutex.wait((unsigned int) reconnectDelay);
+ synchronized(&this->impl->sleepMutex) {
+ this->impl->sleepMutex.wait((unsigned int) this->impl->reconnectDelay);
}
- if (useExponentialBackOff) {
+ if (this->impl->useExponentialBackOff) {
// Exponential increment of reconnect delay.
- reconnectDelay *= backOffMultiplier;
- if (reconnectDelay > maxReconnectDelay) {
- reconnectDelay = maxReconnectDelay;
+ this->impl->reconnectDelay *= this->impl->backOffMultiplier;
+ if (this->impl->reconnectDelay > this->impl->maxReconnectDelay) {
+ this->impl->reconnectDelay = this->impl->maxReconnectDelay;
}
}
}
- return !closed;
+ return !this->impl->closed;
}
////////////////////////////////////////////////////////////////////////////////
@@ -857,29 +929,29 @@ Pointer<Transport> FailoverTransport::cr
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId) {
- synchronized( &reconnectMutex ) {
+ synchronized(&this->impl->reconnectMutex) {
stateTracker.connectionInterruptProcessingComplete(this, connectionId);
}
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isConnected() const {
- return this->connected;
+ return this->impl->connected;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isClosed() const {
- return this->closed;
+ return this->impl->closed;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isInitialized() const {
- return this->initialized;
+ return this->impl->initialized;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setInitialized(bool value) {
- this->initialized = value;
+ this->impl->initialized = value;
}
////////////////////////////////////////////////////////////////////////////////
@@ -889,8 +961,8 @@ Transport* FailoverTransport::narrow(con
return this;
}
- if (this->connectedTransport != NULL) {
- return this->connectedTransport->narrow(typeId);
+ if (this->impl->connectedTransport != NULL) {
+ return this->impl->connectedTransport->narrow(typeId);
}
return NULL;
@@ -901,9 +973,9 @@ void FailoverTransport::processResponse(
Pointer<Command> object;
- synchronized(&(this->requestMap)) {
+ synchronized(&(this->impl->requestMap)) {
try {
- object = this->requestMap.remove(response->getCorrelationId());
+ object = this->impl->requestMap.remove(response->getCorrelationId());
} catch (NoSuchElementException& ex) {
// Not tracking this request in our map, not an error.
}
@@ -922,7 +994,7 @@ void FailoverTransport::processResponse(
Pointer<wireformat::WireFormat> FailoverTransport::getWireFormat() const {
Pointer<wireformat::WireFormat> result;
- Pointer<Transport> transport = this->connectedTransport;
+ Pointer<Transport> transport = this->impl->connectedTransport;
if (transport != NULL) {
result = transport->getWireFormat();
@@ -933,160 +1005,194 @@ Pointer<wireformat::WireFormat> Failover
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getTimeout() const {
- return this->timeout;
+ return this->impl->timeout;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTimeout(long long value) {
- this->timeout = value;
+ this->impl->timeout = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getInitialReconnectDelay() const {
- return this->initialReconnectDelay;
+ return this->impl->initialReconnectDelay;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setInitialReconnectDelay(long long value) {
- this->initialReconnectDelay = value;
+ this->impl->initialReconnectDelay = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getMaxReconnectDelay() const {
- return this->maxReconnectDelay;
+ return this->impl->maxReconnectDelay;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setMaxReconnectDelay(long long value) {
- this->maxReconnectDelay = value;
+ this->impl->maxReconnectDelay = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getBackOffMultiplier() const {
- return this->backOffMultiplier;
+ return this->impl->backOffMultiplier;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setBackOffMultiplier(long long value) {
- this->backOffMultiplier = value;
+ this->impl->backOffMultiplier = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isUseExponentialBackOff() const {
- return this->useExponentialBackOff;
+ return this->impl->useExponentialBackOff;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setUseExponentialBackOff(bool value) {
- this->useExponentialBackOff = value;
+ this->impl->useExponentialBackOff = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isRandomize() const {
- return this->uris->isRandomize();
+ return this->impl->uris->isRandomize();
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setRandomize(bool value) {
- this->uris->setRandomize(value);
+ this->impl->uris->setRandomize(value);
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getMaxReconnectAttempts() const {
- return this->maxReconnectAttempts;
+ return this->impl->maxReconnectAttempts;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setMaxReconnectAttempts(int value) {
- this->maxReconnectAttempts = value;
+ this->impl->maxReconnectAttempts = value;
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getStartupMaxReconnectAttempts() const {
- return this->startupMaxReconnectAttempts;
+ return this->impl->startupMaxReconnectAttempts;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setStartupMaxReconnectAttempts(int value) {
- this->startupMaxReconnectAttempts = value;
+ this->impl->startupMaxReconnectAttempts = value;
}
////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getReconnectDelay() const {
- return this->reconnectDelay;
+ return this->impl->reconnectDelay;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setReconnectDelay(long long value) {
- this->reconnectDelay = value;
+ this->impl->reconnectDelay = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isBackup() const {
- return this->backups->isEnabled();
+ return this->impl->backups->isEnabled();
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setBackup(bool value) {
- this->backups->setEnabled(value);
+ this->impl->backups->setEnabled(value);
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getBackupPoolSize() const {
- return this->backups->getBackupPoolSize();
+ return this->impl->backups->getBackupPoolSize();
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setBackupPoolSize(int value) {
- this->backups->setBackupPoolSize(value);
+ this->impl->backups->setBackupPoolSize(value);
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isTrackMessages() const {
- return this->trackMessages;
+ return this->impl->trackMessages;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTrackMessages(bool value) {
- this->trackMessages = value;
+ this->impl->trackMessages = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isTrackTransactionProducers() const {
- return this->trackTransactionProducers;
+ return this->impl->trackTransactionProducers;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setTrackTransactionProducers(bool value) {
- this->trackTransactionProducers = value;
+ this->impl->trackTransactionProducers = value;
}
////////////////////////////////////////////////////////////////////////////////
int FailoverTransport::getMaxCacheSize() const {
- return this->maxCacheSize;
+ return this->impl->maxCacheSize;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setMaxCacheSize(int value) {
- this->maxCacheSize = value;
+ this->impl->maxCacheSize = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isReconnectSupported() const {
- return this->reconnectSupported;
+ return this->impl->reconnectSupported;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setReconnectSupported(bool value) {
- this->reconnectSupported = value;
+ this->impl->reconnectSupported = value;
}
////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isUpdateURIsSupported() const {
- return this->updateURIsSupported;
+ return this->impl->updateURIsSupported;
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransport::setUpdateURIsSupported(bool value) {
- this->updateURIsSupported = value;
+ this->impl->updateURIsSupported = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isRebalanceUpdateURIs() const {
+ return this->impl->rebalanceUpdateURIs;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setRebalanceUpdateURIs(bool rebalanceUpdateURIs) {
+ this->impl->rebalanceUpdateURIs = rebalanceUpdateURIs;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isPriorityBackup() const {
+ return this->impl->priorityBackup;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setPriorityBackup(bool priorityBackup) {
+ this->impl->priorityBackup = priorityBackup;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setPriorityURIs(const std::string& priorityURIs AMQCPP_UNUSED) {
+// StringTokenizer tokenizer = new StringTokenizer(priorityURIs, ",");
+// while (tokenizer.hasMoreTokens()) {
+// String str = tokenizer.nextToken();
+// try {
+// URI uri = new URI(str);
+// priorityList.add(uri);
+// } catch (Exception e) {
+// LOG.error("Failed to parse broker address: " + str, e);
+// }
+// }
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Wed Feb 13 21:15:56 2013
@@ -45,57 +45,21 @@ namespace transport {
namespace failover {
using namespace decaf::lang;
- using decaf::net::URI;
- using namespace decaf::util;
- using namespace activemq::threads;
using activemq::commands::Command;
using activemq::commands::Response;
+ class FailoverTransportImpl;
+
class AMQCPP_API FailoverTransport : public CompositeTransport,
public activemq::threads::CompositeTask {
private:
friend class FailoverTransportListener;
- bool closed;
- bool connected;
- bool started;
-
- long long timeout;
- long long initialReconnectDelay;
- long long maxReconnectDelay;
- long long backOffMultiplier;
- bool useExponentialBackOff;
- bool initialized;
- int maxReconnectAttempts;
- int startupMaxReconnectAttempts;
- int connectFailures;
- long long reconnectDelay;
- bool trackMessages;
- bool trackTransactionProducers;
- int maxCacheSize;
- bool connectionInterruptProcessingComplete;
- bool firstConnection;
- bool updateURIsSupported;
- bool reconnectSupported;
-
- mutable decaf::util::concurrent::Mutex reconnectMutex;
- mutable decaf::util::concurrent::Mutex sleepMutex;
- mutable decaf::util::concurrent::Mutex listenerMutex;
-
state::ConnectionStateTracker stateTracker;
- decaf::util::StlMap<int, Pointer<Command> > requestMap;
- Pointer<URIPool> uris;
- decaf::util::LinkedList<URI> updated;
- Pointer<URI> connectedTransportURI;
- Pointer<Transport> connectedTransport;
- Pointer<Exception> connectionFailure;
- Pointer<BackupTransportPool> backups;
- Pointer<CloseTransportsTask> closeTask;
- Pointer<CompositeTaskRunner> taskRunner;
- Pointer<TransportListener> disposedListener;
- Pointer<TransportListener> myTransportListener;
+ FailoverTransportImpl* impl;
+
TransportListener* transportListener;
private:
@@ -127,9 +91,9 @@ namespace failover {
public: // CompositeTransport methods
- virtual void addURI(bool rebalance, const List<URI>& uris);
+ virtual void addURI(bool rebalance, const List<decaf::net::URI>& uris);
- virtual void removeURI(bool rebalance, const List<URI>& uris);
+ virtual void removeURI(bool rebalance, const List<decaf::net::URI>& uris);
public:
@@ -259,6 +223,16 @@ namespace failover {
void setUpdateURIsSupported(bool value);
+ bool isRebalanceUpdateURIs() const;
+
+ void setRebalanceUpdateURIs(bool rebalanceUpdateURIs);
+
+ bool isPriorityBackup() const;
+
+ void setPriorityBackup(bool priorityBackup);
+
+ void setPriorityURIs(const std::string& priorityURIs);
+
void setConnectionInterruptProcessingComplete(const Pointer<commands::ConnectionId> connectionId);
protected:
@@ -301,7 +275,7 @@ namespace failover {
*
* @throw IOException if an I/O error occurs while creating the new Transport.
*/
- Pointer<Transport> createTransport(const URI& location) const;
+ Pointer<Transport> createTransport(const decaf::net::URI& location) const;
void processNewTransports(bool rebalance, std::string newTransports);
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp Wed Feb 13 21:15:56 2013
@@ -101,3 +101,21 @@ void URIPool::removeURI(const URI& uri)
}
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool URIPool::contains(const decaf::net::URI& uri) const {
+ synchronized(&uriPool) {
+ return uriPool.contains(uri);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool URIPool::isPriority(const decaf::net::URI& uri) const {
+ synchronized(&uriPool) {
+ if (uriPool.isEmpty()) {
+ return false;
+ }
+
+ return uriPool.getFirst().equals(uri);
+ }
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h?rev=1445930&r1=1445929&r2=1445930&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h Wed Feb 13 21:15:56 2013
@@ -34,7 +34,7 @@ namespace failover {
class AMQCPP_API URIPool {
private:
- LinkedList<URI> uriPool;
+ mutable LinkedList<URI> uriPool;
bool randomize;
public:
@@ -108,6 +108,21 @@ namespace failover {
this->randomize = value;
}
+ /**
+ * Returns true if the given URI is contained in this set of URIs.
+ *
+ * @returns true if the URI is in the list.
+ */
+ bool contains(const decaf::net::URI& uri) const;
+
+ /**
+ * Returns true if the URI given is the first in the list of
+ * URIs contained in this pool.
+ *
+ * @returns true if the URI is index 0 in the URI list.
+ */
+ bool isPriority(const decaf::net::URI& uri) const;
+
};
}}}