You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/03 17:21:26 UTC

svn commit: r1478835 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/ activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-client/src/main/java/org/apache/activemq/transport/failover/ activemq-unit-te...

Author: tabish
Date: Fri May  3 15:21:25 2013
New Revision: 1478835

URL: http://svn.apache.org/r1478835
Log:
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4505

When a broker was stopping, it sent out a cluster update after tearing down its bridges, so any client connected to it would lose its awareness of the other brokers in the cluster.

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Fri May  3 15:21:25 2013
@@ -2829,4 +2829,7 @@ public class BrokerService implements Se
         return this.slave;
     }
 
+    public boolean isStopping() {
+        return this.stopping.get();
+    }
 }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri May  3 15:21:25 2013
@@ -16,6 +16,23 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
+
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.Connection;
@@ -40,22 +57,6 @@ import org.apache.activemq.util.ServiceS
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.InvalidClientIDException;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 /**
  * Routes Broker operations to the correct messaging regions for processing.
  *
@@ -94,6 +95,7 @@ public class RegionBroker extends EmptyB
 
     private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
     private final Runnable purgeInactiveDestinationsTask = new Runnable() {
+        @Override
         public void run() {
             purgeInactiveDestinations();
         }
@@ -526,7 +528,11 @@ public class RegionBroker extends EmptyB
             if (LOG.isDebugEnabled()) {
                 LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
             }
-            removeBrokerInClusterUpdate(info);
+            // When stopping, don't send cluster updates since we are the ones tearing down
+            // our own bridges.
+            if (!brokerService.isStopping()) {
+                removeBrokerInClusterUpdate(info);
+            }
         }
     }
 
@@ -730,6 +736,7 @@ public class RegionBroker extends EmptyB
         return this.scheduler;
     }
 
+    @Override
     public ThreadPoolExecutor getExecutor() {
         return this.executor;
     }

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri May  3 15:21:25 2013
@@ -286,6 +286,10 @@ public class FailoverTransport implement
 
     public final void handleConnectionControl(ConnectionControl control) {
         String reconnectStr = control.getReconnectTo();
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received ConnectionControl: {}", control);
+        }
+
         if (reconnectStr != null) {
             reconnectStr = reconnectStr.trim();
             if (reconnectStr.length() > 0) {

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java Fri May  3 15:21:25 2013
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.failover;
 
+import org.apache.activemq.broker.TransportConnector;
+import org.mortbay.log.Log;
+
 
 /**
  * Complex cluster test that will exercise the dynamic failover capabilities of
@@ -26,21 +29,19 @@ package org.apache.activemq.transport.fa
  */
 public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
 
-    private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://localhost:61616";
-    private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://localhost:61617";
-    private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://localhost:61618";
-    private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://localhost:61626";
-    private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://localhost:61627";
-    private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://localhost:61628";
+    private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
+    private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
+    private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
+    private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
+    private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
+    private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://127.0.0.1:61628";
     private static final String BROKER_A_NAME = "BROKERA";
     private static final String BROKER_B_NAME = "BROKERB";
     private static final String BROKER_C_NAME = "BROKERC";
-    
-    
 
     /**
-     * Basic dynamic failover 3 broker test 
-     * 
+     * Basic dynamic failover 3 broker test
+     *
      * @throws Exception
      */
     public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
@@ -56,14 +57,14 @@ public class FailoverComplexClusterTest 
         runTests(false, null, null, null);
     }
 
-	/**
-	 * Tests a 3 broker configuration to ensure that the backup is random and
-	 * supported in a cluster. useExponentialBackOff is set to false and
-	 * maxReconnectAttempts is set to 1 to move through the list quickly for
-	 * this test.
-	 * 
-	 * @throws Exception
-	 */
+    /**
+     * Tests a 3 broker configuration to ensure that the backup is random and
+     * supported in a cluster. useExponentialBackOff is set to false and
+     * maxReconnectAttempts is set to 1 to move through the list quickly for
+     * this test.
+     *
+     * @throws Exception
+     */
     public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception {
 
         initSingleTcBroker("", null, null);
@@ -77,14 +78,14 @@ public class FailoverComplexClusterTest 
         runTests(false, null, null, null);
     }
 
-	/**
-	 * Tests a 3 broker cluster that passes in connection params on the
-	 * transport connector. Prior versions of AMQ passed the TC connection
-	 * params to the client and this should not happen. The chosen param is not
-	 * compatible with the client and will throw an error if used.
-	 * 
-	 * @throws Exception
-	 */
+    /**
+     * Tests a 3 broker cluster that passes in connection params on the
+     * transport connector. Prior versions of AMQ passed the TC connection
+     * params to the client and this should not happen. The chosen param is not
+     * compatible with the client and will throw an error if used.
+     *
+     * @throws Exception
+     */
     public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
 
         initSingleTcBroker("?transport.closeAsync=false", null, null);
@@ -97,10 +98,9 @@ public class FailoverComplexClusterTest 
         runTests(false, null, null, null);
     }
 
-
     /**
      * Tests a 3 broker cluster using a cluster filter of *
-     * 
+     *
      * @throws Exception
      */
     public void testThreeBrokerClusterWithClusterFilter() throws Exception {
@@ -114,12 +114,12 @@ public class FailoverComplexClusterTest 
         runTests(false, null, "*", null);
     }
 
-	/**
-	 * Test to verify that a broker with multiple transport connections only the
-	 * one marked to update clients is propagate
-	 * 
-	 * @throws Exception
-	 */
+    /**
+     * Test to verify that on a broker with multiple transport connections, only
+     * the one marked to update clients is propagated
+     *
+     * @throws Exception
+     */
     public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
 
         initMultiTcCluster("", null);
