You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/29 23:42:29 UTC

svn commit: r1177437 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/transport/failover/

Author: tabish
Date: Thu Sep 29 21:42:29 2011
New Revision: 1177437

URL: http://svn.apache.org/viewvc?rev=1177437&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3517

Adds tests for the backup option and cleans up some other the other tests.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Sep 29 21:42:29 2011
@@ -431,6 +431,10 @@ public class FailoverTransport implement
         this.backupPoolSize = backupPoolSize;
     }
 
+    public int getCurrentBackups() {
+        return this.backups.size();
+    }
+
     public boolean isTrackMessages() {
         return trackMessages;
     }
@@ -470,11 +474,11 @@ public class FailoverTransport implement
                     } else if (command instanceof RemoveInfo || command.isMessageAck()) {
                         // Simulate response to RemoveInfo command or MessageAck (as it will be stale)
                         stateTracker.track(command);
-                    	if (command.isResponseRequired()) {
-	                        Response response = new Response();
-	                        response.setCorrelationId(command.getCommandId());
-	                        myTransportListener.onCommand(response);
-                    	}
+                        if (command.isResponseRequired()) {
+                            Response response = new Response();
+                            response.setCorrelationId(command.getCommandId());
+                            myTransportListener.onCommand(response);
+                        }
                         return;
                     }
                 }
@@ -489,18 +493,24 @@ public class FailoverTransport implement
                         boolean timedout = false;
                         while (transport == null && !disposed && connectionFailure == null
                                 && !Thread.currentThread().isInterrupted()) {
-                            LOG.trace("Waiting for transport to reconnect..: " + command);
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Waiting for transport to reconnect..: " + command);
+                            }
                             long end = System.currentTimeMillis();
                             if (timeout > 0 && (end - start > timeout)) {
                                 timedout = true;
-                                LOG.info("Failover timed out after " + (end - start) + "ms");
+                                if (LOG.isInfoEnabled()) {
+                                    LOG.info("Failover timed out after " + (end - start) + "ms");
+                                }
                                 break;
                             }
                             try {
                                 reconnectMutex.wait(100);
                             } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
-                                LOG.debug("Interupted: " + e, e);
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug("Interupted: " + e, e);
+                                }
                             }
                             transport = connectedTransport.get();
                         }
@@ -572,6 +582,7 @@ public class FailoverTransport implement
             Thread.currentThread().interrupt();
             throw new InterruptedIOException();
         }
+
         if (!disposed) {
             if (error != null) {
                 if (error instanceof IOException) {
@@ -738,7 +749,6 @@ public class FailoverTransport implement
     }
 
     private void doUpdateURIsFromDisk() {
-
         // If updateURIsURL is specified, read the file and add any new
         // transport URI's to this FailOverTransport.
         // Note: Could track file timestamp to avoid unnecessary reading.
@@ -814,35 +824,26 @@ public class FailoverTransport implement
                         }
                         doRebalance = false;
                     }
+
                     if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
                         reconnectDelay = initialReconnectDelay;
                     }
+
+                    Transport transport = null;
+                    URI uri = null;
+
+                    // If we have a backup already waiting lets try it.
                     synchronized (backupMutex) {
                         if (backup && !backups.isEmpty()) {
                             BackupTransport bt = backups.remove(0);
-                            Transport t = bt.getTransport();
-                            URI uri = bt.getUri();
-                            t.setTransportListener(myTransportListener);
-                            try {
-                                if (started) {
-                                    restoreTransport(t);
-                                }
-                                reconnectDelay = initialReconnectDelay;
-                                failedConnectTransportURI = null;
-                                connectedTransportURI = uri;
-                                connectedTransport.set(t);
-                                reconnectMutex.notifyAll();
-                                connectFailures = 0;
-                                LOG.info("Successfully reconnected to backup " + uri);
-                                return false;
-                            } catch (Exception e) {
-                                LOG.debug("Backup transport failed", e);
-                            }
+                            transport = bt.getTransport();
+                            uri = bt.getUri();
                         }
                     }
 
