You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/30 13:10:25 UTC

svn commit: r616733 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport: Transport.java failover/FailoverTransport.java

Author: rajdavies
Date: Wed Jan 30 04:10:22 2008
New Revision: 616733

URL: http://svn.apache.org/viewvc?rev=616733&view=rev
Log:
Further enhancement for https://issues.apache.org/activemq/browse/AMQ-1572

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=616733&r1=616732&r2=616733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Wed Jan 30 04:10:22 2008
@@ -132,7 +132,7 @@
     /**
      * Indicates if the transport can handle faults
      * 
-     * @return tru if fault tolerant
+     * @return true if fault tolerant
      */
     boolean isFaultTolerant();
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=616733&r1=616732&r2=616733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jan 30 04:10:22 2008
@@ -83,8 +83,8 @@
     private boolean firstConnection = true;
     //optionally always have a backup created
     private boolean backup=false;
-    private URI backupTransportURI;
-    private Transport backupTransport;
+    private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
+    private int backupPoolSize=2;
     
 
     private final TransportListener myTransportListener = createTransportListener();
@@ -95,127 +95,12 @@
 
         // Set up a task that is used to reconnect the connection asynchronously.
         reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
-
             public boolean iterate() {
-
-                Exception failure = null;
-                synchronized (reconnectMutex) {
-
-                    if (disposed || connectionFailure != null) {
-                        reconnectMutex.notifyAll();
-                    }
-
-                    if (connectedTransport != null || disposed || connectionFailure != null) {
-                        return false;
-                    } else {
-                        List<URI> connectList = getConnectList();
-                        if (connectList.isEmpty()) {
-                            failure = new IOException("No uris available to connect to.");
-                        } else {
-                            if (!useExponentialBackOff) {
-                                reconnectDelay = initialReconnectDelay;
-                            }
-                            if (backup && backupTransport != null) {
-                                Transport t = backupTransport;
-                                URI uri = backupTransportURI;
-                                backupTransport=null;
-                                backupTransportURI=null;
-                                t.setTransportListener(myTransportListener);
-                                try {
-                                    if (started) { 
-                                            restoreTransport(t);  
-                                    }
-                                    reconnectDelay = initialReconnectDelay;
-                                    connectedTransportURI = uri;
-                                    connectedTransport = t;
-                                    reconnectMutex.notifyAll();
-                                    connectFailures = 0;
-                                    LOG.info("Successfully reconnected to backup " + uri);
-                                    return false;
-                                }catch (Exception e) {
-                                    LOG.debug("Backup transport failed",e);
-                                 }
-                            }
-                            
-                            Iterator<URI> iter = connectList.iterator();
-                            while(iter.hasNext() && connectedTransport == null && !disposed) {
-                                URI uri = iter.next();
-                                try {
-                                    LOG.debug("Attempting connect to: " + uri);
-                                    Transport t = TransportFactory.compositeConnect(uri);
-                                    t.setTransportListener(myTransportListener);
-                                    t.start();
-                                    
-                                    if (started) {
-                                        restoreTransport(t);
-                                    }
-
-                                    LOG.debug("Connection established");
-                                    reconnectDelay = initialReconnectDelay;
-                                    connectedTransportURI = uri;
-                                    connectedTransport = t;
-                                    reconnectMutex.notifyAll();
-                                    connectFailures = 0;
-                                    if (transportListener != null) {
-                                        transportListener.transportResumed();
-                                    }
-                                    if (firstConnection) {
-                                        firstConnection=false;
-                                        LOG.info("Successfully connected to " + uri);
-                                        if(backup) {
-                                            while(iter.hasNext() && backupTransport==null){
-                                                uri = iter.next();
-                                                try {
-                                                    t = TransportFactory.compositeConnect(uri);
-                                                    t.setTransportListener(new DefaultTransportListener());
-                                                    t.start();
-                                                    backupTransport=t;
-                                                    backupTransportURI=uri;
-                                                }catch(Exception e) {
-                                                    LOG.debug("Failed to create backup to " + uri,e);
-                                                }
-                                            }
-                                        }
-                                    }else {
-                                        LOG.info("Successfully reconnected to " + uri);
-                                    }
-                                    return false;
-                                } catch (Exception e) {
-                                    failure = e;
-                                    LOG.debug("Connect fail to: " + uri + ", reason: " + e);
-                                }
-                            }
-                        }
-                    }
-
-                    if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
-                        LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
-                        connectionFailure = failure;
-                        reconnectMutex.notifyAll();
-                        return false;
-                    }
-                }
-
-                if (!disposed) {
-
-                    LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
-                    synchronized (sleepMutex) {
-                        try {
-                            sleepMutex.wait(reconnectDelay);
-                        } catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-                        }
-                    }
-
-                    if (useExponentialBackOff) {
-                        // Exponential increment of reconnect delay.
-                        reconnectDelay *= backOffMultiplier;
-                        if (reconnectDelay > maxReconnectDelay) {
-                            reconnectDelay = maxReconnectDelay;
-                        }
-                    }
-                }
-                return !disposed;
+            	boolean result = doReconnect();
+            	if(!result) {
+            		buildBackups();
+            	}
+            	return result;
             }
 
         }, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