@@ -133,11 +133,11 @@ public class FailoverComplexClusterTest 
         runTests(true, null, null, null);
     }
 
-	/**
-	 * Test to verify the reintroduction of the A Broker
-	 * 
-	 * @throws Exception
-	 */
+    /**
+     * Test to verify the reintroduction of the A Broker
+     *
+     * @throws Exception
+     */
     public void testOriginalBrokerRestart() throws Exception {
         initSingleTcBroker("", null, null);
 
@@ -164,12 +164,12 @@ public class FailoverComplexClusterTest 
         assertClientsConnectedToThreeBrokers();
     }
 
-	/**
-	 * Test to ensure clients are evenly to all available brokers in the
-	 * network.
-	 * 
-	 * @throws Exception
-	 */
+    /**
+     * Test to ensure clients are evenly distributed to all available brokers in the
+     * network.
+     *
+     * @throws Exception
+     */
     public void testThreeBrokerClusterClientDistributions() throws Exception {
 
         initSingleTcBroker("", null, null);
@@ -182,12 +182,12 @@ public class FailoverComplexClusterTest 
         runClientDistributionTests(false, null, null, null);
     }
 
-	/**
-	 * Test to verify that clients are distributed with no less than 20% of the
-	 * clients on any one broker.
-	 * 
-	 * @throws Exception
-	 */
+    /**
+     * Test to verify that clients are distributed with no less than 20% of the
+     * clients on any one broker.
+     *
+     * @throws Exception
+     */
     public void testThreeBrokerClusterDestinationFilter() throws Exception {
 
         initSingleTcBroker("", null, null);
@@ -199,23 +199,61 @@ public class FailoverComplexClusterTest 
         runTests(false, null, null, "Queue.TEST.FOO.>");
     }
 
+    public void testFailOverWithUpdateClientsOnRemove() throws Exception{
+        // Broker A
+        addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+        TransportConnector connectorA = getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS);
+        connectorA.setName("openwire");
+        connectorA.setRebalanceClusterClients(true);
+        connectorA.setUpdateClusterClients(true);
+        connectorA.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
+        addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+        getBroker(BROKER_A_NAME).start();
+
+        // Broker B
+        addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+        TransportConnector connectorB = getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS);
+        connectorB.setName("openwire");
+        connectorB.setRebalanceClusterClients(true);
+        connectorB.setUpdateClusterClients(true);
+        connectorB.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
+        addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+        getBroker(BROKER_B_NAME).start();
+
+        getBroker(BROKER_B_NAME).waitUntilStarted();
+        Thread.sleep(1000);
+
+        // Create a client connecting only to A. It should receive broker B's address when it connects to A.
+        setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=true");
+        createClients(1);
+        Thread.sleep(5000);
 
-	/**
-	 * Runs a 3 Broker dynamic failover test: <br/>
-	 * <ul>
-	 * <li>asserts clients are distributed across all 3 brokers</li>
-	 * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
-	 * <li>asserts clients are distributed across all 3 brokers after
-	 * reintroducing the 3rd broker</li>
-	 * </ul>
-	 * 
-	 * @param multi
-	 * @param tcParams
-	 * @param clusterFilter
-	 * @param destinationFilter
-	 * @throws Exception
-	 * @throws InterruptedException
-	 */
+        // We stop broker A.
+        Log.info("Stopping broker A whose address is: {}", BROKER_A_CLIENT_TC_ADDRESS);
+        getBroker(BROKER_A_NAME).stop();
+        getBroker(BROKER_A_NAME).waitUntilStopped();
+        Thread.sleep(5000);
+
+        // Client should failover to B.
+        assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+    }
+
+    /**
+     * Runs a 3 Broker dynamic failover test: <br/>
+     * <ul>
+     * <li>asserts clients are distributed across all 3 brokers</li>
+     * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
+     * <li>asserts clients are distributed across all 3 brokers after
+     * reintroducing the 3rd broker</li>
+     * </ul>
+     *
+     * @param multi
+     * @param tcParams
+     * @param clusterFilter
+     * @param destinationFilter
+     * @throws Exception
+     * @throws InterruptedException
+     */
     private void runTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException {
         assertClientsConnectedToThreeBrokers();
 
@@ -226,14 +264,13 @@ public class FailoverComplexClusterTest 
         Thread.sleep(5000);
 
         assertClientsConnectedToTwoBrokers();
-        
+
         createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
         getBroker(BROKER_C_NAME).waitUntilStarted();
         Thread.sleep(5000);
 
         assertClientsConnectedToThreeBrokers();
     }
-    
 
     /**
      * @param multi
@@ -288,9 +325,9 @@ public class FailoverComplexClusterTest 
         createBrokerC(true, params, clusterFilter, null);
         getBroker(BROKER_C_NAME).waitUntilStarted();
     }
-    
+
     private void createBrokerA(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
-    	final String tcParams = (params == null)?"":params;
+        final String tcParams = (params == null)?"":params;
         if (getBroker(BROKER_A_NAME) == null) {
             addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
             addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
@@ -307,7 +344,7 @@ public class FailoverComplexClusterTest 
     }
 
     private void createBrokerB(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
-    	final String tcParams = (params == null)?"":params;
+        final String tcParams = (params == null)?"":params;
         if (getBroker(BROKER_B_NAME) == null) {
             addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
             addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
@@ -324,7 +361,7 @@ public class FailoverComplexClusterTest 
     }
 
     private void createBrokerC(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
-    	final String tcParams = (params == null)?"":params;
+        final String tcParams = (params == null)?"":params;
         if (getBroker(BROKER_C_NAME) == null) {
             addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
             addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
@@ -339,5 +376,4 @@ public class FailoverComplexClusterTest 
             getBroker(BROKER_C_NAME).start();
         }
     }
-    
 }