You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/29 21:06:48 UTC

svn commit: r616506 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Author: rajdavies
Date: Tue Jan 29 12:06:47 2008
New Revision: 616506

URL: http://svn.apache.org/viewvc?rev=616506&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1572

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=616506&r1=616505&r2=616506&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue Jan 29 12:06:47 2008
@@ -21,6 +21,7 @@
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -34,6 +35,7 @@
 import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transport.CompositeTransport;
+import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCallback;
 import org.apache.activemq.transport.Transport;
@@ -78,6 +80,12 @@
     private int connectFailures;
     private long reconnectDelay = initialReconnectDelay;
     private Exception connectionFailure;
+    private boolean firstConnection = true;
+    // when enabled, keep a second (backup) transport connected so failover can switch to it immediately
+    private boolean backup=false;
+    private URI backupTransportURI;
+    private Transport backupTransport;
+    
 
     private final TransportListener myTransportListener = createTransportListener();
 
@@ -100,22 +108,44 @@
                     if (connectedTransport != null || disposed || connectionFailure != null) {
                         return false;
                     } else {
-                        ArrayList<Object> connectList = getConnectList();
+                        List<URI> connectList = getConnectList();
                         if (connectList.isEmpty()) {
                             failure = new IOException("No uris available to connect to.");
                         } else {
                             if (!useExponentialBackOff) {
                                 reconnectDelay = initialReconnectDelay;
                             }
-                            Iterator<Object> iter = connectList.iterator();
-                            for (int i = 0; iter.hasNext() && connectedTransport == null && !disposed; i++) {
-                                URI uri = (URI)iter.next();
+                            if (backup && backupTransport != null) {
+                                Transport t = backupTransport;
+                                URI uri = backupTransportURI;
+                                backupTransport=null;
+                                backupTransportURI=null;
+                                t.setTransportListener(myTransportListener);
+                                try {
+                                    if (started) { 
+                                            restoreTransport(t);  
+                                    }
+                                    reconnectDelay = initialReconnectDelay;
+                                    connectedTransportURI = uri;
+                                    connectedTransport = t;
+                                    reconnectMutex.notifyAll();
+                                    connectFailures = 0;
+                                    LOG.info("Successfully reconnected to backup " + uri);
+                                    return false;
+                                }catch (Exception e) {
+                                    LOG.debug("Backup transport failed",e);
+                                 }
+                            }
+                            
+                            Iterator<URI> iter = connectList.iterator();
+                            while(iter.hasNext() && connectedTransport == null && !disposed) {
+                                URI uri = iter.next();
                                 try {
                                     LOG.debug("Attempting connect to: " + uri);
                                     Transport t = TransportFactory.compositeConnect(uri);
                                     t.setTransportListener(myTransportListener);
                                     t.start();
-
+                                    
                                     if (started) {
                                         restoreTransport(t);
                                     }
@@ -129,7 +159,26 @@
                                     if (transportListener != null) {
                                         transportListener.transportResumed();
                                     }
-                                    LOG.info("Successfully reconnected to " + uri);
+                                    if (firstConnection) {
+                                        firstConnection=false;
+                                        LOG.info("Successfully connected to " + uri);
+                                        if(backup) {
+                                            while(iter.hasNext() && backupTransport==null){
+                                                uri = iter.next();
+                                                try {
+                                                    t = TransportFactory.compositeConnect(uri);
+                                                    t.setTransportListener(new DefaultTransportListener());
+                                                    t.start();
+                                                    backupTransport=t;
+                                                    backupTransportURI=uri;
+                                                }catch(Exception e) {
+                                                    LOG.debug("Failed to create backup to " + uri,e);
+                                                }
+                                            }
+                                        }
+                                    }else {
+                                        LOG.info("Successfully reconnected to " + uri);
+                                    }
                                     return false;
                                 } catch (Exception e) {
                                     failure = e;
@@ -488,18 +537,25 @@
         }
     }
 
-    private ArrayList<Object> getConnectList() {
-        ArrayList<Object> l = new ArrayList<Object>(uris);
+    private List<URI> getConnectList() {
+        ArrayList<URI> l = new ArrayList<URI>(uris);
+        boolean removed = false;
+        if (connectedTransportURI != null) {
+            removed = l.remove(connectedTransportURI);
+        }
         if (randomize) {
             // Randomly, reorder the list by random swapping
             Random r = new Random();
             r.setSeed(System.currentTimeMillis());
             for (int i = 0; i < l.size(); i++) {
                 int p = r.nextInt(l.size());
-                Object t = l.get(p);
+                URI t = l.get(p);
                 l.set(p, l.get(i));
                 l.set(i, t);
             }
+        }
+        if (removed) {
+            l.add(connectedTransportURI);
         }
         return l;
     }