You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/01 22:53:52 UTC
svn commit: r1478184 - in /activemq/trunk:
activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
Author: tabish
Date: Wed May 1 20:53:51 2013
New Revision: 1478184
URL: http://svn.apache.org/r1478184
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4501
Modified:
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1478184&r1=1478183&r2=1478184&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed May 1 20:53:51 2013
@@ -143,7 +143,6 @@ public class FailoverTransport implement
if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
result = doReconnect();
buildBackup = false;
- connectedToPriority = isPriority(connectedTransportURI);
}
}
if (buildBackup) {
@@ -264,6 +263,7 @@ public class FailoverTransport implement
failedConnectTransportURI = connectedTransportURI;
connectedTransportURI = null;
connected = false;
+ connectedToPriority = false;
// notify before any reconnect attempt so ack state can be whacked
if (transportListener != null) {
@@ -922,7 +922,7 @@ public class FailoverTransport implement
failure = new IOException("No uris available to connect to.");
} else {
if (doRebalance) {
- if (compareURIs(connectList.get(0), connectedTransportURI)) {
+ if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
// already connected to first in the list, no need to rebalance
doRebalance = false;
return false;
@@ -930,6 +930,7 @@ public class FailoverTransport implement
if (LOG.isDebugEnabled()) {
LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
}
+
try {
Transport transport = this.connectedTransport.getAndSet(null);
if (transport != null) {
@@ -1008,12 +1009,13 @@ public class FailoverTransport implement
restoreTransport(transport);
}
- if (LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("Connection established");
- }
+ }
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
connectedTransport.set(transport);
+ connectedToPriority = isPriority(connectedTransportURI);
reconnectMutex.notifyAll();
connectFailures = 0;
@@ -1201,6 +1203,10 @@ public class FailoverTransport implement
}
protected boolean isPriority(URI uri) {
+ if (!priorityBackup) {
+ return false;
+ }
+
if (!priorityList.isEmpty()) {
return priorityList.contains(uri);
}
@@ -1326,8 +1332,9 @@ public class FailoverTransport implement
private boolean compareURIs(final URI first, final URI second) {
+ boolean result = false;
if (first == null || second == null) {
- return false;
+ return result;
}
if (first.getPort() == second.getPort()) {
@@ -1336,25 +1343,26 @@ public class FailoverTransport implement
try {
firstAddr = InetAddress.getByName(first.getHost());
secondAddr = InetAddress.getByName(second.getHost());
+
+ if (firstAddr.equals(secondAddr)) {
+ result = true;
+ }
+
} catch(IOException e) {
if (firstAddr == null) {
- LOG.error("Failed to Lookup INetAddress for URI[ " + firstAddr + " ] : " + e);
+ LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
} else {
- LOG.error("Failed to Lookup INetAddress for URI[ " + secondAddr + " ] : " + e);
+ LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
}
if (first.getHost().equalsIgnoreCase(second.getHost())) {
- return true;
+ result = true;
}
}
-
- if (firstAddr.equals(secondAddr)) {
- return true;
- }
}
- return false;
+ return result;
}
private InputStreamReader getURLStream(String path) throws IOException {
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java?rev=1478184&r1=1478183&r2=1478184&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java Wed May 1 20:53:51 2013
@@ -16,13 +16,11 @@
*/
package org.apache.activemq.transport.failover;
-import org.apache.activemq.broker.BrokerService;
+import java.util.HashMap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
-
public class FailoverPriorityTest extends FailoverClusterTestSupport {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -30,7 +28,7 @@ public class FailoverPriorityTest extend
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
- private HashMap<String,String> urls = new HashMap<String,String>();
+ private final HashMap<String,String> urls = new HashMap<String,String>();
@Override
public void setUp() throws Exception {
@@ -42,8 +40,8 @@ public class FailoverPriorityTest extend
private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB";
private static final String BROKER_C_NAME = "BROKERC";
-
-
+
+
public void testPriorityBackup() throws Exception {
createBrokerA();
createBrokerB();
@@ -57,7 +55,7 @@ public class FailoverPriorityTest extend
restart(false, BROKER_A_NAME, BROKER_B_NAME);
-
+
for (int i = 0; i < 3; i++) {
restart(true, BROKER_A_NAME, BROKER_B_NAME);
}
@@ -126,7 +124,36 @@ public class FailoverPriorityTest extend
restart(true, BROKER_A_NAME, BROKER_B_NAME);
}
-
+
+ public void testPriorityBackupAndUpdateClients() throws Exception {
+ // Broker A
+ addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+ addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, true);
+ addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ getBroker(BROKER_A_NAME).start();
+
+ // Broker B
+ addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+ addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, true);
+ addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ getBroker(BROKER_B_NAME).start();
+
+ getBroker(BROKER_B_NAME).waitUntilStarted();
+ Thread.sleep(1000);
+
+ setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
+
+ LOG.info("Client URI will be: " + getClientUrl());
+
+ createClients(5);
+
+ // Let's wait a little bit longer just in case it takes a while to realize that the
+ // Broker A is the one with higher priority.
+ Thread.sleep(5000);
+
+ assertAllConnectedTo(urls.get(BROKER_A_NAME));
+ }
+
private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
Thread.sleep(1000);
@@ -159,9 +186,9 @@ public class FailoverPriorityTest extend
Thread.sleep(5000);
assertAllConnectedTo(urls.get(primaryName));
-
+
}
-
+
private void createBrokerByName(String name) throws Exception {
if (name.equals(BROKER_A_NAME)) {
createBrokerA();