-                    // Sleep for the reconnectDelay
-                    if (!firstConnection && (reconnectDelay > 0) && !disposed) {
+                    // Sleep for the reconnectDelay if there's no backup and we aren't trying
+                    // for the first time, or we were disposed for some reason.
+                    if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
                         synchronized (sleepMutex) {
                             LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
                             try {
@@ -854,63 +855,76 @@ public class FailoverTransport implement
                     }
 
                     Iterator<URI> iter = connectList.iterator();
-                    while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
+                    while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
 
-                        URI uri = iter.next();
-                        Transport t = null;
                         try {
+                            SslContext.setCurrentSslContext(brokerSslContext);
+
+                            // We could be starting with a backup and if so we wait to grab a
+                            // URI from the pool until next time around.
+                            if (transport == null) {
+                                uri = iter.next();
+                                transport = TransportFactory.compositeConnect(uri);
+                            }
+
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug("Attempting connect to: " + uri);
                             }
-                            SslContext.setCurrentSslContext(brokerSslContext);
-                            t = TransportFactory.compositeConnect(uri);
-                            t.setTransportListener(myTransportListener);
-                            t.start();
+                            transport.setTransportListener(myTransportListener);
+                            transport.start();
 
                             if (started) {
-                                restoreTransport(t);
+                                restoreTransport(transport);
                             }
 
                             LOG.debug("Connection established");
                             reconnectDelay = initialReconnectDelay;
                             connectedTransportURI = uri;
-                            connectedTransport.set(t);
+                            connectedTransport.set(transport);
                             reconnectMutex.notifyAll();
                             connectFailures = 0;
-                            // Make sure on initial startup, that the
-                            // transportListener
+
+                            // Make sure on initial startup, that the transportListener
                             // has been initialized for this instance.
                             synchronized (listenerMutex) {
                                 if (transportListener == null) {
                                     try {
-                                        // if it isn't set after 2secs - it
-                                        // probably never will be
+                                        // if it isn't set after 2secs - it probably never will be
                                         listenerMutex.wait(2000);
                                     } catch (InterruptedException ex) {
                                     }
                                 }
                             }
+
                             if (transportListener != null) {
                                 transportListener.transportResumed();
                             } else {
                                 LOG.debug("transport resumed by transport listener not set");
                             }
+
                             if (firstConnection) {
                                 firstConnection = false;
                                 LOG.info("Successfully connected to " + uri);
                             } else {
                                 LOG.info("Successfully reconnected to " + uri);
                             }
+
                             connected = true;
                             return false;
                         } catch (Exception e) {
                             failure = e;
-                            LOG.debug("Connect fail to: " + uri + ", reason: " + e);
-                            if (t != null) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Connect fail to: " + uri + ", reason: " + e);
+                            }
+                            if (transport != null) {
                                 try {
-                                    t.stop();
+                                    transport.stop();
+                                    transport = null;
                                 } catch (Exception ee) {
-                                    LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
+                                    if (LOG.isDebugEnabled()) {
+                                        LOG.debug("Stop of failed transport: " + transport +
+                                                  " failed with reason: " + ee);
+                                    }
                                 }
                             }
                         } finally {
@@ -919,21 +933,24 @@ public class FailoverTransport implement
                     }
                 }
             }
+
             int reconnectAttempts = 0;
             if (firstConnection) {
                 if (this.startupMaxReconnectAttempts != 0) {
                     reconnectAttempts = this.startupMaxReconnectAttempts;
                 }
             }
+
             if (reconnectAttempts == 0) {
                 reconnectAttempts = this.maxReconnectAttempts;
             }
+
             if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
                 LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
                 connectionFailure = failure;
 