@@ -394,6 +279,22 @@
     public void setRandomize(boolean randomize) {
         this.randomize = randomize;
     }
+    
+    public boolean isBackup() {
+		return backup;
+	}
+
+	public void setBackup(boolean backup) {
+		this.backup = backup;
+	}
+
+	public int getBackupPoolSize() {
+		return backupPoolSize;
+	}
+
+	public void setBackupPoolSize(int backupPoolSize) {
+		this.backupPoolSize = backupPoolSize;
+	}
 
     public void oneway(Object o) throws IOException {
         Command command = (Command)o;
@@ -613,5 +514,142 @@
     public boolean isFaultTolerant() {
         return true;
     }
+    
+   final boolean doReconnect() {
+
+        Exception failure = null;
+        synchronized (reconnectMutex) {
+
+            if (disposed || connectionFailure != null) {
+                reconnectMutex.notifyAll();
+            }
+
+            if (connectedTransport != null || disposed || connectionFailure != null) {
+                return false;
+            } else {
+                List<URI> connectList = getConnectList();
+                if (connectList.isEmpty()) {
+                    failure = new IOException("No uris available to connect to.");
+                } else {
+                    if (!useExponentialBackOff) {
+                        reconnectDelay = initialReconnectDelay;
+                    }
+                    if (backup && !backups.isEmpty()) {
+                    	BackupTransport bt = backups.remove(0);
+                        Transport t = bt.getTransport();
+                        URI uri = bt.getUri();
+                        t.setTransportListener(myTransportListener);
+                        try {
+                            if (started) { 
+                                    restoreTransport(t);  
+                            }
+                            reconnectDelay = initialReconnectDelay;
+                            connectedTransportURI = uri;
+                            connectedTransport = t;
+                            reconnectMutex.notifyAll();
+                            connectFailures = 0;
+                            LOG.info("Successfully reconnected to backup " + uri);
+                            return false;
+                        }catch (Exception e) {
+                            LOG.debug("Backup transport failed",e);
+                         }
+                    }
+                    
+                    Iterator<URI> iter = connectList.iterator();
+                    while(iter.hasNext() && connectedTransport == null && !disposed) {
+                        URI uri = iter.next();
+                        try {
+                            LOG.debug("Attempting connect to: " + uri);
+                            Transport t = TransportFactory.compositeConnect(uri);
+                            t.setTransportListener(myTransportListener);
+                            t.start();
+                            
+                            if (started) {
+                                restoreTransport(t);
+                            }
+
+                            LOG.debug("Connection established");
+                            reconnectDelay = initialReconnectDelay;
+                            connectedTransportURI = uri;
+                            connectedTransport = t;
+                            reconnectMutex.notifyAll();
+                            connectFailures = 0;
+                            if (transportListener != null) {
+                                transportListener.transportResumed();
+                            }
+                            if (firstConnection) {
+                                firstConnection=false;
+                                LOG.info("Successfully connected to " + uri);
+                            }else {
+                                LOG.info("Successfully reconnected to " + uri);
+                            }
+                            return false;
+                        } catch (Exception e) {
+                            failure = e;
+                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
+                        }
+                    }
+                }
+            }
+
+            if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
+                LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
+                connectionFailure = failure;
+                reconnectMutex.notifyAll();
+                return false;
+            }
+        }
+
+        if (!disposed) {
+
+            LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+            synchronized (sleepMutex) {
+                try {
+                    sleepMutex.wait(reconnectDelay);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            if (useExponentialBackOff) {
+                // Exponential increment of reconnect delay.
+                reconnectDelay *= backOffMultiplier;
+                if (reconnectDelay > maxReconnectDelay) {
+                    reconnectDelay = maxReconnectDelay;
+                }
+            }
+        }
+        return !disposed;
+    }
+
+   
+   final boolean buildBackups() {
+	   synchronized (reconnectMutex) {
+		   if (backup && backups.size() < backupPoolSize) {
+			   List<URI> connectList = getConnectList();
+			   for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
+				   URI uri = iter.next();
+				   if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
+					   try {
+						   BackupTransport bt = new BackupTransport();
+						   bt.setUri(uri);
+						   if (!backups.contains(bt)) {
+							   Transport t = TransportFactory.compositeConnect(uri);
+		                       t.setTransportListener(new DefaultTransportListener());
+		                       t.start();
+		                       bt.setTransport(t);
+		                       backups.add(bt);
+						   }
+					   }catch(Exception e) {
+						   LOG.debug("Failed to build backup ",e);
+					   }
+				   }
+			   }
+		   }
+	   }
+	   return false;
+   }
+
+
 
 }