You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/30 13:10:25 UTC
svn commit: r616733 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport:
Transport.java failover/FailoverTransport.java
Author: rajdavies
Date: Wed Jan 30 04:10:22 2008
New Revision: 616733
URL: http://svn.apache.org/viewvc?rev=616733&view=rev
Log:
Further enhancement for https://issues.apache.org/activemq/browse/AMQ-1572
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=616733&r1=616732&r2=616733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Wed Jan 30 04:10:22 2008
@@ -132,7 +132,7 @@
/**
* Indicates if the transport can handle faults
*
- * @return tru if fault tolerant
+ * @return true if fault tolerant
*/
boolean isFaultTolerant();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=616733&r1=616732&r2=616733&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Jan 30 04:10:22 2008
@@ -83,8 +83,8 @@
private boolean firstConnection = true;
//optionally always have a backup created
private boolean backup=false;
- private URI backupTransportURI;
- private Transport backupTransport;
+ private List<BackupTransport> backups=new CopyOnWriteArrayList<BackupTransport>();
+ private int backupPoolSize=2;
private final TransportListener myTransportListener = createTransportListener();
@@ -95,127 +95,12 @@
// Setup a task that is used to reconnect the a connection async.
reconnectTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
-
public boolean iterate() {
-
- Exception failure = null;
- synchronized (reconnectMutex) {
-
- if (disposed || connectionFailure != null) {
- reconnectMutex.notifyAll();
- }
-
- if (connectedTransport != null || disposed || connectionFailure != null) {
- return false;
- } else {
- List<URI> connectList = getConnectList();
- if (connectList.isEmpty()) {
- failure = new IOException("No uris available to connect to.");
- } else {
- if (!useExponentialBackOff) {
- reconnectDelay = initialReconnectDelay;
- }
- if (backup && backupTransport != null) {
- Transport t = backupTransport;
- URI uri = backupTransportURI;
- backupTransport=null;
- backupTransportURI=null;
- t.setTransportListener(myTransportListener);
- try {
- if (started) {
- restoreTransport(t);
- }
- reconnectDelay = initialReconnectDelay;
- connectedTransportURI = uri;
- connectedTransport = t;
- reconnectMutex.notifyAll();
- connectFailures = 0;
- LOG.info("Successfully reconnected to backup " + uri);
- return false;
- }catch (Exception e) {
- LOG.debug("Backup transport failed",e);
- }
- }
-
- Iterator<URI> iter = connectList.iterator();
- while(iter.hasNext() && connectedTransport == null && !disposed) {
- URI uri = iter.next();
- try {
- LOG.debug("Attempting connect to: " + uri);
- Transport t = TransportFactory.compositeConnect(uri);
- t.setTransportListener(myTransportListener);
- t.start();
-
- if (started) {
- restoreTransport(t);
- }
-
- LOG.debug("Connection established");
- reconnectDelay = initialReconnectDelay;
- connectedTransportURI = uri;
- connectedTransport = t;
- reconnectMutex.notifyAll();
- connectFailures = 0;
- if (transportListener != null) {
- transportListener.transportResumed();
- }
- if (firstConnection) {
- firstConnection=false;
- LOG.info("Successfully connected to " + uri);
- if(backup) {
- while(iter.hasNext() && backupTransport==null){
- uri = iter.next();
- try {
- t = TransportFactory.compositeConnect(uri);
- t.setTransportListener(new DefaultTransportListener());
- t.start();
- backupTransport=t;
- backupTransportURI=uri;
- }catch(Exception e) {
- LOG.debug("Failed to create backup to " + uri,e);
- }
- }
- }
- }else {
- LOG.info("Successfully reconnected to " + uri);
- }
- return false;
- } catch (Exception e) {
- failure = e;
- LOG.debug("Connect fail to: " + uri + ", reason: " + e);
- }
- }
- }
- }
-
- if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
- LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
- connectionFailure = failure;
- reconnectMutex.notifyAll();
- return false;
- }
- }
-
- if (!disposed) {
-
- LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
- synchronized (sleepMutex) {
- try {
- sleepMutex.wait(reconnectDelay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- if (useExponentialBackOff) {
- // Exponential increment of reconnect delay.
- reconnectDelay *= backOffMultiplier;
- if (reconnectDelay > maxReconnectDelay) {
- reconnectDelay = maxReconnectDelay;
- }
- }
- }
- return !disposed;
+ boolean result = doReconnect();
+ if(!result) {
+ buildBackups();
+ }
+ return result;
}
}, "ActiveMQ Failover Worker: " + System.identityHashCode(this));
@@ -394,6 +279,22 @@
public void setRandomize(boolean randomize) {
this.randomize = randomize;
}
+
+ public boolean isBackup() {
+ return backup;
+ }
+
+ public void setBackup(boolean backup) {
+ this.backup = backup;
+ }
+
+ public int getBackupPoolSize() {
+ return backupPoolSize;
+ }
+
+ public void setBackupPoolSize(int backupPoolSize) {
+ this.backupPoolSize = backupPoolSize;
+ }
public void oneway(Object o) throws IOException {
Command command = (Command)o;
@@ -613,5 +514,142 @@
public boolean isFaultTolerant() {
return true;
}
+
+ final boolean doReconnect() {
+
+ Exception failure = null;
+ synchronized (reconnectMutex) {
+
+ if (disposed || connectionFailure != null) {
+ reconnectMutex.notifyAll();
+ }
+
+ if (connectedTransport != null || disposed || connectionFailure != null) {
+ return false;
+ } else {
+ List<URI> connectList = getConnectList();
+ if (connectList.isEmpty()) {
+ failure = new IOException("No uris available to connect to.");
+ } else {
+ if (!useExponentialBackOff) {
+ reconnectDelay = initialReconnectDelay;
+ }
+ if (backup && !backups.isEmpty()) {
+ BackupTransport bt = backups.remove(0);
+ Transport t = bt.getTransport();
+ URI uri = bt.getUri();
+ t.setTransportListener(myTransportListener);
+ try {
+ if (started) {
+ restoreTransport(t);
+ }
+ reconnectDelay = initialReconnectDelay;
+ connectedTransportURI = uri;
+ connectedTransport = t;
+ reconnectMutex.notifyAll();
+ connectFailures = 0;
+ LOG.info("Successfully reconnected to backup " + uri);
+ return false;
+ }catch (Exception e) {
+ LOG.debug("Backup transport failed",e);
+ }
+ }
+
+ Iterator<URI> iter = connectList.iterator();
+ while(iter.hasNext() && connectedTransport == null && !disposed) {
+ URI uri = iter.next();
+ try {
+ LOG.debug("Attempting connect to: " + uri);
+ Transport t = TransportFactory.compositeConnect(uri);
+ t.setTransportListener(myTransportListener);
+ t.start();
+
+ if (started) {
+ restoreTransport(t);
+ }
+
+ LOG.debug("Connection established");
+ reconnectDelay = initialReconnectDelay;
+ connectedTransportURI = uri;
+ connectedTransport = t;
+ reconnectMutex.notifyAll();
+ connectFailures = 0;
+ if (transportListener != null) {
+ transportListener.transportResumed();
+ }
+ if (firstConnection) {
+ firstConnection=false;
+ LOG.info("Successfully connected to " + uri);
+ }else {
+ LOG.info("Successfully reconnected to " + uri);
+ }
+ return false;
+ } catch (Exception e) {
+ failure = e;
+ LOG.debug("Connect fail to: " + uri + ", reason: " + e);
+ }
+ }
+ }
+ }
+
+ if (maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts) {
+ LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
+ connectionFailure = failure;
+ reconnectMutex.notifyAll();
+ return false;
+ }
+ }
+
+ if (!disposed) {
+
+ LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
+ synchronized (sleepMutex) {
+ try {
+ sleepMutex.wait(reconnectDelay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (useExponentialBackOff) {
+ // Exponential increment of reconnect delay.
+ reconnectDelay *= backOffMultiplier;
+ if (reconnectDelay > maxReconnectDelay) {
+ reconnectDelay = maxReconnectDelay;
+ }
+ }
+ }
+ return !disposed;
+ }
+
+
+ final boolean buildBackups() {
+ synchronized (reconnectMutex) {
+ if (backup && backups.size() < backupPoolSize) {
+ List<URI> connectList = getConnectList();
+ for (Iterator<URI>iter = connectList.iterator();iter.hasNext() && backups.size() < backupPoolSize;) {
+ URI uri = iter.next();
+ if (connectedTransportURI != null && !connectedTransportURI.equals(uri)) {
+ try {
+ BackupTransport bt = new BackupTransport();
+ bt.setUri(uri);
+ if (!backups.contains(bt)) {
+ Transport t = TransportFactory.compositeConnect(uri);
+ t.setTransportListener(new DefaultTransportListener());
+ t.start();
+ bt.setTransport(t);
+ backups.add(bt);
+ }
+ }catch(Exception e) {
+ LOG.debug("Failed to build backup ",e);
+ }
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+
}