-                // Make sure on initial startup, that the transportListener has
-                // been initialized for this instance.
+                // Make sure on initial startup, that the transportListener has been
+                // initialized for this instance.
                 synchronized (listenerMutex) {
                     if (transportListener == null) {
                         try {
@@ -1122,7 +1139,6 @@ public class FailoverTransport implement
     }
 
     private boolean contains(URI newURI) {
-
         boolean result = false;
         try {
             for (URI uri : uris) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Thu Sep 29 21:42:29 2011
@@ -32,7 +32,6 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.network.NetworkConnector;
 
-
 public class FailoverClusterTest extends TestCase {
 
     private static final int NUMBER = 10;
@@ -45,7 +44,6 @@ public class FailoverClusterTest extends
 
     private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
 
-
     public void testClusterConnectedAfterClients() throws Exception {
         createClients();
         if (brokerB == null) {
@@ -73,7 +71,6 @@ public class FailoverClusterTest extends
         assertTrue(set.size() > 1);
     }
 
-
     public void testClusterConnectedBeforeClients() throws Exception {
 
         if (brokerB == null) {
@@ -151,6 +148,7 @@ public class FailoverClusterTest extends
         answer.setUseShutdownHook(false);
     }
 
+    @SuppressWarnings("unused")
     protected void createClients() throws Exception {
         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
         for (int i = 0; i < NUMBER; i++) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java Thu Sep 29 21:42:29 2011
@@ -100,6 +100,7 @@ public class FailoverConsumerOutstanding
         doTestFailoverConsumerDups(true);
     }
 
+    @SuppressWarnings("unchecked")
     public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
 
         broker = createBroker(true);
@@ -138,7 +139,6 @@ public class FailoverConsumerOutstanding
 
         final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
-
         final CountDownLatch commitDoneLatch = new CountDownLatch(1);
         final CountDownLatch messagesReceived = new CountDownLatch(2);
 
@@ -196,6 +196,7 @@ public class FailoverConsumerOutstanding
         doTestFailoverConsumerOutstandingSendTx(true);
     }
 
+    @SuppressWarnings("unchecked")
     public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception {
         final boolean watchTopicAdvisories = true;
         broker = createBroker(true);
@@ -240,7 +241,6 @@ public class FailoverConsumerOutstanding
         final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal"
                 + "?consumer.prefetchSize=" + prefetch);
 
-
         final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
 
         final CountDownLatch commitDoneLatch = new CountDownLatch(1);
@@ -295,7 +295,6 @@ public class FailoverConsumerOutstanding
         assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
         assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
 
-
         connection.close();
     }
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java Thu Sep 29 21:42:29 2011
@@ -95,6 +95,7 @@ public class FailoverConsumerUnconsumedT
         doTestFailoverConsumerDups(false);
     }
 
+    @SuppressWarnings("unchecked")
     public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
 
         final int maxConsumers = 4;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java Thu Sep 29 21:42:29 2011
@@ -79,6 +79,7 @@ public class FailoverPrefetchZeroTest {
         return broker;
     }
 
+    @SuppressWarnings("unchecked")
     @Test
     public void testPrefetchZeroConsumerThroughRestart() throws Exception {
         broker = createBroker(true);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Thu Sep 29 21:42:29 2011
@@ -140,6 +140,7 @@ public class FailoverTransactionTest ext
                 new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.AMQ, PersistenceAdapterChoice.JDBC});
     }
 
+    @SuppressWarnings("unchecked")
     public void testFailoverCommitReplyLost() throws Exception {
 
         broker = createBroker(true);
@@ -234,15 +235,15 @@ public class FailoverTransactionTest ext
         connection.close();
     }
 
-
     public void initCombosForTestFailoverSendReplyLost() {
         addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB,
-                        PersistenceAdapterChoice.JDBC
-                        // not implemented for AMQ store
-                });
+            new Object[]{PersistenceAdapterChoice.KahaDB,
+                    PersistenceAdapterChoice.JDBC
+                    // not implemented for AMQ store
+            });
     }
 
+    @SuppressWarnings("unchecked")
     public void testFailoverSendReplyLost() throws Exception {
 
         broker = createBroker(true);
@@ -341,15 +342,15 @@ public class FailoverTransactionTest ext
         connection.close();
     }
 
-
     public void initCombosForTestFailoverConnectionSendReplyLost() {
         addCombinationValues("defaultPersistenceAdapter",
-                new Object[]{PersistenceAdapterChoice.KahaDB,
-                        PersistenceAdapterChoice.JDBC
-                        // last producer message id store feature not implemented for AMQ store
-                });
+            new Object[]{PersistenceAdapterChoice.KahaDB,
+                    PersistenceAdapterChoice.JDBC
+                    // last producer message id store feature not implemented for AMQ store
+            });
     }
 
+    @SuppressWarnings("unchecked")
     public void testFailoverConnectionSendReplyLost() throws Exception {
 
         broker = createBroker(true);
@@ -579,6 +580,7 @@ public class FailoverTransactionTest ext
         }
     }
 
+    @SuppressWarnings("unchecked")
     public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
         broker = createBroker(true);
         setDefaultPersistenceAdapter(broker);
@@ -681,7 +683,6 @@ public class FailoverTransactionTest ext
             }
         });
 
-
         // will be stopped by the plugin
         broker.waitUntilStopped();
         broker = createBroker(false, url);
@@ -776,7 +777,6 @@ public class FailoverTransactionTest ext
         connection.close();
     }
 
-
     public void testWaitForMissingRedeliveries() throws Exception {
         LOG.info("testWaitForMissingRedeliveries()");
         broker = createBroker(true);
@@ -825,7 +825,6 @@ public class FailoverTransactionTest ext
         connection.close();
     }
 
