You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/03 17:21:26 UTC
svn commit: r1478835 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-client/src/main/java/org/apache/activemq/transport/failover/
activemq-unit-te...
Author: tabish
Date: Fri May 3 15:21:25 2013
New Revision: 1478835
URL: http://svn.apache.org/r1478835
Log:
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4505
When a broker was stopping it was sending out a cluster update after tearing down its bridges so any client connected to it would lose its awareness of other brokers in the cluster.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Fri May 3 15:21:25 2013
@@ -2829,4 +2829,7 @@ public class BrokerService implements Se
return this.slave;
}
+ public boolean isStopping() {
+ return this.stopping.get();
+ }
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri May 3 15:21:25 2013
@@ -16,6 +16,23 @@
*/
package org.apache.activemq.broker.region;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.jms.InvalidClientIDException;
+import javax.jms.JMSException;
+
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.Connection;
@@ -40,22 +57,6 @@ import org.apache.activemq.util.ServiceS
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.InvalidClientIDException;
-import javax.jms.JMSException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
* Routes Broker operations to the correct messaging regions for processing.
*
@@ -94,6 +95,7 @@ public class RegionBroker extends EmptyB
private final ReentrantReadWriteLock inactiveDestinationsPurgeLock = new ReentrantReadWriteLock();
private final Runnable purgeInactiveDestinationsTask = new Runnable() {
+ @Override
public void run() {
purgeInactiveDestinations();
}
@@ -526,7 +528,11 @@ public class RegionBroker extends EmptyB
if (LOG.isDebugEnabled()) {
LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
}
- removeBrokerInClusterUpdate(info);
+ // When stopping don't send cluster updates since we are the one's tearing down
+ // our own bridges.
+ if (!brokerService.isStopping()) {
+ removeBrokerInClusterUpdate(info);
+ }
}
}
@@ -730,6 +736,7 @@ public class RegionBroker extends EmptyB
return this.scheduler;
}
+ @Override
public ThreadPoolExecutor getExecutor() {
return this.executor;
}
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Fri May 3 15:21:25 2013
@@ -286,6 +286,10 @@ public class FailoverTransport implement
public final void handleConnectionControl(ConnectionControl control) {
String reconnectStr = control.getReconnectTo();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Received ConnectionControl: {}", control);
+ }
+
if (reconnectStr != null) {
reconnectStr = reconnectStr.trim();
if (reconnectStr.length() > 0) {
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java?rev=1478835&r1=1478834&r2=1478835&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java Fri May 3 15:21:25 2013
@@ -16,6 +16,9 @@
*/
package org.apache.activemq.transport.failover;
+import org.apache.activemq.broker.TransportConnector;
+import org.mortbay.log.Log;
+
/**
* Complex cluster test that will exercise the dynamic failover capabilities of
@@ -26,21 +29,19 @@ package org.apache.activemq.transport.fa
*/
public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
- private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://localhost:61616";
- private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://localhost:61617";
- private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://localhost:61618";
- private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://localhost:61626";
- private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://localhost:61627";
- private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://localhost:61628";
+ private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
+ private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
+ private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
+ private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://127.0.0.1:61626";
+ private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://127.0.0.1:61627";
+ private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://127.0.0.1:61628";
private static final String BROKER_A_NAME = "BROKERA";
private static final String BROKER_B_NAME = "BROKERB";
private static final String BROKER_C_NAME = "BROKERC";
-
-
/**
- * Basic dynamic failover 3 broker test
- *
+ * Basic dynamic failover 3 broker test
+ *
* @throws Exception
*/
public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
@@ -56,14 +57,14 @@ public class FailoverComplexClusterTest
runTests(false, null, null, null);
}
- /**
- * Tests a 3 broker configuration to ensure that the backup is random and
- * supported in a cluster. useExponentialBackOff is set to false and
- * maxReconnectAttempts is set to 1 to move through the list quickly for
- * this test.
- *
- * @throws Exception
- */
+ /**
+ * Tests a 3 broker configuration to ensure that the backup is random and
+ * supported in a cluster. useExponentialBackOff is set to false and
+ * maxReconnectAttempts is set to 1 to move through the list quickly for
+ * this test.
+ *
+ * @throws Exception
+ */
public void testThreeBrokerClusterSingleConnectorBackupFailoverConfig() throws Exception {
initSingleTcBroker("", null, null);
@@ -77,14 +78,14 @@ public class FailoverComplexClusterTest
runTests(false, null, null, null);
}
- /**
- * Tests a 3 broker cluster that passes in connection params on the
- * transport connector. Prior versions of AMQ passed the TC connection
- * params to the client and this should not happen. The chosen param is not
- * compatible with the client and will throw an error if used.
- *
- * @throws Exception
- */
+ /**
+ * Tests a 3 broker cluster that passes in connection params on the
+ * transport connector. Prior versions of AMQ passed the TC connection
+ * params to the client and this should not happen. The chosen param is not
+ * compatible with the client and will throw an error if used.
+ *
+ * @throws Exception
+ */
public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
initSingleTcBroker("?transport.closeAsync=false", null, null);
@@ -97,10 +98,9 @@ public class FailoverComplexClusterTest
runTests(false, null, null, null);
}
-
/**
* Tests a 3 broker cluster using a cluster filter of *
- *
+ *
* @throws Exception
*/
public void testThreeBrokerClusterWithClusterFilter() throws Exception {
@@ -114,12 +114,12 @@ public class FailoverComplexClusterTest
runTests(false, null, "*", null);
}
- /**
- * Test to verify that a broker with multiple transport connections only the
- * one marked to update clients is propagate
- *
- * @throws Exception
- */
+ /**
+ * Test to verify that a broker with multiple transport connections only the
+ * one marked to update clients is propagate
+ *
+ * @throws Exception
+ */
public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
initMultiTcCluster("", null);
@@ -133,11 +133,11 @@ public class FailoverComplexClusterTest
runTests(true, null, null, null);
}
- /**
- * Test to verify the reintroduction of the A Broker
- *
- * @throws Exception
- */
+ /**
+ * Test to verify the reintroduction of the A Broker
+ *
+ * @throws Exception
+ */
public void testOriginalBrokerRestart() throws Exception {
initSingleTcBroker("", null, null);
@@ -164,12 +164,12 @@ public class FailoverComplexClusterTest
assertClientsConnectedToThreeBrokers();
}
- /**
- * Test to ensure clients are evenly to all available brokers in the
- * network.
- *
- * @throws Exception
- */
+ /**
+ * Test to ensure clients are evenly to all available brokers in the
+ * network.
+ *
+ * @throws Exception
+ */
public void testThreeBrokerClusterClientDistributions() throws Exception {
initSingleTcBroker("", null, null);
@@ -182,12 +182,12 @@ public class FailoverComplexClusterTest
runClientDistributionTests(false, null, null, null);
}
- /**
- * Test to verify that clients are distributed with no less than 20% of the
- * clients on any one broker.
- *
- * @throws Exception
- */
+ /**
+ * Test to verify that clients are distributed with no less than 20% of the
+ * clients on any one broker.
+ *
+ * @throws Exception
+ */
public void testThreeBrokerClusterDestinationFilter() throws Exception {
initSingleTcBroker("", null, null);
@@ -199,23 +199,61 @@ public class FailoverComplexClusterTest
runTests(false, null, null, "Queue.TEST.FOO.>");
}
+ public void testFailOverWithUpdateClientsOnRemove() throws Exception{
+ // Broker A
+ addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+ TransportConnector connectorA = getBroker(BROKER_A_NAME).addConnector(BROKER_A_CLIENT_TC_ADDRESS);
+ connectorA.setName("openwire");
+ connectorA.setRebalanceClusterClients(true);
+ connectorA.setUpdateClusterClients(true);
+ connectorA.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
+ addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ getBroker(BROKER_A_NAME).start();
+
+ // Broker B
+ addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+ TransportConnector connectorB = getBroker(BROKER_B_NAME).addConnector(BROKER_B_CLIENT_TC_ADDRESS);
+ connectorB.setName("openwire");
+ connectorB.setRebalanceClusterClients(true);
+ connectorB.setUpdateClusterClients(true);
+ connectorB.setUpdateClusterClientsOnRemove(true); //If set to false the test succeeds.
+ addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ getBroker(BROKER_B_NAME).start();
+
+ getBroker(BROKER_B_NAME).waitUntilStarted();
+ Thread.sleep(1000);
+
+ // create client connecting only to A. It should receive broker B address whet it connects to A.
+ setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=true");
+ createClients(1);
+ Thread.sleep(5000);
- /**
- * Runs a 3 Broker dynamic failover test: <br/>
- * <ul>
- * <li>asserts clients are distributed across all 3 brokers</li>
- * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
- * <li>asserts clients are distributed across all 3 brokers after
- * reintroducing the 3rd broker</li>
- * </ul>
- *
- * @param multi
- * @param tcParams
- * @param clusterFilter
- * @param destinationFilter
- * @throws Exception
- * @throws InterruptedException
- */
+ // We stop broker A.
+ Log.info("Stopping broker A whose address is: {}", BROKER_A_CLIENT_TC_ADDRESS);
+ getBroker(BROKER_A_NAME).stop();
+ getBroker(BROKER_A_NAME).waitUntilStopped();
+ Thread.sleep(5000);
+
+ // Client should failover to B.
+ assertAllConnectedTo(BROKER_B_CLIENT_TC_ADDRESS);
+ }
+
+ /**
+ * Runs a 3 Broker dynamic failover test: <br/>
+ * <ul>
+ * <li>asserts clients are distributed across all 3 brokers</li>
+ * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
+ * <li>asserts clients are distributed across all 3 brokers after
+ * reintroducing the 3rd broker</li>
+ * </ul>
+ *
+ * @param multi
+ * @param tcParams
+ * @param clusterFilter
+ * @param destinationFilter
+ * @throws Exception
+ * @throws InterruptedException
+ */
private void runTests(boolean multi, String tcParams, String clusterFilter, String destinationFilter) throws Exception, InterruptedException {
assertClientsConnectedToThreeBrokers();
@@ -226,14 +264,13 @@ public class FailoverComplexClusterTest
Thread.sleep(5000);
assertClientsConnectedToTwoBrokers();
-
+
createBrokerC(multi, tcParams, clusterFilter, destinationFilter);
getBroker(BROKER_C_NAME).waitUntilStarted();
Thread.sleep(5000);
assertClientsConnectedToThreeBrokers();
}
-
/**
* @param multi
@@ -288,9 +325,9 @@ public class FailoverComplexClusterTest
createBrokerC(true, params, clusterFilter, null);
getBroker(BROKER_C_NAME).waitUntilStarted();
}
-
+
private void createBrokerA(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
- final String tcParams = (params == null)?"":params;
+ final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_A_NAME) == null) {
addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + tcParams, true);
@@ -307,7 +344,7 @@ public class FailoverComplexClusterTest
}
private void createBrokerB(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
- final String tcParams = (params == null)?"":params;
+ final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_B_NAME) == null) {
addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + tcParams, true);
@@ -324,7 +361,7 @@ public class FailoverComplexClusterTest
}
private void createBrokerC(boolean multi, String params, String clusterFilter, String destinationFilter) throws Exception {
- final String tcParams = (params == null)?"":params;
+ final String tcParams = (params == null)?"":params;
if (getBroker(BROKER_C_NAME) == null) {
addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + tcParams, true);
@@ -339,5 +376,4 @@ public class FailoverComplexClusterTest
getBroker(BROKER_C_NAME).start();
}
}
-
}