You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/02/01 14:12:34 UTC

svn commit: r1239118 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/

Author: dejanb
Date: Wed Feb  1 13:12:33 2012
New Revision: 1239118

URL: http://svn.apache.org/viewvc?rev=1239118&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3685 - fixing cluster update feature

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Feb  1 13:12:33 2012
@@ -1358,7 +1358,7 @@ public class BrokerService implements Se
     public String getDefaultSocketURIString() {
 
             if (started.get()) {
-                if (this.defaultSocketURIString ==null) {
+                if (this.defaultSocketURIString == null) {
                     for (TransportConnector tc:this.transportConnectors) {
                         String result = null;
                         try {
@@ -1367,10 +1367,19 @@ public class BrokerService implements Se
                           LOG.warn("Failed to get the ConnectURI for "+tc,e);
                         }
                         if (result != null) {
-                            this.defaultSocketURIString =result;
-                            break;
+                            // find first publishable uri
+                            if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
+                                this.defaultSocketURIString = result;
+                                break;
+                            } else {
+                            // or use the first defined
+                                if (this.defaultSocketURIString == null) {
+                                    this.defaultSocketURIString = result;
+                                }
+                            }
                         }
                     }
+
                 }
                 return this.defaultSocketURIString;
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Wed Feb  1 13:12:33 2012
@@ -16,26 +16,6 @@
  */
 package org.apache.activemq.broker;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.SocketException;
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.transaction.xa.XAResource;
 import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -69,6 +49,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
+import javax.transaction.xa.XAResource;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
 public class TransportConnection implements Connection, Task, CommandVisitor {
     private static final Logger LOG = LoggerFactory.getLogger(TransportConnection.class);
     private static final Logger TRANSPORTLOG = LoggerFactory.getLogger(TransportConnection.class.getName() + ".Transport");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Wed Feb  1 13:12:33 2012
@@ -209,7 +209,7 @@ public class TransportConnector implemen
         brokerInfo.setBrokerId(broker.getBrokerId());
         brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
         brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
-        brokerInfo.setBrokerURL(getPublishableConnectString(getServer().getConnectURI()));
+        brokerInfo.setBrokerURL(broker.getBrokerService().getDefaultSocketURIString());
         getServer().setAcceptListener(new TransportAcceptListener() {
             public void onAccept(final Transport transport) {
                 try {
@@ -402,28 +402,29 @@ public class TransportConnector implemen
         boolean rebalance = isRebalanceClusterClients();
         String connectedBrokers = "";
         String self = "";
+        String separator = "";
 
         if (isUpdateClusterClients()) {
             if (brokerService.getDefaultSocketURIString() != null) {
                 self += brokerService.getDefaultSocketURIString();
-                self += ",";
             }
             if (rebalance == false) {
                 connectedBrokers += self;
+                separator = ",";
             }
             if (this.broker.getPeerBrokerInfos() != null) {
                 for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
                     if (isMatchesClusterFilter(info.getBrokerName())) {
+                        connectedBrokers += separator;
                         connectedBrokers += info.getBrokerURL();
-                        connectedBrokers += ",";
+                        separator = ",";
                     }
                 }
             }
             if (rebalance) {
-                connectedBrokers += self;
+                connectedBrokers += separator + self;
             }
         }
-
         ConnectionControl control = new ConnectionControl();
         control.setConnectedBrokers(connectedBrokers);
         control.setRebalanceConnection(rebalance);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java Wed Feb  1 13:12:33 2012
@@ -18,11 +18,12 @@
 
 package org.apache.activemq.transport.failover;
 
-import java.io.IOException;
-import java.net.URI;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
 
+import java.io.IOException;
+import java.net.URI;
+
 class BackupTransport extends DefaultTransportListener{
 	private final FailoverTransport failoverTransport;
 	private Transport transport;
@@ -76,4 +77,9 @@ class BackupTransport extends DefaultTra
 		}
 		return false;
 	}
+
+    @Override
+    public String toString() {
+        return "Backup transport: " + uri;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Wed Feb  1 13:12:33 2012
@@ -16,26 +16,6 @@
  */
 package org.apache.activemq.transport.failover;
 
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.InterruptedIOException;
-import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionControl;
@@ -59,6 +39,25 @@ import org.apache.activemq.util.ServiceS
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * A Transport that is made reliable by being able to fail over to another
  * transport when a transport failure is detected.
@@ -241,6 +240,7 @@ public class FailoverTransport implement
                 }
 
                 if (reconnectOk) {
+                    updated.remove(failedConnectTransportURI);
                     reconnectTask.wakeup();
                 } else {
                     propagateFailureToExceptionListener(e);
@@ -670,6 +670,7 @@ public class FailoverTransport implement
 
     private List<URI> getConnectList() {
         ArrayList<URI> l = new ArrayList<URI>(uris);
+        l.addAll(updated);
         boolean removed = false;
         if (failedConnectTransportURI != null) {
             removed = l.remove(failedConnectTransportURI);
@@ -806,7 +807,6 @@ public class FailoverTransport implement
             if (disposed || connectionFailure != null) {
                 reconnectMutex.notifyAll();
             }
-
             if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
                 return false;
             } else {
@@ -845,7 +845,12 @@ public class FailoverTransport implement
                     // If we have a backup already waiting lets try it.
                     synchronized (backupMutex) {
                         if (backup && !backups.isEmpty()) {
-                            BackupTransport bt = backups.remove(0);
+                            ArrayList<BackupTransport> l = new ArrayList(backups);
+                            if (randomize) {
+                                Collections.shuffle(l);
+                            }
+                            BackupTransport bt = l.remove(0);
+                            backups.remove(bt);
                             transport = bt.getTransport();
                             uri = bt.getUri();
                         }
@@ -1098,26 +1103,18 @@ public class FailoverTransport implement
     public void updateURIs(boolean rebalance, URI[] updatedURIs) throws IOException {
         if (isUpdateURIsSupported()) {
             List<URI> copy = new ArrayList<URI>(this.updated);
-            List<URI> add = new ArrayList<URI>();
+            updated.clear();
             if (updatedURIs != null && updatedURIs.length > 0) {
-                Set<URI> set = new HashSet<URI>();
                 for (URI uri : updatedURIs) {
-                    if (uri != null) {
-                        set.add(uri);
-                    }
-                }
-                for (URI uri : set) {
-                    if (copy.remove(uri) == false) {
-                        add.add(uri);
+                    if (uri != null && !uris.contains(uri)) {
+                        updated.add(uri);
                     }
                 }
                 synchronized (reconnectMutex) {
-                    this.updated.clear();
-                    this.updated.addAll(add);
-                    for (URI uri : copy) {
-                        this.uris.remove(uri);
+                    if (!(copy.isEmpty() && updated.isEmpty()) && !copy.equals(updated)) {
+                        buildBackups();
+                        reconnect(rebalance);
                     }
-                    add(rebalance, add.toArray(new URI[add.size()]));
                 }
             }
         }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java?rev=1239118&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java Wed Feb  1 13:12:33 2012
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class FailoverClusterTestSupport extends TestCase {
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private static final int NUMBER_OF_CLIENTS = 30;
+
+    private String clientUrl;
+
+    private final Map<String, BrokerService> brokers = new HashMap<String, BrokerService>();
+    private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+
+    protected void assertClientsConnectedToTwoBrokers() {
+        Set<String> set = new HashSet<String>();
+        for (ActiveMQConnection c : connections) {
+            set.add(c.getTransportChannel().getRemoteAddress());
+        }
+        assertTrue("Only 2 connections should be found: " + set,
+                set.size() == 2);
+    }
+
+    protected void assertClientsConnectedToThreeBrokers() {
+        Set<String> set = new HashSet<String>();
+        for (ActiveMQConnection c : connections) {
+            set.add(c.getTransportChannel().getRemoteAddress());
+        }
+        assertTrue("Only 3 connections should be found: " + set,
+                set.size() == 3);
+    }
+
+    protected void addBroker(String name, BrokerService brokerService) {
+        brokers.put(name, brokerService);
+    }
+
+    protected BrokerService getBroker(String name) {
+        return brokers.get(name);
+    }
+
+    protected BrokerService removeBroker(String name) {
+        return brokers.remove(name);
+    }
+
+    protected void destroyBrokerCluster() throws JMSException, Exception {
+        for (BrokerService b : brokers.values()) {
+            b.stop();
+        }
+        brokers.clear();
+    }
+
+    protected void shutdownClients() throws JMSException {
+        for (Connection c : connections) {
+            c.close();
+        }
+    }
+
+    protected BrokerService createBroker(String brokerName) throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.setUseJmx(false);
+        answer.setBrokerName(brokerName);
+        answer.setUseShutdownHook(false);
+        return answer;
+    }
+
+    protected void addTransportConnector(BrokerService brokerService,
+                                         String connectorName, String uri, boolean clustered)
+            throws Exception {
+        TransportConnector connector = brokerService.addConnector(uri);
+        connector.setName(connectorName);
+        if (clustered) {
+            connector.setRebalanceClusterClients(true);
+            connector.setUpdateClusterClients(true);
+            connector.setUpdateClusterClientsOnRemove(true);
+        } else {
+            connector.setRebalanceClusterClients(false);
+            connector.setUpdateClusterClients(false);
+            connector.setUpdateClusterClientsOnRemove(false);
+        }
+    }
+
+    /** Adds a network connector for {@code uri}; a non-empty {@code destinationFilter} (not the bridge name) becomes its filter. */
+    protected void addNetworkBridge(BrokerService answer, String bridgeName, String uri,
+                                    boolean duplex, String destinationFilter) throws Exception {
+        NetworkConnector network = answer.addNetworkConnector(uri);
+        network.setName(bridgeName);
+        network.setDuplex(duplex);
+        if (destinationFilter != null && !destinationFilter.equals("")) {
+            network.setDestinationFilter(destinationFilter);
+        }
+    }
+
+    @SuppressWarnings("unused")
+    protected void createClients() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                clientUrl);
+        for (int i = 0; i < NUMBER_OF_CLIENTS; i++) {
+            ActiveMQConnection c = (ActiveMQConnection) factory
+                    .createConnection();
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = s.createQueue(getClass().getName());
+            MessageConsumer consumer = s.createConsumer(queue);
+            connections.add(c);
+        }
+    }
+
+    public String getClientUrl() {
+        return clientUrl;
+    }
+
+    public void setClientUrl(String clientUrl) {
+        this.clientUrl = clientUrl;
+    }
+}
\ No newline at end of file

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java?rev=1239118&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverComplexClusterTest.java Wed Feb  1 13:12:33 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+/**
+ * Complex cluster test that exercises the dynamic failover capabilities of
+ * a network of brokers. Using a network of 3 brokers where the 3rd broker is
+ * removed and then added back in, each test expects the clients to start out
+ * spread across 3 brokers, to be spread across 2 after the 3rd broker is
+ * removed, and to be spread across 3 again after it is reintroduced.
+ */
+public class FailoverComplexClusterTest extends FailoverClusterTestSupport {
+
+    private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://localhost:61616";
+    private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://localhost:61617";
+    private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://localhost:61618";
+    private static final String BROKER_A_NOB_TC_ADDRESS = "tcp://localhost:61626";
+    private static final String BROKER_B_NOB_TC_ADDRESS = "tcp://localhost:61627";
+    private static final String BROKER_C_NOB_TC_ADDRESS = "tcp://localhost:61628";
+    private static final String BROKER_A_NAME = "BROKERA";
+    private static final String BROKER_B_NAME = "BROKERB";
+    private static final String BROKER_C_NAME = "BROKERC";
+    
+    
+
+    public void testThreeBrokerClusterSingleConnectorBasic() throws Exception {
+
+        initSingleTcBroker("", null);
+
+        Thread.sleep(2000);
+
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
+        createClients();
+        Thread.sleep(2000);
+
+        runTests(false);
+    }
+
+
+    public void testThreeBrokerClusterSingleConnectorBackup() throws Exception {
+
+        initSingleTcBroker("", null);
+
+        Thread.sleep(2000);
+
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?backup=true&backupPoolSize=2");
+        createClients();
+        Thread.sleep(2000);
+
+        runTests(false);
+    }
+
+
+    public void testThreeBrokerClusterSingleConnectorWithParams() throws Exception {
+
+        initSingleTcBroker("?transport.closeAsync=false", null);
+
+        Thread.sleep(2000);
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
+        createClients();
+
+        runTests(false);
+    }
+
+    public void testThreeBrokerClusterMultipleConnectorBasic() throws Exception {
+
+        initMultiTcCluster("", null);
+
+        Thread.sleep(2000);
+
+        setClientUrl("failover://(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")");
+        createClients();
+
+        runTests(true);
+    }
+
+
+    /**
+     * Runs 3 tests: <br/>
+     * <ul>
+     * <li>asserts clients are distributed across all 3 brokers</li>
+     * <li>asserts clients are distributed across 2 brokers after removing the 3rd</li>
+     * <li>asserts clients are distributed across all 3 brokers after reintroducing the 3rd broker</li>
+     * </ul>
+     * @throws Exception
+     * @throws InterruptedException
+     */
+    private void runTests(boolean multi) throws Exception, InterruptedException {
+        assertClientsConnectedToThreeBrokers();
+
+        getBroker(BROKER_C_NAME).stop();
+        getBroker(BROKER_C_NAME).waitUntilStopped();
+        removeBroker(BROKER_C_NAME);
+
+        Thread.sleep(5000);
+
+        assertClientsConnectedToTwoBrokers();
+
+        createBrokerC(multi, "", null);
+        getBroker(BROKER_C_NAME).waitUntilStarted();
+        Thread.sleep(5000);
+
+        assertClientsConnectedToThreeBrokers();
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        shutdownClients();
+        destroyBrokerCluster();
+        Thread.sleep(2000);
+    }
+
+    private void initSingleTcBroker(String params, String clusterFilter) throws Exception {
+        createBrokerA(false, params, clusterFilter);
+        createBrokerB(false, params, clusterFilter);
+        createBrokerC(false, params, clusterFilter);
+        getBroker(BROKER_C_NAME).waitUntilStarted();
+    }
+
+    private void initMultiTcCluster(String params, String clusterFilter) throws Exception {
+        createBrokerA(true, params, clusterFilter);
+        createBrokerB(true, params, clusterFilter);
+        createBrokerC(true, params, clusterFilter);
+        getBroker(BROKER_C_NAME).waitUntilStarted();
+    }
+    
+    private void createBrokerA(boolean multi, String params, String clusterFilter) throws Exception {
+        if (getBroker(BROKER_A_NAME) == null) {
+            addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
+            addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS + params, true);
+            if (multi) {
+                addTransportConnector(getBroker(BROKER_A_NAME), "network", BROKER_A_NOB_TC_ADDRESS, false);
+                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            } else {
+                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+                addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            }
+            getBroker(BROKER_A_NAME).start();
+        }
+    }
+
+    private void createBrokerB(boolean multi, String params, String clusterFilter) throws Exception {
+        if (getBroker(BROKER_B_NAME) == null) {
+            addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
+            addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS + params, true);
+            if (multi) {
+                addTransportConnector(getBroker(BROKER_B_NAME), "network", BROKER_B_NOB_TC_ADDRESS, false);
+                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            } else {
+                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+                addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            }
+            getBroker(BROKER_B_NAME).start();
+        }
+    }
+
+    private void createBrokerC(boolean multi, String params, String clusterFilter) throws Exception {
+        if (getBroker(BROKER_C_NAME) == null) {
+            addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
+            addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS + params, true);
+            if (multi) {
+                addTransportConnector(getBroker(BROKER_C_NAME), "network", BROKER_C_NOB_TC_ADDRESS, false);
+                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_NOB_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            } else {
+                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, clusterFilter);
+                addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
+            }
+            getBroker(BROKER_C_NAME).start();
+        }
+    }
+    
+}

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java?rev=1239118&r1=1239117&r2=1239118&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java Wed Feb  1 13:12:33 2012
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.transport.failover;
 
-import java.io.IOException;
-import java.net.URI;
-
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.MessageAck;
@@ -32,6 +29,10 @@ import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
 import static org.junit.Assert.*;
 
 public class FailoverTransportTest {
@@ -99,7 +100,6 @@ public class FailoverTransportTest {
 
 		// Track a connection
 		tracker.track(connection);
-
 		try {
 			this.transport.oneway(new RemoveInfo(new ConnectionId("1")));
 		} catch(Exception e) {
@@ -128,7 +128,7 @@ public class FailoverTransportTest {
 
     protected Transport createTransport() throws Exception {
     	Transport transport = TransportFactory.connect(
-    			new URI("failover://(tcp://doesNotExist:1234)"));
+    			new URI("failover://(tcp://localhost:1234)"));
         transport.setTransportListener(new TransportListener() {
 
             public void onCommand(Object command) {