You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/02/01 14:12:34 UTC
svn commit: r1239118 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/transport/failover/
test/java/org/apache/activemq/transport/failover/
Author: dejanb
Date: Wed Feb 1 13:12:33 2012
New Revision: 1239118
URL: http://svn.apache.org/viewvc?rev=1239118&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3685 - fixing cluster update feature
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Feb 1 13:12:33 2012
@@ -1358,7 +1358,7 @@ public class BrokerService implements Se
public String getDefaultSocketURIString() {
if (started.get()) {
- if (this.defaultSocketURIString ==null) {
+ if (this.defaultSocketURIString == null) {
for (TransportConnector tc:this.transportConnectors) {
String result = null;
try {
@@ -1367,10 +1367,19 @@ public class BrokerService implements Se
LOG.warn("Failed to get the ConnectURI for "+tc,e);
}
if (result != null) {
- this.defaultSocketURIString =result;
- break;
+ // find first publishable uri
+ if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
+ this.defaultSocketURIString = result;
+ break;
+ } else {
+ // or use the first defined
+ if (this.defaultSocketURIString == null) {
+ this.defaultSocketURIString = result;
+ }
+ }
}
}
+
}
return this.defaultSocketURIString;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Feb 1 13:12:33 2012
@@ -16,26 +16,6 @@
*/
package org.apache.activemq.broker;
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.transaction.xa.XAResource;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
@@ -69,6 +49,26 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+import javax.transaction.xa.XAResource;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
public class TransportConnection implements Connection, Task, CommandVisitor {
private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed Feb 1 13:12:33 2012
@@ -209,7 +209,7 @@ public class TransportConnector implemen
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
- brokerInfo.setBrokerURL(getPublishableConnectString(getServer().getConnectURI()));
+ brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
getServer().setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
@@ -402,28 +402,29 @@ public class TransportConnector implemen
boolean rebalance = isRebalanceClusterClients();
String connectedBrokers = "";
String self = "";
+ String separator = "";
if (isUpdateClusterClients()) {
if (brokerService.getDefaultSocketURIString() != null) {
self += brokerService.getDefaultSocketURIString();
- self += ",";
}
if (rebalance == false) {
connectedBrokers += self;
+ separator = ",";
}
if (this.broker.getPeerBrokerInfos() != null) {
for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
if (isMatchesClusterFilter(info.getBrokerName())) {
+ connectedBrokers += separator;
connectedBrokers += info.getBrokerURL();
- connectedBrokers += ",";
+ separator = ",";
}
}
}
if (rebalance) {
- connectedBrokers += self;
+ connectedBrokers += separator + self;
}
}
-
ConnectionControl control = new ConnectionControl();
control.setConnectedBrokers(connectedBrokers);
control.setRebalanceConnection(rebalance);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java Wed Feb 1 13:12:33 2012
@@ -18,11 +18,12 @@
package org.apache.activemq.transport.failover;
-import java.io.IOException;
-import java.net.URI;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
+import java.io.IOException;
+import java.net.URI;
+
class BackupTransport extends DefaultTransportListener{
private final FailoverTransport failoverTransport;
private Transport transport;
@@ -76,4 +77,9 @@ class BackupTransport extends DefaultTra
}
return false;
}
+
+ @Override
+ public String toString() {
+ return "Backup transport: " + uri;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Feb 1 13:12:33 2012
@@ -16,26 +16,6 @@
*/
package org.apache.activemq.transport.failover;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.InterruptedIOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
@@ -59,6 +39,25 @@ import org.apache.activemq.util.ServiceS
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* A Transport that is made reliable by being able to fail over to another
* transport when a transport failure is detected.
@@ -241,6 +240,7 @@ public class FailoverTransport implement
}
if (reconnectOk) {
+ updated.remove(failedConnectTransportURI);
reconnectTask.wakeup();
} else {
propagateFailureToExceptionListener(e);
@@ -670,6 +670,7 @@ public class FailoverTransport implement
private List<URI> getConnectList() {
ArrayList<URI> l = new ArrayList<URI>(uris);
+ l.addAll(updated);
boolean removed = false;
if (failedConnectTransportURI != null) {
removed = l.remove(failedConnectTransportURI);
@@ -806,7 +807,6 @@ public class FailoverTransport implement
if (disposed || connectionFailure != null) {
reconnectMutex.notifyAll();
}
-
if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
return false;
} else {
@@ -845,7 +845,12 @@ public class FailoverTransport implement
// If we have a backup already waiting lets try it.
synchronized (backupMutex) {
if (backup && !backups.isEmpty()) {
- BackupTransport bt = backups.remove(0);
+ ArrayList<BackupTransport> l = new ArrayList(backups);
+ if (randomize) {
+ Collections.shuffle(l);
+ }
+ BackupTransport bt = l.remove(0);
+ backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
}
@@ -1098,26 +1103,18 @@ public class FailoverTransport implement
public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
if (isUpdateURIsSupported()) {
List<URI> copy = new ArrayList<URI>(this.updated);
- List<URI> add = new ArrayList<URI>();
+ updated.clear();
if (updatedURIs != null && updatedURIs.length > 0) {
- Set<URI> set = new HashSet<URI>();
for (URI uri : updatedURIs) {
- if (uri != null) {
- set.add(uri);
- }
- }
- for (URI uri : set) {
- if (copy.remove(uri) == false) {
- add.add(uri);
+ if (uri != null && !uris.contains(uri)) {
+ updated.add(uri);
}
}
synchronized (reconnectMutex) {
- this.updated.clear();
- this.updated.addAll(add);
- for (URI uri : copy) {
- this.uris.remove(uri);
+ if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(updated)) {
+ buildBackups();
+ reconnect(rebalance);
}
- add(rebalance, add.toArray(new URI[add.size()]));
}
}
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java?rev=1239118&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java Wed Feb 1 13:12:33 2012
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FailoverClusterTestSupport extends TestCase {
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private static final int NUMBER_OF_CLIENTS = 30;
+
+ private String clientUrl;
+
+ private final Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
+ private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+
+ protected void assertClientsConnectedToTwoBrokers() {
+ Set<String> set = new HashSet<String>();
+ for (ActiveMQConnection c : connections) {
+ set.add(c.getTransportChannel().getRemoteAddress());
+ }
+ assertTrue("Only 2 connections should be found: " + set,
+ set.size() == 2);
+ }
+
+ protected void assertClientsConnectedToThreeBrokers() {
+ Set<String> set = new HashSet<String>();
+ for (ActiveMQConnection c : connections) {
+ set.add(c.getTransportChannel().getRemoteAddress());
+ }
+ assertTrue("Only 3 connections should be found: " + set,
+ set.size() == 3);
+ }
+
+ protected void addBroker(String name, BrokerService brokerService) {
+ brokers.put(name, brokerService);
+ }
+
+ protected BrokerService getBroker(String name) {
+ return brokers.get(name);
+ }
+
+ protected BrokerService removeBroker(String name) {
+ return brokers.remove(name);
+ }
+
+ protected void destroyBrokerCluster() throws JMSException, Exception {
+ for (BrokerService b : brokers.values()) {
+ b.stop();
+ }
+ brokers.clear();
+ }
+
+ protected void shutdownClients() throws JMSException {
+ for (Connection c : connections) {
+ c.close();
+ }
+ }
+
+ protected BrokerService createBroker(String brokerName) throws Exception {
+ BrokerService answer = new BrokerService();
+ answer.setPersistent(false);
+ answer.setUseJmx(false);
+ answer.setBrokerName(brokerName);
+ answer.setUseShutdownHook(false);
+ return answer;
+ }
+
+ protected void addTransportConnector(BrokerService brokerService,
+ String connectorName, String uri, boolean clustered)
+ throws Exception {
+ TransportConnector connector = brokerService.addConnector(uri);
+ connector.setName(connectorName);
+ if (clustered) {
+ connector.setRebalanceClusterClients(true);
+ connector.setUpdateClusterClients(true);
+ connector.setUpdateClusterClientsOnRemove(true);
+ } else {
+ connector.setRebalanceClusterClients(false);
+ connector.setUpdateClusterClients(false);
+ connector.setUpdateClusterClientsOnRemove(false);
+ }
+ }
+
+ protected void addNetworkBridge(BrokerService answer, String bridgeName,
+ String uri, boolean duplex, String destinationFilter)
+ throws Exception {
+ NetworkConnector network = answer.addNetworkConnector(uri);
+ network.setName(bridgeName);
+ network.setDuplex(duplex);
+ if (destinationFilter != null && !destinationFilter.equals("")) {
+ network.setDestinationFilter(bridgeName);
+ }
+ }
+
+ @SuppressWarnings("unused")
+ protected void createClients() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ clientUrl);
+ for (int i = 0; i < NUMBER_OF_CLIENTS; i++) {
+ ActiveMQConnection c = (ActiveMQConnection) factory
+ .createConnection();
+ c.start();
+ Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = s.createQueue(getClass().getName());
+ MessageConsumer consumer = s.createConsumer(queue);
+ connections.add(c);
+ }
+ }
+
+ public String getClientUrl() {
+ return clientUrl;
+ }
+
+ public void setClientUrl(String clientUrl) {
+ this.clientUrl = clientUrl;
+ }
+}
\ No newline at end of file
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java?rev=1239118&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java Wed Feb 1 13:12:33 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+/**
+ * Complex cluster test that will exercise the dynamic failover capabilities of
+ * a network of brokers. Using a networking of 3 brokers where the 3rd broker is
+ * removed and then added back in it is expected in each test that the number of
+ * connections on the client should start with 3, then have two after the 3rd
+ * broker is removed and then show 3 after the 3rd broker is reintroduced.
+ */
+public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
+
+ private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://localhost:61616";
+ private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://localhost:61617";
+ private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://localhost:61618";
+ private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://localhost:61626";
+ private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://localhost:61627";
+ private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://localhost:61628";
+ private static final String BROKER_A_NAME = "BROKERA";
+ private static final String BROKER_B_NAME = "BROKERB";
+ private static final String BROKER_C_NAME = "BROKERC";
+
+
+
+ public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
+
+ initSingleTcBroker("", null);
+
+ Thread.sleep(2000);
+
+ setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
+ createClients();
+ Thread.sleep(2000);
+
+ runTests(false);
+ }
+
+
+ public void testThreeBrokerClusterSingleConnectorBackup() throws Exception {
+
+ initSingleTcBroker("", null);
+
+ Thread.sleep(2000);
+
+ setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2");
+ createClients();
+ Thread.sleep(2000);
+
+ runTests(false);
+ }
+
+
+ public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
+
+ initSingleTcBroker("?transport.closeAsync=false", null);
+
+ Thread.sleep(2000);
+ setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
+ createClients();
+
+ runTests(false);
+ }
+
+ public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
+
+ initMultiTcCluster("", null);
+
+ Thread.sleep(2000);
+
+ setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
+ createClients();
+
+ runTests(true);
+ }
+
+
+ /**
+ * Runs a 3 tests: <br/>
+ * <ul>
+ * <li>asserts clients are distributed across all 3 brokers</li>
+ * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
+ * <li>asserts clients are distributed across all 3 brokers after reintroducing the 3rd broker</li>
+ * </ul>
+ * @throws Exception
+ * @throws InterruptedException
+ */
+ private void runTests(boolean multi) throws Exception, InterruptedException {
+ assertClientsConnectedToThreeBrokers();
+
+ getBroker(BROKER_C_NAME).stop();
+ getBroker(BROKER_C_NAME).waitUntilStopped();
+ removeBroker(BROKER_C_NAME);
+
+ Thread.sleep(5000);
+
+ assertClientsConnectedToTwoBrokers();
+
+ createBrokerC(multi, "", null);
+ getBroker(BROKER_C_NAME).waitUntilStarted();
+ Thread.sleep(5000);
+
+ assertClientsConnectedToThreeBrokers();
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ shutdownClients();
+ destroyBrokerCluster();
+ Thread.sleep(2000);
+ }
+
+ private void initSingleTcBroker(String params, String clusterFilter) throws Exception {
+ createBrokerA(false, params, clusterFilter);
+ createBrokerB(false, params, clusterFilter);
+ createBrokerC(false, params, clusterFilter);
+ getBroker(BROKER_C_NAME).waitUntilStarted();
+ }
+
+ private void initMultiTcCluster(String params, String clusterFilter) throws Exception {
+ createBrokerA(true, params, clusterFilter);
+ createBrokerB(true, params, clusterFilter);
+ createBrokerC(true, params, clusterFilter);
+ getBroker(BROKER_C_NAME).waitUntilStarted();
+ }
+
+ private void createBrokerA(boolean multi, String params, String clusterFilter) throws Exception {
+ if (getBroker(BROKER_A_NAME) == null) {
+ addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+ addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + params, true);
+ if (multi) {
+ addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS, false);
+ addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ } else {
+ addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ }
+ getBroker(BROKER_A_NAME).start();
+ }
+ }
+
+ private void createBrokerB(boolean multi, String params, String clusterFilter) throws Exception {
+ if (getBroker(BROKER_B_NAME) == null) {
+ addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+ addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + params, true);
+ if (multi) {
+ addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS, false);
+ addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ } else {
+ addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ }
+ getBroker(BROKER_B_NAME).start();
+ }
+ }
+
+ private void createBrokerC(boolean multi, String params, String clusterFilter) throws Exception {
+ if (getBroker(BROKER_C_NAME) == null) {
+ addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
+ addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + params, true);
+ if (multi) {
+ addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS, false);
+ addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ } else {
+ addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+ addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+ }
+ getBroker(BROKER_C_NAME).start();
+ }
+ }
+
+}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java Wed Feb 1 13:12:33 2012
@@ -16,9 +16,6 @@
*/
package org.apache.activemq.transport.failover;
-import java.io.IOException;
-import java.net.URI;
-
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.MessageAck;
@@ -32,6 +29,10 @@ import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
import static org.junit.Assert.*;
public class FailoverTransportTest {
@@ -99,7 +100,6 @@ public class FailoverTransportTest {
// Track a connection
tracker.track(connection);
-
try {
this.transport.oneway(new RemoveInfo(new ConnectionId("1")));
} catch(Exception e) {
@@ -128,7 +128,7 @@ public class FailoverTransportTest {
protected Transport createTransport() throws Exception {
Transport transport = TransportFactory.connect(
- new URI("failover://(tcp://doesNotExist:1234)"));
+ new URI("failover://(tcp://localhost:1234)"));
transport.setTransportListener(new TransportListener() {
public void onCommand(Object command) {