You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/29 23:42:29 UTC
svn commit: r1177437 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/failover/
test/java/org/apache/activemq/transport/failover/
Author: tabish
Date: Thu Sep 29 21:42:29 2011
New Revision: 1177437
URL: http://svn.apache.org/viewvc?rev=1177437&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3517
Adds tests for the backup option and cleans up some of the other tests.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Sep 29 21:42:29 2011
@@ -431,6 +431,10 @@ public class FailoverTransport implement
this.backupPoolSize = backupPoolSize;
}
+ public int getCurrentBackups() {
+ return this.backups.size();
+ }
+
public boolean isTrackMessages() {
return trackMessages;
}
@@ -470,11 +474,11 @@ public class FailoverTransport implement
} else if (command instanceof RemoveInfo || command.isMessageAck()) {
// Simulate response to RemoveInfo command or MessageAck (as it will be stale)
stateTracker.track(command);
- if (command.isResponseRequired()) {
- Response response = new Response();
- response.setCorrelationId(command.getCommandId());
- myTransportListener.onCommand(response);
- }
+ if (command.isResponseRequired()) {
+ Response response = new Response();
+ response.setCorrelationId(command.getCommandId());
+ myTransportListener.onCommand(response);
+ }
return;
}
}
@@ -489,18 +493,24 @@ public class FailoverTransport implement
boolean timedout = false;
while (transport == null && !disposed && connectionFailure == null
&& !Thread.currentThread().isInterrupted()) {
- LOG.trace("Waiting for transport to reconnect..: " + command);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for transport to reconnect..: " + command);
+ }
long end = System.currentTimeMillis();
if (timeout > 0 && (end - start > timeout)) {
timedout = true;
- LOG.info("Failover timed out after " + (end - start) + "ms");
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Failover timed out after " + (end - start) + "ms");
+ }
break;
}
try {
reconnectMutex.wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- LOG.debug("Interupted: " + e, e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interupted: " + e, e);
+ }
}
transport = connectedTransport.get();
}
@@ -572,6 +582,7 @@ public class FailoverTransport implement
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
+
if (!disposed) {
if (error != null) {
if (error instanceof IOException) {
@@ -738,7 +749,6 @@ public class FailoverTransport implement
}
private void doUpdateURIsFromDisk() {
-
// If updateURIsURL is specified, read the file and add any new
// transport URI's to this FailOverTransport.
// Note: Could track file timestamp to avoid unnecessary reading.
@@ -814,35 +824,26 @@ public class FailoverTransport implement
}
doRebalance = false;
}
+
if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
reconnectDelay = initialReconnectDelay;
}
+
+ Transport transport = null;
+ URI uri = null;
+
+ // If we have a backup already waiting lets try it.
synchronized (backupMutex) {
if (backup && !backups.isEmpty()) {
BackupTransport bt = backups.remove(0);
- Transport t = bt.getTransport();
- URI uri = bt.getUri();
- t.setTransportListener(myTransportListener);
- try {
- if (started) {
- restoreTransport(t);
- }
- reconnectDelay = initialReconnectDelay;
- failedConnectTransportURI = null;
- connectedTransportURI = uri;
- connectedTransport.set(t);
- reconnectMutex.notifyAll();
- connectFailures = 0;
- LOG.info("Successfully reconnected to backup " + uri);
- return false;
- } catch (Exception e) {
- LOG.debug("Backup transport failed", e);
- }
+ transport = bt.getTransport();
+ uri = bt.getUri();
}
}
- // Sleep for the reconnectDelay
- if (!firstConnection && (reconnectDelay > 0) && !disposed) {
+ // Sleep for the reconnectDelay if there's no backup and we aren't trying
+ // for the first time, or we were disposed for some reason.
+ if (transport == null && !firstConnection && (reconnectDelay > 0) && !disposed) {
synchronized (sleepMutex) {
LOG.debug("Waiting " + reconnectDelay + " ms before attempting connection. ");
try {
@@ -854,63 +855,76 @@ public class FailoverTransport implement
}
Iterator<URI> iter = connectList.iterator();
- while (iter.hasNext() && connectedTransport.get() == null && !disposed) {
+ while ((transport != null || iter.hasNext()) && (connectedTransport.get() == null && !disposed)) {
- URI uri = iter.next();
- Transport t = null;
try {
+ SslContext.setCurrentSslContext(brokerSslContext);
+
+ // We could be starting with a backup and if so we wait to grab a
+ // URI from the pool until next time around.
+ if (transport == null) {
+ uri = iter.next();
+ transport = TransportFactory.compositeConnect(uri);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Attempting connect to: " + uri);
}
- SslContext.setCurrentSslContext(brokerSslContext);
- t = TransportFactory.compositeConnect(uri);
- t.setTransportListener(myTransportListener);
- t.start();
+ transport.setTransportListener(myTransportListener);
+ transport.start();
if (started) {
- restoreTransport(t);
+ restoreTransport(transport);
}
LOG.debug("Connection established");
reconnectDelay = initialReconnectDelay;
connectedTransportURI = uri;
- connectedTransport.set(t);
+ connectedTransport.set(transport);
reconnectMutex.notifyAll();
connectFailures = 0;
- // Make sure on initial startup, that the
- // transportListener
+
+ // Make sure on initial startup, that the transportListener
// has been initialized for this instance.
synchronized (listenerMutex) {
if (transportListener == null) {
try {
- // if it isn't set after 2secs - it
- // probably never will be
+ // if it isn't set after 2secs - it probably never will be
listenerMutex.wait(2000);
} catch (InterruptedException ex) {
}
}
}
+
if (transportListener != null) {
transportListener.transportResumed();
} else {
LOG.debug("transport resumed by transport listener not set");
}
+
if (firstConnection) {
firstConnection = false;
LOG.info("Successfully connected to " + uri);
} else {
LOG.info("Successfully reconnected to " + uri);
}
+
connected = true;
return false;
} catch (Exception e) {
failure = e;
- LOG.debug("Connect fail to: " + uri + ", reason: " + e);
- if (t != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connect fail to: " + uri + ", reason: " + e);
+ }
+ if (transport != null) {
try {
- t.stop();
+ transport.stop();
+ transport = null;
} catch (Exception ee) {
- LOG.debug("Stop of failed transport: " + t + " failed with reason: " + ee);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Stop of failed transport: " + transport +
+ " failed with reason: " + ee);
+ }
}
}
} finally {
@@ -919,21 +933,24 @@ public class FailoverTransport implement
}
}
}
+
int reconnectAttempts = 0;
if (firstConnection) {
if (this.startupMaxReconnectAttempts != 0) {
reconnectAttempts = this.startupMaxReconnectAttempts;
}
}
+
if (reconnectAttempts == 0) {
reconnectAttempts = this.maxReconnectAttempts;
}
+
if (reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts) {
LOG.error("Failed to connect to transport after: " + connectFailures + " attempt(s)");
connectionFailure = failure;
- // Make sure on initial startup, that the transportListener has
- // been initialized for this instance.
+ // Make sure on initial startup, that the transportListener has been
+ // initialized for this instance.
synchronized (listenerMutex) {
if (transportListener == null) {
try {
@@ -1122,7 +1139,6 @@ public class FailoverTransport implement
}
private boolean contains(URI newURI) {
-
boolean result = false;
try {
for (URI uri : uris) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Thu Sep 29 21:42:29 2011
@@ -32,7 +32,6 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
-
public class FailoverClusterTest extends TestCase {
private static final int NUMBER = 10;
@@ -45,7 +44,6 @@ public class FailoverClusterTest extends
private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
-
public void testClusterConnectedAfterClients() throws Exception {
createClients();
if (brokerB == null) {
@@ -73,7 +71,6 @@ public class FailoverClusterTest extends
assertTrue(set.size() > 1);
}
-
public void testClusterConnectedBeforeClients() throws Exception {
if (brokerB == null) {
@@ -151,6 +148,7 @@ public class FailoverClusterTest extends
answer.setUseShutdownHook(false);
}
+ @SuppressWarnings("unused")
protected void createClients() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
for (int i = 0; i < NUMBER; i++) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java Thu Sep 29 21:42:29 2011
@@ -100,6 +100,7 @@ public class FailoverConsumerOutstanding
doTestFailoverConsumerDups(true);
}
+ @SuppressWarnings("unchecked")
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
broker = createBroker(true);
@@ -138,7 +139,6 @@ public class FailoverConsumerOutstanding
final Session consumerSession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final CountDownLatch messagesReceived = new CountDownLatch(2);
@@ -196,6 +196,7 @@ public class FailoverConsumerOutstanding
doTestFailoverConsumerOutstandingSendTx(true);
}
+ @SuppressWarnings("unchecked")
public void doTestFailoverConsumerOutstandingSendTx(final boolean doActualBrokerCommit) throws Exception {
final boolean watchTopicAdvisories = true;
broker = createBroker(true);
@@ -240,7 +241,6 @@ public class FailoverConsumerOutstanding
final Queue signalDestination = producerSession.createQueue(QUEUE_NAME + ".signal"
+ "?consumer.prefetchSize=" + prefetch);
-
final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
@@ -295,7 +295,6 @@ public class FailoverConsumerOutstanding
assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
-
connection.close();
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java Thu Sep 29 21:42:29 2011
@@ -95,6 +95,7 @@ public class FailoverConsumerUnconsumedT
doTestFailoverConsumerDups(false);
}
+ @SuppressWarnings("unchecked")
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
final int maxConsumers = 4;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java Thu Sep 29 21:42:29 2011
@@ -79,6 +79,7 @@ public class FailoverPrefetchZeroTest {
return broker;
}
+ @SuppressWarnings("unchecked")
@Test
public void testPrefetchZeroConsumerThroughRestart() throws Exception {
broker = createBroker(true);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java Thu Sep 29 21:42:29 2011
@@ -140,6 +140,7 @@ public class FailoverTransactionTest ext
new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.AMQ, PersistenceAdapterChoice.JDBC});
}
+ @SuppressWarnings("unchecked")
public void testFailoverCommitReplyLost() throws Exception {
broker = createBroker(true);
@@ -234,15 +235,15 @@ public class FailoverTransactionTest ext
connection.close();
}
-
public void initCombosForTestFailoverSendReplyLost() {
addCombinationValues("defaultPersistenceAdapter",
- new Object[]{PersistenceAdapterChoice.KahaDB,
- PersistenceAdapterChoice.JDBC
- // not implemented for AMQ store
- });
+ new Object[]{PersistenceAdapterChoice.KahaDB,
+ PersistenceAdapterChoice.JDBC
+ // not implemented for AMQ store
+ });
}
+ @SuppressWarnings("unchecked")
public void testFailoverSendReplyLost() throws Exception {
broker = createBroker(true);
@@ -341,15 +342,15 @@ public class FailoverTransactionTest ext
connection.close();
}
-
public void initCombosForTestFailoverConnectionSendReplyLost() {
addCombinationValues("defaultPersistenceAdapter",
- new Object[]{PersistenceAdapterChoice.KahaDB,
- PersistenceAdapterChoice.JDBC
- // last producer message id store feature not implemented for AMQ store
- });
+ new Object[]{PersistenceAdapterChoice.KahaDB,
+ PersistenceAdapterChoice.JDBC
+ // last producer message id store feature not implemented for AMQ store
+ });
}
+ @SuppressWarnings("unchecked")
public void testFailoverConnectionSendReplyLost() throws Exception {
broker = createBroker(true);
@@ -579,6 +580,7 @@ public class FailoverTransactionTest ext
}
}
+ @SuppressWarnings("unchecked")
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
broker = createBroker(true);
setDefaultPersistenceAdapter(broker);
@@ -681,7 +683,6 @@ public class FailoverTransactionTest ext
}
});
-
// will be stopped by the plugin
broker.waitUntilStopped();
broker = createBroker(false, url);
@@ -776,7 +777,6 @@ public class FailoverTransactionTest ext
connection.close();
}
-
public void testWaitForMissingRedeliveries() throws Exception {
LOG.info("testWaitForMissingRedeliveries()");
broker = createBroker(true);
@@ -825,7 +825,6 @@ public class FailoverTransactionTest ext
connection.close();
}
-
public void testPoisonOnDeliveryWhilePending() throws Exception {
LOG.info("testPoisonOnDeliveryWhilePending()");
broker = createBroker(true);
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java?rev=1177437&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java Thu Sep 29 21:42:29 2011
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverTransportBackupsTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class);
+
+ protected Transport transport;
+ protected FailoverTransport failoverTransport;
+ private int commandsReceived;
+ private int exceptionReceived;
+ private int transportInterruptions;
+ private int transportResumptions;
+
+ BrokerService broker1;
+ BrokerService broker2;
+ BrokerService broker3;
+
+ @Before
+ public void setUp() throws Exception {
+ broker1 = createBroker("1");
+ broker2 = createBroker("2");
+ broker3 = createBroker("3");
+
+ broker1.start();
+ broker2.start();
+ broker3.start();
+
+ broker1.waitUntilStarted();
+ broker2.waitUntilStarted();
+ broker3.waitUntilStarted();
+
+ // Reset stats
+ commandsReceived = 0;
+ exceptionReceived = 0;
+ transportInterruptions = 0;
+ transportResumptions = 0;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (transport != null) {
+ transport.stop();
+ }
+
+ broker1.stop();
+ broker1.waitUntilStopped();
+ broker2.stop();
+ broker2.waitUntilStopped();
+ broker3.stop();
+ broker3.waitUntilStopped();
+ }
+
+ @Test
+ public void testBackupsAreCreated() throws Exception {
+ this.transport = createTransport(2);
+ assertNotNull(failoverTransport);
+ assertTrue(failoverTransport.isBackup());
+ assertEquals(2, failoverTransport.getBackupPoolSize());
+
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+ return failoverTransport.getCurrentBackups() == 2;
+ }
+ }));
+ }
+
+ @Test
+ public void testFailoverToBackups() throws Exception {
+ this.transport = createTransport(2);
+ assertNotNull(failoverTransport);
+ assertTrue(failoverTransport.isBackup());
+ assertEquals(2, failoverTransport.getBackupPoolSize());
+
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+ return failoverTransport.getCurrentBackups() == 2;
+ }
+ }));
+
+ broker1.stop();
+
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+ return failoverTransport.getCurrentBackups() == 1;
+ }
+ }));
+
+ assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
+ assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
+
+ broker2.stop();
+
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+ return failoverTransport.getCurrentBackups() == 0;
+ }
+ }));
+
+ assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2);
+ assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2);
+ }
+
+ @Test
+ public void testBackupsRefilled() throws Exception {
+ this.transport = createTransport(1);
+ assertNotNull(failoverTransport);
+ assertTrue(failoverTransport.isBackup());
+ assertEquals(1, failoverTransport.getBackupPoolSize());
+
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+ return failoverTransport.getCurrentBackups() == 1;
+ }
+ }));
+
+ broker1.stop();
+
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+ return failoverTransport.getCurrentBackups() == 1;
+ }
+ }));
+
+ broker2.stop();
+
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
+ return failoverTransport.getCurrentBackups() == 0;
+ }
+ }));
+ }
+
+ private BrokerService createBroker(String name) throws Exception {
+ BrokerService bs = new BrokerService();
+ bs.setBrokerName(name);
+ bs.setUseJmx(false);
+ bs.setPersistent(false);
+ bs.addConnector("tcp://localhost:0");
+ return bs;
+ }
+
+ protected Transport createTransport(int backups) throws Exception {
+ String connectionUri = "failover://("+
+ broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
+ broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," +
+ broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")";
+
+ if (backups > 0) {
+ connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups;
+ }
+
+ Transport transport = TransportFactory.connect(new URI(connectionUri));
+ transport.setTransportListener(new TransportListener() {
+
+ public void onCommand(Object command) {
+ LOG.debug("Test Transport Listener received Command: " + command);
+ commandsReceived++;
+ }
+
+ public void onException(IOException error) {
+ LOG.debug("Test Transport Listener received Exception: " + error);
+ exceptionReceived++;
+ }
+
+ public void transportInterupted() {
+ transportInterruptions++;
+ LOG.debug("Test Transport Listener records transport Interrupted: " + transportInterruptions);
+ }
+
+ public void transportResumed() {
+ transportResumptions++;
+ LOG.debug("Test Transport Listener records transport Resumed: " + transportResumptions);
+ }
+ });
+ transport.start();
+
+ this.failoverTransport = transport.narrow(FailoverTransport.class);
+
+ return transport;
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java?rev=1177437&r1=1177436&r2=1177437&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java Thu Sep 29 21:42:29 2011
@@ -37,7 +37,6 @@ import org.apache.activemq.network.Netwo
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.multicast.MulticastTransportTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +142,7 @@ public class FailoverTransportBrokerTest
//To change body of implemented methods use File | Settings | File Templates.
}
};
+ @SuppressWarnings("unused")
StubConnection c = createFailoverConnection(listener);
int count = 0;
while(count++ < 20 && info[0] == null) {
@@ -160,6 +160,7 @@ public class FailoverTransportBrokerTest
return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
}
+ @SuppressWarnings("unchecked")
protected StubConnection createFailoverConnection(TransportListener listener) throws Exception {
URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + "");
Transport transport = TransportFactory.connect(failoverURI);