You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/29 21:06:48 UTC
svn commit: r616506 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Author: rajdavies
Date: Tue Jan 29 12:06:47 2008
New Revision: 616506
URL: http://svn.apache.org/viewvc?rev=616506&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1572
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=616506&r1=616505&r2=616506&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue Jan 29 12:06:47 2008
@@ -21,6 +21,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -34,6 +35,7 @@
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.CompositeTransport;
+import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
@@ -78,6 +80,12 @@
private int connectFailures;
private long reconnectDelay = initialReconnectDelay;
private Exception connectionFailure;
+ private boolean firstConnection = true;
+ //optionally always have a backup created
+ private boolean backup=false;
+ private URI backupTransportURI;
+ private Transport backupTransport;
+
private final TransportListener myTransportListener = createTransportListener();
@@ -100,22 +108,44 @@
if (connectedTransport != null || disposed || connectionFailure != null) {
return false;
} else {
- ArrayList<Object> connectList = getConnectList();
+ List<URI> connectList = getConnectList();
if (connectList.isEmpty()) {
failure = new IOException("No uris available to connect to.");
} else {
if (!useExponentialBackOff) {
reconnectDelay = initialReconnectDelay;
}
- Iterator<Object> iter = connectList.iterator();
- for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) {
- URI uri = (URI)iter.next();
+ if (backup && backupTransport != null) {
+ Transport t = backupTransport;
+ URI uri = backupTransportURI;
+ backupTransport=null;
+ backupTransportURI=null;
+ t.setTransportListener(myTransportListener);
+ try {
+ if (started) {
+ restoreTransport(t);
+ }
+ reconnectDelay = initialReconnectDelay;
+ connectedTransportURI = uri;
+ connectedTransport = t;
+ reconnectMutex.notifyAll();
+ connectFailures = 0;
+ LOG.info("Successfully reconnected to backup " + uri);
+ return false;
+ }catch (Exception e) {
+ LOG.debug("Backup transport failed",e);
+ }
+ }
+
+ Iterator<URI> iter = connectList.iterator();
+ while(iter.hasNext() && connectedTransport == null && !disposed) {
+ URI uri = iter.next();
try {
LOG.debug("Attempting connect to: " + uri);
Transport t = TransportFactory.compositeConnect(uri);
t.setTransportListener(myTransportListener);
t.start();
-
+
if (started) {
restoreTransport(t);
}
@@ -129,7 +159,26 @@
if (transportListener != null) {
transportListener.transportResumed();
}
- LOG.info("Successfully reconnected to " + uri);
+ if (firstConnection) {
+ firstConnection=false;
+ LOG.info("Successfully connected to " + uri);
+ if(backup) {
+ while(iter.hasNext() && backupTransport==null){
+ uri = iter.next();
+ try {
+ t = TransportFactory.compositeConnect(uri);
+ t.setTransportListener(new DefaultTransportListener());
+ t.start();
+ backupTransport=t;
+ backupTransportURI=uri;
+ }catch(Exception e) {
+ LOG.debug("Failed to create backup to " + uri,e);
+ }
+ }
+ }
+ }else {
+ LOG.info("Successfully reconnected to " + uri);
+ }
return false;
} catch (Exception e) {
failure = e;
@@ -488,18 +537,25 @@
}
}
- private ArrayList<Object> getConnectList() {
- ArrayList<Object> l = new ArrayList<Object>(uris);
+ private List<URI> getConnectList() {
+ ArrayList<URI> l = new ArrayList<URI>(uris);
+ boolean removed = false;
+ if (connectedTransportURI != null) {
+ removed = l.remove(connectedTransportURI);
+ }
if (randomize) {
// Randomly, reorder the list by random swapping
Random r = new Random();
r.setSeed(System.currentTimeMillis());
for (int i = 0; i < l.size(); i++) {
int p = r.nextInt(l.size());
- Object t = l.get(p);
+ URI t = l.get(p);
l.set(p, l.get(i));
l.set(i, t);
}
+ }
+ if (removed) {
+ l.add(connectedTransportURI);
}
return l;
}