You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/01 22:53:52 UTC

svn commit: r1478184 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java

Author: tabish
Date: Wed May  1 20:53:51 2013
New Revision: 1478184

URL: http://svn.apache.org/r1478184
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4501

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1478184&r1=1478183&r2=1478184&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed May  1 20:53:51 2013
@@ -143,7 +143,6 @@ public class FailoverTransport implement
                     if ((connectedTransport.get() == null || doRebalance || priorityBackupAvailable) && !disposed) {
                         result = doReconnect();
                         buildBackup = false;
-                        connectedToPriority = isPriority(connectedTransportURI);
                     }
                 }
                 if (buildBackup) {
@@ -264,6 +263,7 @@ public class FailoverTransport implement
                 failedConnectTransportURI = connectedTransportURI;
                 connectedTransportURI = null;
                 connected = false;
+                connectedToPriority = false;
 
                 // notify before any reconnect attempt so ack state can be whacked
                 if (transportListener != null) {
@@ -922,7 +922,7 @@ public class FailoverTransport implement
                     failure = new IOException("No uris available to connect to.");
                 } else {
                     if (doRebalance) {
-                        if (compareURIs(connectList.get(0), connectedTransportURI)) {
+                        if (connectedToPriority || compareURIs(connectList.get(0), connectedTransportURI)) {
                             // already connected to first in the list, no need to rebalance
                             doRebalance = false;
                             return false;
@@ -930,6 +930,7 @@ public class FailoverTransport implement
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
                             }
+
                             try {
                                 Transport transport = this.connectedTransport.getAndSet(null);
                                 if (transport != null) {
@@ -1008,12 +1009,13 @@ public class FailoverTransport implement
                                 restoreTransport(transport);
                             }
 
-                             if (LOG.isDebugEnabled()) {
+                            if (LOG.isDebugEnabled()) {
                                 LOG.debug("Connection established");
-                             }
+                            }
                             reconnectDelay = initialReconnectDelay;
                             connectedTransportURI = uri;
                             connectedTransport.set(transport);
+                            connectedToPriority = isPriority(connectedTransportURI);
                             reconnectMutex.notifyAll();
                             connectFailures = 0;
 
@@ -1201,6 +1203,10 @@ public class FailoverTransport implement
     }
 
     protected boolean isPriority(URI uri) {
+        if (!priorityBackup) {
+            return false;
+        }
+
         if (!priorityList.isEmpty()) {
             return priorityList.contains(uri);
         }
@@ -1326,8 +1332,9 @@ public class FailoverTransport implement
 
     private boolean compareURIs(final URI first, final URI second) {
 
+        boolean result = false;
         if (first == null || second == null) {
-            return false;
+            return result;
         }
 
         if (first.getPort() == second.getPort()) {
@@ -1336,25 +1343,26 @@ public class FailoverTransport implement
             try {
                 firstAddr = InetAddress.getByName(first.getHost());
                 secondAddr = InetAddress.getByName(second.getHost());
+
+                if (firstAddr.equals(secondAddr)) {
+                    result = true;
+                }
+
             } catch(IOException e) {
 
                 if (firstAddr == null) {
-                    LOG.error("Failed to Lookup INetAddress for URI[ " + firstAddr + " ] : " + e);
+                    LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
                 } else {
-                    LOG.error("Failed to Lookup INetAddress for URI[ " + secondAddr + " ] : " + e);
+                    LOG.error("Failed to Lookup INetAddress for URI[ " + second + " ] : " + e);
                 }
 
                 if (first.getHost().equalsIgnoreCase(second.getHost())) {
-                    return true;
+                    result = true;
                 }
             }
-
-            if (firstAddr.equals(secondAddr)) {
-                return true;
-            }
         }
 
-        return false;
+        return result;
     }
 
     private InputStreamReader getURLStream(String path) throws IOException {

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java?rev=1478184&r1=1478183&r2=1478184&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java Wed May  1 20:53:51 2013
@@ -16,13 +16,11 @@
  */
 package org.apache.activemq.transport.failover;
 
-import org.apache.activemq.broker.BrokerService;
+import java.util.HashMap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class FailoverPriorityTest extends FailoverClusterTestSupport {
 
     protected final Logger LOG = LoggerFactory.getLogger(getClass());
@@ -30,7 +28,7 @@ public class FailoverPriorityTest extend
     private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
     private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
     private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
-    private HashMap<String,String> urls = new HashMap<String,String>();
+    private final HashMap<String,String> urls = new HashMap<String,String>();
 
     @Override
     public void setUp() throws Exception {
@@ -42,8 +40,8 @@ public class FailoverPriorityTest extend
     private static final String BROKER_A_NAME = "BROKERA";
     private static final String BROKER_B_NAME = "BROKERB";
     private static final String BROKER_C_NAME = "BROKERC";
-    
-    
+
+
     public void testPriorityBackup() throws Exception {
         createBrokerA();
         createBrokerB();
@@ -57,7 +55,7 @@ public class FailoverPriorityTest extend
 
 
         restart(false, BROKER_A_NAME, BROKER_B_NAME);
-        
+
         for (int i = 0; i < 3; i++) {
             restart(true, BROKER_A_NAME, BROKER_B_NAME);
         }
@@ -126,7 +124,36 @@ public class FailoverPriorityTest extend
         restart(true, BROKER_A_NAME, BROKER_B_NAME);
 
     }
-    
+
+    public void testPriorityBackupAndUpdateClients() throws Exception {
+        // Broker A
+        addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+        addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, true);
+        addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+        getBroker(BROKER_A_NAME).start();
+
+        // Broker B
+        addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+        addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, true);
+        addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+        getBroker(BROKER_B_NAME).start();
+
+        getBroker(BROKER_B_NAME).waitUntilStarted();
+        Thread.sleep(1000);
+
+        setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
+
+        LOG.info("Client URI will be: " + getClientUrl());
+
+        createClients(5);
+
+        // Let's wait a little bit longer just in case it takes a while to realize that the
+        // Broker A is the one with higher priority.
+        Thread.sleep(5000);
+
+        assertAllConnectedTo(urls.get(BROKER_A_NAME));
+    }
+
     private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
 
         Thread.sleep(1000);
@@ -159,9 +186,9 @@ public class FailoverPriorityTest extend
         Thread.sleep(5000);
 
         assertAllConnectedTo(urls.get(primaryName));
-        
+
     }
-    
+
     private void createBrokerByName(String name) throws Exception {
         if (name.equals(BROKER_A_NAME)) {
             createBrokerA();