You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/01 18:33:58 UTC
svn commit: r1478083 -
/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Author: tabish
Date: Wed May 1 16:33:57 2013
New Revision: 1478083
URL: http://svn.apache.org/r1478083
Log:
https://issues.apache.org/jira/browse/AMQ-4501
Partial fix, perform a proper URI comparison to avoid an unnecessary reconnect bounce.
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1478083&r1=1478082&r2=1478083&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed May 1 16:33:57 2013
@@ -122,7 +122,7 @@ public class FailoverTransport implement
private boolean connectedToPriority = false;
private boolean priorityBackup = false;
- private ArrayList<URI> priorityList = new ArrayList<URI>();
+ private final ArrayList<URI> priorityList = new ArrayList<URI>();
private boolean priorityBackupAvailable = false;
public FailoverTransport() throws InterruptedIOException {
@@ -132,6 +132,7 @@ public class FailoverTransport implement
reconnectTaskFactory = new TaskRunnerFactory();
reconnectTaskFactory.init();
reconnectTask = reconnectTaskFactory.createTaskRunner(new Task() {
+ @Override
public boolean iterate() {
boolean result = false;
if (!started) {
@@ -178,6 +179,7 @@ public class FailoverTransport implement
TransportListener createTransportListener() {
return new TransportListener() {
+ @Override
public void onCommand(Object o) {
Command command = (Command) o;
if (command == null) {
@@ -204,6 +206,7 @@ public class FailoverTransport implement
}
}
+ @Override
public void onException(IOException error) {
try {
handleTransportFailure(error);
@@ -213,12 +216,14 @@ public class FailoverTransport implement
}
}
+ @Override
public void transportInterupted() {
if (transportListener != null) {
transportListener.transportInterupted();
}
}
+ @Override
public void transportResumed() {
if (transportListener != null) {
transportListener.transportResumed();
@@ -324,6 +329,7 @@ public class FailoverTransport implement
}
}
+ @Override
public void start() throws Exception {
synchronized (reconnectMutex) {
if (LOG.isDebugEnabled()) {
@@ -344,6 +350,7 @@ public class FailoverTransport implement
}
}
+ @Override
public void stop() throws Exception {
Transport transportToStop = null;
List<Transport> backupsToStop = new ArrayList<Transport>(backups.size());
@@ -541,6 +548,7 @@ public class FailoverTransport implement
}
}
+ @Override
public void oneway(Object o) throws IOException {
Command command = (Command) o;
@@ -692,18 +700,22 @@ public class FailoverTransport implement
}
}
+ @Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
+ @Override
public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method");
}
+ @Override
public Object request(Object command, int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
+ @Override
public void add(boolean rebalance, URI u[]) {
boolean newURI = false;
for (URI uri : u) {
@@ -717,6 +729,7 @@ public class FailoverTransport implement
}
}
+ @Override
public void remove(boolean rebalance, URI u[]) {
for (URI uri : u) {
uris.remove(uri);
@@ -782,10 +795,12 @@ public class FailoverTransport implement
return l;
}
+ @Override
public TransportListener getTransportListener() {
return transportListener;
}
+ @Override
public void setTransportListener(TransportListener commandListener) {
synchronized (listenerMutex) {
this.transportListener = commandListener;
@@ -793,6 +808,7 @@ public class FailoverTransport implement
}
}
+ @Override
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
@@ -838,6 +854,7 @@ public class FailoverTransport implement
return connectedTransportURI == null ? "unconnected" : connectedTransportURI.toString();
}
+ @Override
public String getRemoteAddress() {
Transport transport = connectedTransport.get();
if (transport != null) {
@@ -846,6 +863,7 @@ public class FailoverTransport implement
return null;
}
+ @Override
public boolean isFaultTolerant() {
return true;
}
@@ -904,7 +922,7 @@ public class FailoverTransport implement
failure = new IOException("No uris available to connect to.");
} else {
if (doRebalance) {
- if (connectList.get(0).equals(connectedTransportURI)) {
+ if (compareURIs(connectList.get(0), connectedTransportURI)) {
// already connected to first in the list, no need to rebalance
doRebalance = false;
return false;
@@ -1189,18 +1207,22 @@ public class FailoverTransport implement
return uris.indexOf(uri) == 0;
}
+ @Override
public boolean isDisposed() {
return disposed;
}
+ @Override
public boolean isConnected() {
return connected;
}
+ @Override
public void reconnect(URI uri) throws IOException {
add(true, new URI[]{uri});
}
+ @Override
public boolean isReconnectSupported() {
return this.reconnectSupported;
}
@@ -1209,6 +1231,7 @@ public class FailoverTransport implement
this.reconnectSupported = value;
}
+ @Override
public boolean isUpdateURIsSupported() {
return this.updateURIsSupported;
}
@@ -1217,6 +1240,7 @@ public class FailoverTransport implement
this.updateURIsSupported = value;
}
+ @Override
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
if (isUpdateURIsSupported()) {
HashSet<URI> copy = new HashSet<URI>(this.updated);
@@ -1265,6 +1289,7 @@ public class FailoverTransport implement
this.rebalanceUpdateURIs = rebalanceUpdateURIs;
}
+ @Override
public int getReceiveCounter() {
Transport transport = connectedTransport.get();
if (transport == null) {
@@ -1290,36 +1315,46 @@ public class FailoverTransport implement
private boolean contains(URI newURI) {
boolean result = false;
for (URI uri : uris) {
- if (newURI.getPort() == uri.getPort()) {
- InetAddress newAddr = null;
- InetAddress addr = null;
- try {
- newAddr = InetAddress.getByName(newURI.getHost());
- addr = InetAddress.getByName(uri.getHost());
- } catch(IOException e) {
-
- if (newAddr == null) {
- LOG.error("Failed to Lookup INetAddress for URI[ " + newURI + " ] : " + e);
- } else {
- LOG.error("Failed to Lookup INetAddress for URI[ " + uri + " ] : " + e);
- }
+ if (compareURIs(newURI, uri)) {
+ result = true;
+ break;
+ }
+ }
- if (newURI.getHost().equalsIgnoreCase(uri.getHost())) {
- result = true;
- break;
- } else {
- continue;
- }
+ return result;
+ }
+
+ private boolean compareURIs(final URI first, final URI second) {
+
+ if (first == null || second == null) {
+ return false;
+ }
+
+ if (first.getPort() == second.getPort()) {
+ InetAddress firstAddr = null;
+ InetAddress secondAddr = null;
+ try {
+ firstAddr = InetAddress.getByName(first.getHost());
+ secondAddr = InetAddress.getByName(second.getHost());
+ } catch(IOException e) {
+
+ if (firstAddr == null) {
+ LOG.error("Failed to Lookup INetAddress for URI[ " + firstAddr + " ] : " + e);
+ } else {
+ LOG.error("Failed to Lookup INetAddress for URI[ " + secondAddr + " ] : " + e);
}
- if (addr.equals(newAddr)) {
- result = true;
- break;
+ if (first.getHost().equalsIgnoreCase(second.getHost())) {
+ return true;
}
}
+
+ if (firstAddr.equals(secondAddr)) {
+ return true;
+ }
}
- return result;
+ return false;
}
private InputStreamReader getURLStream(String path) throws IOException {