-
     public void testPoisonOnDeliveryWhilePending() throws Exception {
         LOG.info("testPoisonOnDeliveryWhilePending()");
         broker = createBroker(true);

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java?rev=1177437&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java Thu Sep 29 21:42:29 2011
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverTransportBackupsTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class);
+
+    protected Transport transport;
+    protected FailoverTransport failoverTransport;
+    private int commandsReceived;
+    private int exceptionReceived;
+    private int transportInterruptions;
+    private int transportResumptions;
+
+    BrokerService broker1;
+    BrokerService broker2;
+    BrokerService broker3;
+
+    @Before
+    public void setUp() throws Exception {
+        broker1 = createBroker("1");
+        broker2 = createBroker("2");
+        broker3 = createBroker("3");
+
+        broker1.start();
+        broker2.start();
+        broker3.start();
+
+        broker1.waitUntilStarted();
+        broker2.waitUntilStarted();
+        broker3.waitUntilStarted();
+
+        // Reset stats
+        commandsReceived = 0;
+        exceptionReceived = 0;
+        transportInterruptions = 0;
+        transportResumptions = 0;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (transport != null) {
+            transport.stop();
+        }
+
+        broker1.stop();
+        broker1.waitUntilStopped();
+        broker2.stop();
+        broker2.waitUntilStopped();
+        broker3.stop();
+        broker3.waitUntilStopped();
+    }
+
+    @Test
+    public void testBackupsAreCreated() throws Exception {
+        this.transport = createTransport(2);
+        assertNotNull(failoverTransport);
+        assertTrue(failoverTransport.isBackup());
+        assertEquals(2, failoverTransport.getBackupPoolSize());
+
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+                return failoverTransport.getCurrentBackups() == 2;
+            }
+        }));
+    }
+
+    @Test
+    public void testFailoverToBackups() throws Exception {
+        this.transport = createTransport(2);
+        assertNotNull(failoverTransport);
+        assertTrue(failoverTransport.isBackup());
+        assertEquals(2, failoverTransport.getBackupPoolSize());
+
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+                return failoverTransport.getCurrentBackups() == 2;
+            }
+        }));
+
+        broker1.stop();
+
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+                return failoverTransport.getCurrentBackups() == 1;
+            }
+        }));
+
+        assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
+        assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
+
+        broker2.stop();
+
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+                return failoverTransport.getCurrentBackups() == 0;
+            }
+        }));
+
+        assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2);
+        assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2);
+    }
+
+    @Test
+    public void testBackupsRefilled() throws Exception {
+        this.transport = createTransport(1);
+        assertNotNull(failoverTransport);
+        assertTrue(failoverTransport.isBackup());
+        assertEquals(1, failoverTransport.getBackupPoolSize());
+
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+                return failoverTransport.getCurrentBackups() == 1;
+            }
+        }));
+
+        broker1.stop();
+
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+                return failoverTransport.getCurrentBackups() == 1;
+            }
+        }));
+
+        broker2.stop();
+
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+            public boolean isSatisified() throws Exception {
+                LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+                return failoverTransport.getCurrentBackups() == 0;
+            }
+        }));
+    }
+
+    private BrokerService createBroker(String name) throws Exception {
+        BrokerService bs = new BrokerService();
+        bs.setBrokerName(name);
+        bs.setUseJmx(false);
+        bs.setPersistent(false);
+        bs.addConnector("tcp://localhost:0");
+        return bs;
+    }
+
+    protected Transport createTransport(int backups) throws Exception {
+        String connectionUri = "failover://("+
+                               broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
+                               broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," +
+                               broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")";
+
+        if (backups > 0) {
+            connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups;
+        }
+
+        Transport transport = TransportFactory.connect(new URI(connectionUri));
+        transport.setTransportListener(new TransportListener() {
+
+            public void onCommand(Object command) {
+                LOG.debug("Test Transport Listener received Command: " + command);
+                commandsReceived++;
+            }
+
+            public void onException(IOException error) {
+                LOG.debug("Test Transport Listener received Exception: " + error);
+                exceptionReceived++;
+            }
+
+            public void transportInterupted() {
+                transportInterruptions++;
+                LOG.debug("Test Transport Listener records transport Interrupted: " + transportInterruptions);
+            }
+
+            public void transportResumed() {
+                transportResumptions++;
+                LOG.debug("Test Transport Listener records transport Resumed: " + transportResumptions);
+            }
+        });
+        transport.start();
+
+        this.failoverTransport = transport.narrow(FailoverTransport.class);
+
+        return transport;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java Thu Sep 29 21:42:29 2011
@@ -37,7 +37,6 @@ import org.apache.activemq.network.Netwo
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.multicast.MulticastTransportTest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,6 +142,7 @@ public class FailoverTransportBrokerTest
                 //To change body of implemented methods use File | Settings | File Templates.
             }
         };
+        @SuppressWarnings("unused")
         StubConnection c = createFailoverConnection(listener);
         int count = 0;
         while(count++ < 20 && info[0] == null) {
@@ -160,6 +160,7 @@ public class FailoverTransportBrokerTest
         return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
     }
 
+    @SuppressWarnings("unchecked")
     protected StubConnection createFailoverConnection(TransportListener listener) throws Exception {
         URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + "");
         Transport transport = TransportFactory.connect(failoverURI);