You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/01 18:33:58 UTC

svn commit: r1478083 - /activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Author: tabish
Date: Wed May  1 16:33:57 2013
New Revision: 1478083

URL: http://svn.apache.org/r1478083
Log:
https://issues.apache.org/jira/browse/AMQ-4501

Partial fix, perform a proper URI comparison to avoid an unnecessary reconnect bounce.

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1478083&r1=1478082&r2=1478083&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed May  1 16:33:57 2013
@@ -122,7 +122,7 @@ public class FailoverTransport implement
     private boolean connectedToPriority = false;
 
     private boolean priorityBackup = false;
-    private ArrayList<URI> priorityList = new ArrayList<URI>();
+    private final ArrayList<URI> priorityList = new ArrayList<URI>();
     private boolean priorityBackupAvailable = false;
 
     public FailoverTransport() throws InterruptedIOException {
@@ -132,6 +132,7 @@ public class FailoverTransport implement
         reconnectTaskFactory = new TaskRunnerFactory();
         reconnectTaskFactory.init();
         reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
+            @Override
             public boolean iterate() {
                 boolean result = false;
                 if (!started) {
@@ -178,6 +179,7 @@ public class FailoverTransport implement
 
     TransportListener createTransportListener() {
         return new TransportListener() {
+            @Override
             public void onCommand(Object o) {
                 Command command = (Command) o;
                 if (command == null) {
@@ -204,6 +206,7 @@ public class FailoverTransport implement
                 }
             }
 
+            @Override
             public void onException(IOException error) {
                 try {
                     handleTransportFailure(error);
@@ -213,12 +216,14 @@ public class FailoverTransport implement
                 }
             }
 
+            @Override
             public void transportInterupted() {
                 if (transportListener != null) {
                     transportListener.transportInterupted();
                 }
             }
 
+            @Override
             public void transportResumed() {
                 if (transportListener != null) {
                     transportListener.transportResumed();
@@ -324,6 +329,7 @@ public class FailoverTransport implement
         }
     }
 
+    @Override
     public void start() throws Exception {
         synchronized (reconnectMutex) {
             if (LOG.isDebugEnabled()) {
@@ -344,6 +350,7 @@ public class FailoverTransport implement
         }
     }
 
+    @Override
     public void stop() throws Exception {
         Transport transportToStop = null;
         List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
@@ -541,6 +548,7 @@ public class FailoverTransport implement
         }
     }
 
+    @Override
     public void oneway(Object o) throws IOException {
 
         Command command = (Command) o;
@@ -692,18 +700,22 @@ public class FailoverTransport implement
         }
     }
 
+    @Override
     public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
+    @Override
     public Object request(Object command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
+    @Override
     public Object request(Object command, int timeout) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
+    @Override
     public void add(boolean rebalance, URI u[]) {
         boolean newURI = false;
         for (URI uri : u) {
@@ -717,6 +729,7 @@ public class FailoverTransport implement
         }
     }
 
+    @Override
     public void remove(boolean rebalance, URI u[]) {
         for (URI uri : u) {
             uris.remove(uri);
@@ -782,10 +795,12 @@ public class FailoverTransport implement
         return l;
     }
 
+    @Override
     public TransportListener getTransportListener() {
         return transportListener;
     }
 
+    @Override
     public void setTransportListener(TransportListener commandListener) {
         synchronized (listenerMutex) {
             this.transportListener = commandListener;
@@ -793,6 +808,7 @@ public class FailoverTransport implement
         }
     }
 
+    @Override
     public <T> T narrow(Class<T> target) {
 
         if (target.isAssignableFrom(getClass())) {
@@ -838,6 +854,7 @@ public class FailoverTransport implement
         return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
     }
 
+    @Override
     public String getRemoteAddress() {
         Transport transport = connectedTransport.get();
         if (transport != null) {
@@ -846,6 +863,7 @@ public class FailoverTransport implement
         return null;
     }
 
+    @Override
     public boolean isFaultTolerant() {
         return true;
     }
@@ -904,7 +922,7 @@ public class FailoverTransport implement
                     failure = new IOException("No uris available to connect to.");
                 } else {
                     if (doRebalance) {
-                        if (connectList.get(0).equals(connectedTransportURI)) {
+                        if (compareURIs(connectList.get(0), connectedTransportURI)) {
                             // already connected to first in the list, no need to rebalance
                             doRebalance = false;
                             return false;
@@ -1189,18 +1207,22 @@ public class FailoverTransport implement
         return uris.indexOf(uri) == 0;
     }
 
+    @Override
     public boolean isDisposed() {
         return disposed;
     }
 
+    @Override
     public boolean isConnected() {
         return connected;
     }
 
+    @Override
     public void reconnect(URI uri) throws IOException {
         add(true, new URI[]{uri});
     }
 
+    @Override
     public boolean isReconnectSupported() {
         return this.reconnectSupported;
     }
@@ -1209,6 +1231,7 @@ public class FailoverTransport implement
         this.reconnectSupported = value;
     }
 
+    @Override
     public boolean isUpdateURIsSupported() {
         return this.updateURIsSupported;
     }
@@ -1217,6 +1240,7 @@ public class FailoverTransport implement
         this.updateURIsSupported = value;
     }
 
+    @Override
     public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
         if (isUpdateURIsSupported()) {
             HashSet<URI> copy = new HashSet<URI>(this.updated);
@@ -1265,6 +1289,7 @@ public class FailoverTransport implement
         this.rebalanceUpdateURIs = rebalanceUpdateURIs;
     }
 
+    @Override
     public int getReceiveCounter() {
         Transport transport = connectedTransport.get();
         if (transport == null) {
@@ -1290,36 +1315,46 @@ public class FailoverTransport implement
     private boolean contains(URI newURI) {
         boolean result = false;
         for (URI uri : uris) {
-            if (newURI.getPort() == uri.getPort()) {
-                InetAddress newAddr = null;
-                InetAddress addr = null;
-                try {
-                    newAddr = InetAddress.getByName(newURI.getHost());
-                    addr = InetAddress.getByName(uri.getHost());
-                } catch(IOException e) {
-
-                    if (newAddr == null) {
-                        LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
-                    } else {
-                        LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
-                    }
+            if (compareURIs(newURI, uri)) {
+                result = true;
+                break;
+            }
+        }
 
-                    if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
-                        result = true;
-                        break;
-                    } else {
-                        continue;
-                    }
+        return result;
+    }
+
+    private boolean compareURIs(final URI first, final URI second) {
+
+        if (first == null || second == null) {
+            return false;
+        }
+
+        if (first.getPort() == second.getPort()) {
+            InetAddress firstAddr = null;
+            InetAddress secondAddr = null;
+            try {
+                firstAddr = InetAddress.getByName(first.getHost());
+                secondAddr = InetAddress.getByName(second.getHost());
+            } catch(IOException e) {
+
+                if (firstAddr == null) {
+                    LOG.error("Failed to Lookup INetAddress for URI[ " + firstAddr + " ] : " + e);
+                } else {
+                    LOG.error("Failed to Lookup INetAddress for URI[ " + secondAddr + " ] : " + e);
                 }
 
-                if (addr.equals(newAddr)) {
-                    result = true;
-                    break;
+                if (first.getHost().equalsIgnoreCase(second.getHost())) {
+                    return true;
                 }
             }
+
+            if (firstAddr.equals(secondAddr)) {
+                return true;
+            }
         }
 
-        return result;
+        return false;
     }
 
     private InputStreamReader getURLStream(String path) throws IOException {