You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/04/04 18:09:15 UTC
[06/42] activemq-artemis git commit: ARTEMIS-463 Improvement to the
openwire testsuite https://issues.apache.org/jira/browse/ARTEMIS-463
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index eb5bc61..e704274 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -16,29 +16,23 @@
*/
package org.apache.activemq.transport.failover;
-import junit.framework.Test;
-
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.ConsumerBrokerExchange;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.SocketProxy;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,28 +63,31 @@ import java.util.concurrent.atomic.AtomicBoolean;
// see https://issues.apache.org/activemq/browse/AMQ-2473
// https://issues.apache.org/activemq/browse/AMQ-2590
-public class FailoverTransactionTest extends TestSupport {
+@RunWith(BMUnitRunner.class)
+public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
private static final String QUEUE_NAME = "Failover.WithTx";
- private static final String TRANSPORT_URI = "tcp://localhost:0";
- private String url;
- BrokerService broker;
+ private String url = newURI(0);
- public static Test suite() {
- return suite(FailoverTransactionTest.class);
- }
+ private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+ private static CountDownLatch brokerStopLatch;
+
+ private static SocketProxy proxy;
+ private static boolean firstSend;
+ private static int count;
- @Override
+ private static volatile EmbeddedJMS broker;
+
+ @Before
public void setUp() throws Exception {
- super.setMaxTestTime(2 * 60 * 1000); // some boxes can be real slow
- super.setAutoFail(true);
- super.setUp();
+ doByteman.set(false);
+ brokerStopLatch = new CountDownLatch(1);
}
- @Override
+ @After
public void tearDown() throws Exception {
- super.tearDown();
+ doByteman.set(false);
stopBroker();
}
@@ -101,40 +98,21 @@ public class FailoverTransactionTest extends TestSupport {
}
private void startCleanBroker() throws Exception {
- startBroker(true);
+ startBroker();
}
- public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup);
+ public void startBroker() throws Exception {
+ broker = createBroker();
broker.start();
}
- public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
- broker.start();
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setAdvisorySupport(false);
- broker.addConnector(bindAddress);
- broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
- url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
- return broker;
- }
-
public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
// nothing to do
}
+ @Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
+ LOG.info(this + " running test testFailoverProducerCloseBeforeTransaction");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -148,55 +126,31 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit
broker.stop();
- startBroker(false, url);
+ startBroker();
session.commit();
- assertNotNull("we got the message", consumer.receive(20000));
+ Assert.assertNotNull("we got the message", consumer.receive(20000));
session.commit();
connection.close();
}
- public void initCombosForTestFailoverCommitReplyLost() {
- String osName = System.getProperty("os.name");
- Object[] persistenceAdapters;
- if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
- persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC};
- }
- else {
- persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC};
- }
- addCombinationValues("defaultPersistenceAdapter", persistenceAdapters);
- }
-
- @SuppressWarnings("unchecked")
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "EXIT",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
+ }
+ )
public void testFailoverCommitReplyLost() throws Exception {
+ LOG.info(this + " running test testFailoverCommitReplyLost");
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid,
- boolean onePhase) throws Exception {
- super.commitTransaction(context, xid, onePhase);
- // so commit will hang as if reply is lost
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker post commit...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
+ broker = createBroker();
+ startBrokerWithDurableQueue();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -210,46 +164,43 @@ public class FailoverTransactionTest extends TestSupport {
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
// broker will die on commit reply so this will hang till restart
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
+ new Thread() {
public void run() {
LOG.info("doing async commit...");
try {
session.commit();
}
catch (JMSException e) {
- assertTrue(e instanceof TransactionRolledBackException);
+ Assert.assertTrue(e instanceof TransactionRolledBackException);
LOG.info("got commit exception: ", e);
}
commitDoneLatch.countDown();
LOG.info("done async commit");
}
- });
+ }.start();
// will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ brokerStopLatch.await(60, TimeUnit.SECONDS);
+ doByteman.set(false);
+ broker = createBroker();
broker.start();
- assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
+ Assert.assertNotNull("we got the message", msg);
+ Assert.assertNull("we got just one message", consumer.receive(2000));
session.commit();
consumer.close();
connection.close();
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -264,152 +215,38 @@ public class FailoverTransactionTest extends TestSupport {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
@SuppressWarnings("unchecked")
+ @Test
public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception {
-
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker(), new BrokerPluginSupport() {
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid,
- boolean onePhase) throws Exception {
- super.commitTransaction(context, xid, onePhase);
- // so commit will hang as if reply is lost
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker post commit...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- Connection connection = cf.createConnection();
- connection.start();
- final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(QUEUE_NAME.replace('.', '/') + "?consumer.prefetchSize=0");
-
- MessageConsumer consumer = session.createConsumer(destination);
- produceMessage(session, destination);
-
- final CountDownLatch commitDoneLatch = new CountDownLatch(1);
- // broker will die on commit reply so this will hang till restart
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("doing async commit...");
- try {
- session.commit();
- }
- catch (JMSException e) {
- assertTrue(e instanceof TransactionRolledBackException);
- LOG.info("got commit exception: ", e);
- }
- commitDoneLatch.countDown();
- LOG.info("done async commit");
- }
- });
-
- // will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
- broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
- broker.start();
-
- assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
-
- // new transaction
- Message msg = consumer.receive(20000);
- LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
- session.commit();
- consumer.close();
- connection.close();
-
- // ensure no dangling messages with fresh broker etc
- broker.stop();
- broker.waitUntilStopped();
-
- LOG.info("Checking for remaining/hung messages..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
- broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
- broker.start();
-
- // after restart, ensure no dangling messages
- cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- connection = cf.createConnection();
- connection.start();
- Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session2.createConsumer(destination);
- msg = consumer.receive(1000);
- if (msg == null) {
- msg = consumer.receive(5000);
- }
- LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
- connection.close();
-
- ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations();
- for (ActiveMQDestination dest : destinations) {
- LOG.info("Destinations list: " + dest);
- }
- assertEquals("Only one destination", 1, broker.getRegionBroker().getDestinations().length);
- }
-
- public void initCombosForTestFailoverSendReplyLost() {
- addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC
- // not implemented for AMQ store or PersistenceAdapterChoice.LevelDB
- });
+ // The original test validates destinations that use a forward slash (/) as
+ // the separator instead of a dot (.). The broker internally uses a plugin
+ // called DestinationPathSeparatorBroker to convert every occurrence of
+ // "/" into "." inside the server.
+ // Artemis does not support "/" as a separator so far, so this test is not applicable and is left empty.
}
@SuppressWarnings("unchecked")
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processMessage",
+ targetLocation = "EXIT",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
+ }
+ )
public void testFailoverSendReplyLost() throws Exception {
-
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void send(ProducerBrokerExchange producerExchange,
- org.apache.activemq.command.Message messageSend) throws Exception {
- // so send will hang as if reply is lost
- super.send(producerExchange, messageSend);
- producerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker post send...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
+ LOG.info(this + " running test testFailoverSendReplyLost");
+ broker = createBroker();
+ startBrokerWithDurableQueue();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
@@ -421,8 +258,7 @@ public class FailoverTransactionTest extends TestSupport {
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// broker will die on send reply so this will hang till restart
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
+ new Thread() {
public void run() {
LOG.info("doing async send...");
try {
@@ -431,41 +267,35 @@ public class FailoverTransactionTest extends TestSupport {
catch (JMSException e) {
//assertTrue(e instanceof TransactionRolledBackException);
LOG.error("got send exception: ", e);
- fail("got unexpected send exception" + e);
+ Assert.fail("got unexpected send exception" + e);
}
sendDoneLatch.countDown();
LOG.info("done async send");
}
- });
+ }.start();
// will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ brokerStopLatch.await(60, TimeUnit.SECONDS);
+ doByteman.set(false);
+ broker = createBroker();
LOG.info("restarting....");
broker.start();
- assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
+ Assert.assertNotNull("we got the message", msg);
+ Assert.assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
- // verify stats
- assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
- assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
-
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with second restart..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -480,64 +310,32 @@ public class FailoverTransactionTest extends TestSupport {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
- public void initCombosForTestFailoverConnectionSendReplyLost() {
- addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC
- // last producer message id store feature not implemented for AMQ store
- // or PersistenceAdapterChoice.LevelDB
- });
- }
-
@SuppressWarnings("unchecked")
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processMessage",
+ targetLocation = "EXIT",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopProxyOnFirstSend($0)")
+ }
+ )
public void testFailoverConnectionSendReplyLost() throws Exception {
-
- broker = createBroker(true);
- PersistenceAdapter store = setDefaultPersistenceAdapter(broker);
- if (store instanceof KahaDBPersistenceAdapter) {
- // duplicate checker not updated on canceled tasks, even it
- // it was, recovery of the audit would fail as the message is
- // not recorded in the store and the audit may not be up to date.
- // So if duplicate messages are an absolute no no after restarts,
- // ConcurrentStoreAndDispatchQueues must be disabled
- ((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false);
- }
-
- final SocketProxy proxy = new SocketProxy();
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- private boolean firstSend = true;
-
- @Override
- public void send(ProducerBrokerExchange producerExchange,
- org.apache.activemq.command.Message messageSend) throws Exception {
- // so send will hang as if reply is lost
- super.send(producerExchange, messageSend);
- if (firstSend) {
- firstSend = false;
-
- producerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping connection post send...");
- try {
- proxy.close();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- }});
- broker.start();
+ LOG.info(this + " running test testFailoverConnectionSendReplyLost");
+ broker = createBroker();
+ proxy = new SocketProxy();
+ firstSend = true;
+ startBrokerWithDurableQueue();
proxy.setTarget(new URI(url));
proxy.open();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
@@ -549,8 +347,7 @@ public class FailoverTransactionTest extends TestSupport {
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// proxy connection will die on send reply so this will hang on failover reconnect till open
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
+ new Thread() {
public void run() {
LOG.info("doing async send...");
try {
@@ -563,32 +360,27 @@ public class FailoverTransactionTest extends TestSupport {
sendDoneLatch.countDown();
LOG.info("done async send");
}
- });
+ }.start();
// will be closed by the plugin
- assertTrue("proxy was closed", proxy.waitUntilClosed(30));
+ Assert.assertTrue("proxy was closed", proxy.waitUntilClosed(30));
LOG.info("restarting proxy");
proxy.open();
- assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
+ Assert.assertNotNull("we got the message", msg);
+ Assert.assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
- // verify stats, connection dup suppression means dups don't get to broker
- assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with restart..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -603,11 +395,13 @@ public class FailoverTransactionTest extends TestSupport {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
+ @Test
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
+ LOG.info(this + " running test testFailoverProducerCloseBeforeTransactionFailWhenDisabled");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
configureConnectionFactory(cf);
@@ -621,17 +415,19 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit
broker.stop();
- startBroker(false, url);
+ startBroker();
session.commit();
// without tracking producers, message will not be replayed on recovery
- assertNull("we got the message", consumer.receive(5000));
+ Assert.assertNull("we got the message", consumer.receive(5000));
session.commit();
connection.close();
}
+ @Test
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
+ LOG.info(this + " running test testFailoverMultipleProducerCloseBeforeTransaction");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -653,426 +449,448 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit
broker.stop();
- startBroker(false, url);
+ startBroker();
session.commit();
for (int i = 0; i < count; i++) {
- assertNotNull("we got all the message: " + count, consumer.receive(20000));
+ Assert.assertNotNull("we got all the message: " + count, consumer.receive(20000));
}
session.commit();
connection.close();
}
// https://issues.apache.org/activemq/browse/AMQ-2772
+ @Test
public void testFailoverWithConnectionConsumer() throws Exception {
+ LOG.info(this + " running test testFailoverWithConnectionConsumer");
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
connection.start();
-
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(QUEUE_NAME);
-
final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
- final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
- @Override
- public ServerSession getServerSession() throws JMSException {
- return new ServerSession() {
- @Override
- public Session getSession() throws JMSException {
- return poolSession;
- }
-
- @Override
- public void start() throws JMSException {
- connectionConsumerGotOne.countDown();
- poolSession.run();
- }
- };
- }
- }, 1);
-
- MessageConsumer consumer = session.createConsumer(destination);
- MessageProducer producer;
- TextMessage message;
- final int count = 10;
- for (int i = 0; i < count; i++) {
- producer = session.createProducer(destination);
- message = session.createTextMessage("Test message: " + count);
- producer.send(message);
- producer.close();
- }
- // restart to force failover and connection state recovery before the commit
- broker.stop();
- startBroker(false, url);
+ try {
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(QUEUE_NAME);
+
+ final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
+ public ServerSession getServerSession() throws JMSException {
+ return new ServerSession() {
+ public Session getSession() throws JMSException {
+ return poolSession;
+ }
- session.commit();
- for (int i = 0; i < count - 1; i++) {
- assertNotNull("Failed to get message: " + count, consumer.receive(20000));
- }
- session.commit();
- connection.close();
+ public void start() throws JMSException {
+ connectionConsumerGotOne.countDown();
+ poolSession.run();
+ }
+ };
+ }
+ }, 1);
+
+ MessageConsumer consumer = session.createConsumer(destination);
+ MessageProducer producer;
+ TextMessage message;
+ final int count = 10;
+ for (int i = 0; i < count; i++) {
+ producer = session.createProducer(destination);
+ message = session.createTextMessage("Test message: " + count);
+ producer.send(message);
+ producer.close();
+ }
- assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
- }
+ // restart to force failover and connection state recovery before the commit
+ broker.stop();
+ startBroker();
- public void testFailoverConsumerAckLost() throws Exception {
- // as failure depends on hash order of state tracker recovery, do a few times
- for (int i = 0; i < 3; i++) {
- try {
- LOG.info("Iteration: " + i);
- doTestFailoverConsumerAckLost(i);
- }
- finally {
- stopBroker();
+ session.commit();
+ for (int i = 0; i < count - 1; i++) {
+ Message received = consumer.receive(20000);
+ Assert.assertNotNull("Failed to get message: " + count, received);
}
+ session.commit();
+ }
+ finally {
+ connection.close();
}
+
+ Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
+// @Test
+// @BMRules(
+// rules = {
+// @BMRule(
+// name = "set no return response and stop the broker",
+// targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+// targetMethod = "processMessageAck",
+// targetLocation = "ENTRY",
+// action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker($0)")
+// }
+// )
+// public void testFailoverConsumerAckLost() throws Exception {
+// LOG.info(this + " running test testFailoverConsumerAckLost");
+// // as failure depends on hash order of state tracker recovery, do a few times
+// for (int i = 0; i < 3; i++) {
+// try {
+// LOG.info("Iteration: " + i);
+// doTestFailoverConsumerAckLost(i);
+// }
+// finally {
+// stopBroker();
+// }
+// }
+// }
+//
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-
- // broker is killed on delivered ack as prefetch is 1
- @Override
- public void acknowledge(ConsumerBrokerExchange consumerExchange, final MessageAck ack) throws Exception {
-
- consumerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker on ack: " + ack);
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
+ broker = createBroker();
broker.start();
+ brokerStopLatch = new CountDownLatch(1);
+ doByteman.set(true);
Vector<Connection> connections = new Vector<>();
+ Connection connection = null;
+ Message msg = null;
+ Queue destination = null;
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- Connection connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-
- connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
- final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
-
- produceMessage(producerSession, destination);
- produceMessage(producerSession, destination);
-
- final Vector<Message> receivedMessages = new Vector<>();
- final CountDownLatch commitDoneLatch = new CountDownLatch(1);
- final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("doing async commit after consume...");
- try {
- Message msg = consumer1.receive(20000);
- LOG.info("consumer1 first attempt got message: " + msg);
- receivedMessages.add(msg);
-
- // give some variance to the runs
- TimeUnit.SECONDS.sleep(pauseSeconds * 2);
-
- // should not get a second message as there are two messages and two consumers
- // and prefetch=1, but with failover and unordered connection restore it can get the second
- // message.
-
- // For the transaction to complete it needs to get the same one or two messages
- // again so that the acks line up.
- // If redelivery order is different, the commit should fail with an ex
- //
- msg = consumer1.receive(5000);
- LOG.info("consumer1 second attempt got message: " + msg);
- if (msg != null) {
+ try {
+ configureConnectionFactory(cf);
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession1 = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+ final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
+ final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
+
+ produceMessage(producerSession, destination);
+ produceMessage(producerSession, destination);
+
+ final Vector<Message> receivedMessages = new Vector<>();
+ final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+ final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
+ Thread t = new Thread("doTestFailoverConsumerAckLost(" + pauseSeconds + ")") {
+ public void run() {
+ LOG.info("doing async commit after consume...");
+ try {
+ Message msg = consumer1.receive(20000);
+ LOG.info("consumer1 first attempt got message: " + msg);
receivedMessages.add(msg);
- }
- LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
- try {
- consumerSession1.commit();
- }
- catch (JMSException expectedSometimes) {
- LOG.info("got exception ex on commit", expectedSometimes);
- if (expectedSometimes instanceof TransactionRolledBackException) {
- gotTransactionRolledBackException.set(true);
- // ok, message one was not replayed so we expect the rollback
+ // give some variance to the runs
+ TimeUnit.SECONDS.sleep(pauseSeconds * 2);
+
+ // should not get a second message as there are two messages and two consumers
+ // and prefetch=1, but with failover and unordered connection restore it can get the second
+ // message.
+
+ // For the transaction to complete it needs to get the same one or two messages
+ // again so that the acks line up.
+ // If redelivery order is different, the commit should fail with an ex
+ //
+ msg = consumer1.receive(5000);
+ LOG.info("consumer1 second attempt got message: " + msg);
+ if (msg != null) {
+ receivedMessages.add(msg);
}
- else {
- throw expectedSometimes;
+
+ LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
+ try {
+ consumerSession1.commit();
}
+ catch (JMSException expectedSometimes) {
+ LOG.info("got exception ex on commit", expectedSometimes);
+ if (expectedSometimes instanceof TransactionRolledBackException) {
+ gotTransactionRolledBackException.set(true);
+ // ok, message one was not replayed so we expect the rollback
+ }
+ else {
+ throw expectedSometimes;
+ }
+ }
+ commitDoneLatch.countDown();
+ LOG.info("done async commit");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
}
- commitDoneLatch.countDown();
- LOG.info("done async commit");
- }
- catch (Exception e) {
- e.printStackTrace();
}
+ };
+ t.start();
+
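+ // broker is expected to be stopped by the Byteman action holdResponseAndStopBroker (no longer a broker plugin), which counts down brokerStopLatch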
+ brokerStopLatch.await(60, TimeUnit.SECONDS);
+ t.join(30000);
+ if (t.isAlive()) {
+ t.interrupt();
+ Assert.fail("Thread " + t.getName() + " is still alive");
}
- });
-
- // will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
- broker.start();
+ broker = createBroker();
+ broker.start();
+ doByteman.set(false);
- assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
- LOG.info("received message count: " + receivedMessages.size());
+ LOG.info("received message count: " + receivedMessages.size());
- // new transaction
- Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
- LOG.info("post: from consumer1 received: " + msg);
- if (gotTransactionRolledBackException.get()) {
- assertNotNull("should be available again after commit rollback ex", msg);
- }
- else {
- assertNull("should be nothing left for consumer as receive should have committed", msg);
- }
- consumerSession1.commit();
-
- if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
- // just one message successfully consumed or none consumed
- // consumer2 should get other message
- msg = consumer2.receive(10000);
- LOG.info("post: from consumer2 received: " + msg);
- assertNotNull("got second message on consumer2", msg);
- consumerSession2.commit();
+ // new transaction
+ msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
+ LOG.info("post: from consumer1 received: " + msg);
+ if (gotTransactionRolledBackException.get()) {
+ Assert.assertNotNull("should be available again after commit rollback ex", msg);
+ }
+ else {
+ Assert.assertNull("should be nothing left for consumer as receive should have committed", msg);
+ }
+ consumerSession1.commit();
+
+ if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
+ // just one message successfully consumed or none consumed
+ // consumer2 should get other message
+ msg = consumer2.receive(10000);
+ LOG.info("post: from consumer2 received: " + msg);
+ Assert.assertNotNull("got second message on consumer2", msg);
+ consumerSession2.commit();
+ }
}
+ finally {
+ for (Connection c : connections) {
+ c.close();
+ }
- for (Connection c : connections) {
- c.close();
+ // ensure no dangling messages with fresh broker etc
+ if (broker != null) {
+ broker.stop();
+ }
}
- // ensure no dangling messages with fresh broker etc
- broker.stop();
- broker.waitUntilStopped();
-
LOG.info("Checking for remaining/hung messages..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
connection = cf.createConnection();
- connection.start();
- Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer sweeper = sweeperSession.createConsumer(destination);
- msg = sweeper.receive(1000);
- if (msg == null) {
- msg = sweeper.receive(5000);
+ try {
+ connection.start();
+ Session sweeperSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer sweeper = sweeperSession.createConsumer(destination);
+ msg = sweeper.receive(1000);
+ if (msg == null) {
+ msg = sweeper.receive(5000);
+ }
+ LOG.info("Sweep received: " + msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
+ }
+ finally {
+ connection.close();
+ broker.stop();
}
- LOG.info("Sweep received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
- connection.close();
}
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection$CommandProcessor",
+ targetMethod = "processRemoveConsumer",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.stopBrokerOnCounter()")
+ }
+ )
public void testPoolingNConsumesAfterReconnect() throws Exception {
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- int count = 0;
-
- @Override
- public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
- if (count++ == 1) {
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker on removeConsumer: " + info);
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- }});
- broker.start();
-
- Vector<Connection> connections = new Vector<>();
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- Connection connection = cf.createConnection();
- connection.start();
- Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-
- produceMessage(producerSession, destination);
- connection.close();
-
- connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ LOG.info(this + " running test testPoolingNConsumesAfterReconnect");
+ count = 0;
+ broker = createBroker();
+ startBrokerWithDurableQueue();
- final int sessionCount = 10;
- final Stack<Session> sessions = new Stack<>();
- for (int i = 0; i < sessionCount; i++) {
- sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
- }
+ doByteman.set(true);
- final int consumerCount = 1000;
- final Deque<MessageConsumer> consumers = new ArrayDeque<>();
- for (int i = 0; i < consumerCount; i++) {
- consumers.push(consumerSession.createConsumer(destination));
- }
+ Vector<Connection> connections = new Vector<>();
final ExecutorService executorService = Executors.newCachedThreadPool();
- final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
- final TransportListener delegate = failoverTransport.getTransportListener();
- failoverTransport.setTransportListener(new TransportListener() {
- @Override
- public void onCommand(Object command) {
- delegate.onCommand(command);
+ try {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+ produceMessage(producerSession, destination);
+ connection.close();
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ final int sessionCount = 10;
+ final Stack<Session> sessions = new Stack<>();
+ for (int i = 0; i < sessionCount; i++) {
+ sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
}
- @Override
- public void onException(IOException error) {
- delegate.onException(error);
+ final int consumerCount = 1000;
+ final Deque<MessageConsumer> consumers = new ArrayDeque<>();
+ for (int i = 0; i < consumerCount; i++) {
+ consumers.push(consumerSession.createConsumer(destination));
}
- @Override
- public void transportInterupted() {
+ final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
+ final TransportListener delegate = failoverTransport.getTransportListener();
+ failoverTransport.setTransportListener(new TransportListener() {
+ @Override
+ public void onCommand(Object command) {
+ delegate.onCommand(command);
+ }
+
+ @Override
+ public void onException(IOException error) {
+ delegate.onException(error);
+ }
- LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
- for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
+ @Override
+ public void transportInterupted() {
- executorService.execute(new Runnable() {
- @Override
- public void run() {
- MessageConsumer localConsumer = null;
- try {
- synchronized (delegate) {
- localConsumer = consumers.pop();
- }
- localConsumer.receive(1);
+ LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
+ for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
- LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
- localConsumer.close();
- }
- catch (NoSuchElementException nse) {
- }
- catch (Exception ignored) {
- LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
+ executorService.execute(new Runnable() {
+ public void run() {
+ MessageConsumer localConsumer = null;
+ try {
+ synchronized (delegate) {
+ localConsumer = consumers.pop();
+ }
+ localConsumer.receive(1);
+
+ LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
+ localConsumer.close();
+ }
+ catch (NoSuchElementException nse) {
+ }
+ catch (Exception ignored) {
+ LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
+ }
}
- }
- });
+ });
+ }
+
+ delegate.transportInterupted();
}
- delegate.transportInterupted();
- }
+ @Override
+ public void transportResumed() {
+ delegate.transportResumed();
+ }
+ });
- @Override
- public void transportResumed() {
- delegate.transportResumed();
+ MessageConsumer consumer = null;
+ synchronized (delegate) {
+ consumer = consumers.pop();
}
- });
+ LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+ consumer.close();
- MessageConsumer consumer = null;
- synchronized (delegate) {
- consumer = consumers.pop();
- }
- LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
- consumer.close();
+ LOG.info("waiting latch: " + brokerStopLatch.getCount());
+ // broker is stopped by the Byteman action stopBrokerOnCounter (no longer a broker plugin), which counts down brokerStopLatch
+ Assert.assertTrue(brokerStopLatch.await(60, TimeUnit.SECONDS));
- // will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
- broker.start();
+ doByteman.set(false);
+ broker = createBroker();
+ broker.start();
- consumer = consumerSession.createConsumer(destination);
- LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+ consumer = consumerSession.createConsumer(destination);
+ LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
- Message msg = null;
- for (int i = 0; i < 4 && msg == null; i++) {
- msg = consumer.receive(1000);
- }
- LOG.info("post: from consumer1 received: " + msg);
- assertNotNull("got message after failover", msg);
- msg.acknowledge();
+ Message msg = null;
+ for (int i = 0; i < 4 && msg == null; i++) {
+ msg = consumer.receive(1000);
+ }
- for (Connection c : connections) {
- c.close();
+ LOG.info("post: from consumer1 received: " + msg);
+ Assert.assertNotNull("got message after failover", msg);
+ msg.acknowledge();
}
+ finally {
+ executorService.shutdown();
+ for (Connection c : connections) {
+ c.close();
+ }
+ }
+ }
+
+ private void startBrokerWithDurableQueue() throws Exception {
+ broker.start();
+ //auto created queue can't survive a restart, so we need this
+ broker.getJMSServerManager().createQueue(false, QUEUE_NAME, null, true, QUEUE_NAME);
}
+ @Test
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
- broker = createBroker(true);
+ LOG.info(this + " running test testAutoRollbackWithMissingRedeliveries");
+ broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
Connection connection = cf.createConnection();
- connection.start();
- final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
- final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = consumerSession.createConsumer(destination);
+ try {
+ connection.start();
+ final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+ final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer consumer = consumerSession.createConsumer(destination);
- produceMessage(producerSession, destination);
+ produceMessage(producerSession, destination);
- Message msg = consumer.receive(20000);
- assertNotNull(msg);
+ Message msg = consumer.receive(20000);
+ Assert.assertNotNull(msg);
- broker.stop();
- broker = createBroker(false, url);
- // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
- setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
- broker.start();
+ broker.stop();
+ broker = createBroker();
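+ // NOTE(review): the original test switched to an empty JDBC store here so that the default wait(0) for redeliveries would time out after failover; that call is gone, so confirm createBroker() still yields a broker without the message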
+ broker.start();
- try {
- consumerSession.commit();
- fail("expected transaciton rolledback ex");
+ try {
+ consumerSession.commit();
+ Assert.fail("expected transaciton rolledback ex");
+ }
+ catch (TransactionRolledBackException expected) {
+ }
+
+ broker.stop();
+ broker = createBroker();
+ broker.start();
+ Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
}
- catch (TransactionRolledBackException expected) {
+ finally {
+ connection.close();
}
-
- broker.stop();
- broker = createBroker(false, url);
- broker.start();
-
- assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
- connection.close();
}
+ @Test
public void testWaitForMissingRedeliveries() throws Exception {
- LOG.info("testWaitForMissingRedeliveries()");
- broker = createBroker(true);
+ LOG.info(this + " running test testWaitForMissingRedeliveries");
+
+ broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
configureConnectionFactory(cf);
@@ -1088,18 +906,15 @@ public class FailoverTransactionTest extends TestSupport {
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
- assertNotNull("got message just produced", msg);
+ Assert.assertNotNull("got message just produced", msg);
broker.stop();
- broker = createBroker(false, url);
- // use empty jdbc store so that wait for re-deliveries occur when failover resumes
- setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
+ broker = createBroker();
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
// will block pending re-deliveries
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
+ new Thread() {
public void run() {
LOG.info("doing async commit...");
try {
@@ -1109,21 +924,22 @@ public class FailoverTransactionTest extends TestSupport {
catch (JMSException ignored) {
}
}
- });
+ }.start();
broker.stop();
- broker = createBroker(false, url);
+ broker = createBroker();
broker.start();
- assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
- assertNull("should not get committed message", consumer.receive(5000));
+ Assert.assertNull("should not get committed message", consumer.receive(5000));
connection.close();
}
+ @Test
public void testReDeliveryWhilePending() throws Exception {
- LOG.info("testReDeliveryWhilePending()");
- broker = createBroker(true);
+ LOG.info(this + " running test testReDeliveryWhilePending");
+ broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
configureConnectionFactory(cf);
@@ -1139,13 +955,13 @@ public class FailoverTransactionTest extends TestSupport {
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
- assertNotNull("got message just produced", msg);
+ Assert.assertNotNull("got message just produced", msg);
// add another consumer into the mix that may get the message after restart
MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
broker.stop();
- broker = createBroker(false, url);
+ broker = createBroker();
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
@@ -1153,8 +969,7 @@ public class FailoverTransactionTest extends TestSupport {
final Vector<Exception> exceptions = new Vector<>();
// commit may fail if other consumer gets the message on restart
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
+ new Thread() {
public void run() {
LOG.info("doing async commit...");
try {
@@ -1167,26 +982,26 @@ public class FailoverTransactionTest extends TestSupport {
commitDone.countDown();
}
}
- });
+ }.start();
- assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
+ Assert.assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
// either message redelivered in existing tx or consumed by consumer2
// should not be available again in any event
- assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
+ Assert.assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
// consumer replay depends on hashmap ordering during failover connection-state recovery, so both cases must be handled
if (exceptions.isEmpty()) {
LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
- assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
+ Assert.assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
}
else {
LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
- assertNotNull("consumer2 got message", consumer2.receive(2000));
+ Assert.assertNotNull("consumer2 got message", consumer2.receive(2000));
consumerSession.commit();
// no message should be in dlq
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
- assertNull("nothing in the dlq", dlqConsumer.receive(5000));
+ Assert.assertNull("nothing in the dlq", dlqConsumer.receive(5000));
}
connection.close();
}
@@ -1198,4 +1013,70 @@ public class FailoverTransactionTest extends TestSupport {
producer.close();
}
+ public static void holdResponseAndStopBroker(final OpenWireConnection.CommandProcessor context) {
+ if (doByteman.get()) {
+ context.getContext().setDontSendReponse(true);
+ new Thread() {
+ public void run() {
+ LOG.info("Stopping broker post commit...");
+ try {
+ broker.stop();
+ broker = null;
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ brokerStopLatch.countDown();
+ }
+ }
+ }.start();
+ }
+ }
+
+ public static void holdResponseAndStopProxyOnFirstSend(final OpenWireConnection.CommandProcessor context) {
+ if (doByteman.get()) {
+ if (firstSend) {
+ firstSend = false;
+ context.getContext().setDontSendReponse(true);
+ new Thread() {
+ public void run() {
+ LOG.info("Stopping connection post send...");
+ try {
+ proxy.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+ }
+ }
+ }
+
+ public static void stopBrokerOnCounter() {
+ LOG.info("in stopBrokerOnCounter, byteman " + doByteman.get() + " count " + count);
+ if (doByteman.get()) {
+ if (count++ == 1) {
+ LOG.info("ok stop broker...");
+ new Thread() {
+ public void run() {
+ try {
+ if (broker != null) {
+ broker.stop();
+ broker = null;
+ }
+ LOG.info("broker stopped.");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ brokerStopLatch.countDown();
+ }
+ }
+ }.start();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
index 0ba3939..149af92 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
@@ -23,7 +23,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@@ -34,7 +35,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FailoverTransportBackupsTest {
+public class FailoverTransportBackupsTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class);
@@ -43,23 +44,11 @@ public class FailoverTransportBackupsTest {
private int transportInterruptions;
private int transportResumptions;
- BrokerService broker1;
- BrokerService broker2;
- BrokerService broker3;
+ EmbeddedJMS[] servers = new EmbeddedJMS[3];
@Before
public void setUp() throws Exception {
- broker1 = createBroker("1");
- broker2 = createBroker("2");
- broker3 = createBroker("3");
-
- broker1.start();
- broker2.start();
- broker3.start();
-
- broker1.waitUntilStarted();
- broker2.waitUntilStarted();
- broker3.waitUntilStarted();
+ setUpClusterServers(servers);
// Reset stats
transportInterruptions = 0;
@@ -71,13 +60,7 @@ public class FailoverTransportBackupsTest {
if (transport != null) {
transport.stop();
}
-
- broker1.stop();
- broker1.waitUntilStopped();
- broker2.stop();
- broker2.waitUntilStopped();
- broker3.stop();
- broker3.waitUntilStopped();
+ shutDownClusterServers(servers);
}
@Test
@@ -111,7 +94,7 @@ public class FailoverTransportBackupsTest {
}
}));
- broker1.stop();
+ servers[0].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -124,7 +107,7 @@ public class FailoverTransportBackupsTest {
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
- broker2.stop();
+ servers[1].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -153,7 +136,7 @@ public class FailoverTransportBackupsTest {
}
}));
- broker1.stop();
+ servers[0].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -163,7 +146,7 @@ public class FailoverTransportBackupsTest {
}
}));
- broker2.stop();
+ servers[1].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -174,20 +157,11 @@ public class FailoverTransportBackupsTest {
}));
}
- private BrokerService createBroker(String name) throws Exception {
- BrokerService bs = new BrokerService();
- bs.setBrokerName(name);
- bs.setUseJmx(false);
- bs.setPersistent(false);
- bs.addConnector("tcp://localhost:0");
- return bs;
- }
-
protected Transport createTransport(int backups) throws Exception {
String connectionUri = "failover://(" +
- broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
- broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," +
- broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")";
+ newURI(0) + "," +
+ newURI(1) + "," +
+ newURI(2) + ")";
if (backups > 0) {
connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
index 806faca..15d28d3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
@@ -18,41 +18,205 @@ package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
-import junit.framework.Test;
-
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FailoverTransportBrokerTest extends NetworkTestSupport {
+@RunWith(Parameterized.class)
+public class FailoverTransportBrokerTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBrokerTest.class);
+ protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+ protected long idGenerator;
+ protected int msgIdGenerator;
+ protected int maxWait = 10000;
+ public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+   /**
+    * Supplies the parameter sets for the {@link Parameterized} runner: each delivery
+    * mode (non-persistent first, then persistent) is paired with a queue and with a
+    * topic, both named "TEST".
+    *
+    * @return four {deliveryMode, destination} pairs
+    */
+   @Parameterized.Parameters
+   public static Collection<Object[]> getParams() {
+      Collection<Object[]> combos = new ArrayList<Object[]>();
+      for (int mode : new int[] {DeliveryMode.NON_PERSISTENT, DeliveryMode.PERSISTENT}) {
+         // the int is boxed to an Integer when stored in the Object[]
+         combos.add(new Object[] {mode, new ActiveMQQueue("TEST")});
+         combos.add(new Object[] {mode, new ActiveMQTopic("TEST")});
+      }
+      return combos;
+   }
+
+ private EmbeddedJMS server;
+ private EmbeddedJMS remoteServer;
public ActiveMQDestination destination;
public int deliveryMode;
- public void initCombosForTestPublisherFailsOver() {
- addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
+ /**
+ * Stores the delivery mode and destination this test instance runs with.
+ *
+ * @param deliveryMode a {@link DeliveryMode} constant, as supplied by {@code getParams()}
+ * @param destination the queue or topic the test sends to and consumes from
+ */
+ public FailoverTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) {
+ this.deliveryMode = deliveryMode;
+ this.destination = destination;
+ }
+
+ /**
+ * Creates two embedded servers from {@code createConfig(0)} and {@code createConfig(1)}
+ * and starts them, the local {@code server} first and {@code remoteServer} second.
+ *
+ * @throws Exception if a configuration cannot be created or a server fails to start
+ */
+ @Before
+ public void setUp() throws Exception {
+ Configuration config0 = createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ Configuration config1 = createConfig(1);
+ remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ // both servers are fully configured before either one is started
+ server.start();
+ remoteServer.start();
+ }
+
+   /**
+    * Stops every stub connection opened during the test, then the remote server and
+    * finally the local server.
+    * <p>
+    * Shutdown is best-effort: a failure while stopping one resource is logged instead
+    * of being silently discarded, and never prevents the remaining resources from
+    * being stopped.
+    */
+   @After
+   public void tearDown() throws Exception {
+      for (StubConnection conn : connections) {
+         try {
+            conn.stop();
+         }
+         catch (Exception e) {
+            // expected in some runs, e.g. when the test already stopped the broker behind it
+            LOG.debug("Ignoring failure while stopping a stub connection", e);
+         }
+      }
+      stopQuietly(remoteServer, "remote server");
+      stopQuietly(server, "local server");
+   }
+
+   /**
+    * Stops the given server, logging rather than propagating any failure.
+    *
+    * @param jms the server to stop; may be {@code null} if {@code setUp()} failed before creating it
+    * @param description short name used in the log message
+    */
+   private static void stopQuietly(EmbeddedJMS jms, String description) {
+      if (jms == null) {
+         return;
+      }
+      try {
+         jms.stop();
+      }
+      catch (Exception e) {
+         LOG.warn("Failed to stop the " + description, e);
+      }
+   }
+
+   /**
+    * Opens a stub connection to the server addressed by {@code newURI(0)} and
+    * registers it in {@code connections} so that {@code tearDown()} stops it.
+    *
+    * @return the newly opened connection
+    * @throws Exception if the URI is invalid or the transport cannot connect
+    */
+   protected StubConnection createConnection() throws Exception {
+      URI localUri = new URI(newURI(0));
+      StubConnection stub = new StubConnection(TransportFactory.connect(localUri));
+      connections.add(stub);
+      return stub;
+   }
+
+   /**
+    * Opens a stub connection to the server addressed by {@code newURI(1)} and
+    * registers it in {@code connections} so that {@code tearDown()} stops it.
+    *
+    * @return the newly opened connection
+    * @throws Exception if the URI is invalid or the transport cannot connect
+    */
+   protected StubConnection createRemoteConnection() throws Exception {
+      URI remoteUri = new URI(newURI(1));
+      StubConnection stub = new StubConnection(TransportFactory.connect(remoteUri));
+      connections.add(stub);
+      return stub;
+   }
+
+   /**
+    * Builds a {@link ConnectionInfo} whose connection id is {@code "connection:"}
+    * followed by the next value of {@code idGenerator}; the client id is set to the
+    * same string value.
+    *
+    * @return the populated connection info
+    */
+   protected ConnectionInfo createConnectionInfo() throws Exception {
+      ConnectionInfo connectionInfo = new ConnectionInfo();
+      connectionInfo.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+      // read the id back from the info, exactly as the client id has always been derived
+      String clientId = connectionInfo.getConnectionId().getValue();
+      connectionInfo.setClientId(clientId);
+      return connectionInfo;
+   }
+
+   /**
+    * Builds a {@link SessionInfo} for the given connection, using the next value of
+    * {@code idGenerator} as the session id.
+    *
+    * @param connectionInfo the connection the session belongs to
+    * @return the new session info
+    */
+   protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+      return new SessionInfo(connectionInfo, ++idGenerator);
+   }
+
+   /**
+    * Builds a {@link ConsumerInfo} for a regular (non-browser) consumer on the given
+    * destination, with a prefetch size of 1000 and asynchronous dispatch disabled.
+    * The consumer id is the next value of {@code idGenerator}.
+    *
+    * @param sessionInfo the session the consumer belongs to
+    * @param destination the destination to consume from
+    * @return the populated consumer info
+    */
+   protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
+                                             ActiveMQDestination destination) throws Exception {
+      ConsumerInfo consumer = new ConsumerInfo(sessionInfo, ++idGenerator);
+      consumer.setDestination(destination);
+      consumer.setBrowser(false);
+      consumer.setDispatchAsync(false);
+      consumer.setPrefetchSize(1000);
+      return consumer;
+   }
+
+   /**
+    * Builds a {@link ProducerInfo} for the given session, using the next value of
+    * {@code idGenerator} as the producer id.
+    *
+    * @param sessionInfo the session the producer belongs to
+    * @return the new producer info
+    */
+   protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+      return new ProducerInfo(sessionInfo, ++idGenerator);
    }
+   /**
+    * Creates a test message via {@code createMessage(producerInfo, destination)} and
+    * marks it persistent when {@code deliveryMode} is {@link DeliveryMode#PERSISTENT}.
+    *
+    * @param producerInfo the producer the message id is derived from
+    * @param destination the destination the message is addressed to
+    * @param deliveryMode a {@link DeliveryMode} constant
+    * @return the new message
+    */
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+      boolean persistent = DeliveryMode.PERSISTENT == deliveryMode;
+      Message msg = createMessage(producerInfo, destination);
+      msg.setPersistent(persistent);
+      return msg;
+   }
+
+   /**
+    * Creates a non-persistent text message with the payload "Test Message Payload.",
+    * addressed to the given destination. The message id combines the producer with
+    * the next value of {@code msgIdGenerator}.
+    *
+    * @param producerInfo the producer the message id is derived from
+    * @param destination the destination the message is addressed to
+    * @return the new message
+    * @throws IllegalStateException if the payload cannot be written to the new message
+    */
+   protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+      ActiveMQTextMessage message = new ActiveMQTextMessage();
+      message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+      message.setDestination(destination);
+      message.setPersistent(false);
+      try {
+         message.setText("Test Message Payload.");
+      }
+      catch (MessageNotWriteableException e) {
+         // Not expected for a message that was just created. Fail loudly rather than
+         // hand the test a message whose payload was silently dropped.
+         throw new IllegalStateException("Could not set the payload of a newly created test message", e);
+      }
+      return message;
+   }
+
+ /**
+ * Receives the next message dispatched to the connection, waiting up to
+ * {@code maxWait} milliseconds.
+ *
+ * @param connection the connection whose dispatch queue is read
+ * @return the received message, or {@code null} as returned by
+ * {@code receiveMessage(connection, maxWait)}
+ * @throws InterruptedException if interrupted while waiting
+ */
+ public Message receiveMessage(StubConnection connection) throws InterruptedException {
+ return receiveMessage(connection, maxWait);
+ }
+
+   /**
+    * Receives the next message dispatched to the connection, skipping any other
+    * objects found on its dispatch queue.
+    * <p>
+    * The timeout is one overall budget for the whole call. Skipped objects no longer
+    * restart the wait, so the call cannot block much longer than {@code timeout}
+    * however many non-message objects arrive.
+    *
+    * @param connection the connection whose dispatch queue is read
+    * @param timeout maximum total time to wait, in milliseconds; values of zero or
+    *                less only return what is already queued
+    * @return a copy of the dispatched message carrying the dispatch's redelivery
+    *         counter, or {@code null} if the budget ran out or the dispatch held no message
+    * @throws InterruptedException if interrupted while waiting
+    */
+   public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+      // Track elapsed time against a non-negative budget so the arithmetic cannot overflow.
+      final long budgetNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(0L, timeout));
+      final long start = System.nanoTime();
+      while (true) {
+         long remainingNanos = Math.max(0L, budgetNanos - (System.nanoTime() - start));
+         Object o = connection.getDispatchQueue().poll(remainingNanos, TimeUnit.NANOSECONDS);
+
+         if (o == null) {
+            return null;
+         }
+         if (o instanceof MessageDispatch) {
+
+            MessageDispatch dispatch = (MessageDispatch) o;
+            if (dispatch.getMessage() == null) {
+               return null;
+            }
+            // work on a copy so the redelivery counter is not written to the dispatched instance
+            dispatch.setMessage(dispatch.getMessage().copy());
+            dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+            return dispatch.getMessage();
+         }
+      }
+   }
+
+   /**
+    * Fails the test if a message is dispatched to the connection.
+    * <p>
+    * With {@code FAST_NO_MESSAGE_LEFT_ASSERT} set, only objects already queued are
+    * inspected; otherwise the method waits up to {@code maxWait} milliseconds in
+    * total. The wait is one overall budget, so queued objects that are not messages
+    * do not extend it.
+    *
+    * @param connection the connection whose dispatch queue is drained
+    * @throws InterruptedException if interrupted while waiting
+    */
+   protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+      long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+      // Track elapsed time against a non-negative budget so the arithmetic cannot overflow.
+      final long budgetNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(0L, wait));
+      final long start = System.nanoTime();
+      while (true) {
+         long remainingNanos = Math.max(0L, budgetNanos - (System.nanoTime() - start));
+         Object o = connection.getDispatchQueue().poll(remainingNanos, TimeUnit.NANOSECONDS);
+         if (o == null) {
+            return;
+         }
+         if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) {
+            Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId());
+         }
+      }
+   }
+
+ @Test
public void testPublisherFailsOver() throws Exception {
// Start a normal consumer on the local broker
@@ -92,19 +256,22 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
// See which broker we were connected to.
StubConnection connectionA;
StubConnection connectionB;
- TransportConnector serverA;
- if (connector.getServer().getConnectURI().equals(ft.getConnectedTransportURI())) {
+
+
+ EmbeddedJMS serverA;
+
+ if (new URI(newURI(0)).equals(ft.getConnectedTransportURI())) {
connectionA = connection1;
connectionB = connection2;
- serverA = connector;
+ serverA = server;
}
else {
connectionA = connection2;
connectionB = connection1;
- serverA = remoteConnector;
+ serverA = remoteServer;
}
- assertNotNull(receiveMessage(connectionA));
+ Assert.assertNotNull(receiveMessage(connectionA));
assertNoMessagesLeft(connectionB);
// Dispose the server so that it fails over to the other server.
@@ -113,7 +280,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
- assertNotNull(receiveMessage(connectionB));
+ Assert.assertNotNull(receiveMessage(connectionB));
assertNoMessagesLeft(connectionA);
}
@@ -150,34 +317,16 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
while (count++ < 20 && info[0] == null) {
TimeUnit.SECONDS.sleep(1);
}
- assertNotNull("got a valid brokerInfo after 20 secs", info[0]);
- assertNull("no peer brokers present", info[0].getPeerBrokerInfos());
- }
-
- @Override
- protected String getLocalURI() {
- return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
- }
-
- @Override
- protected String getRemoteURI() {
- return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+ Assert.assertNotNull("got a valid brokerInfo after 20 secs", info[0]);
+ Assert.assertNull("no peer brokers present", info[0].getPeerBrokerInfos());
}
+ /**
+ * Opens a stub connection over a failover transport that lists both servers
+ * ({@code newURI(0)} first, then {@code newURI(1)}) and registers it in
+ * {@code connections}.
+ *
+ * @param listener transport listener handed to the {@link StubConnection}
+ * @return the newly opened connection
+ * @throws Exception if the URI is invalid or the transport cannot connect
+ */
protected StubConnection createFailoverConnection(TransportListener listener) throws Exception {
- URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + "");
+ URI failoverURI = new URI("failover://" + newURI(0) + "," + newURI(1) + "");
Transport transport = TransportFactory.connect(failoverURI);
StubConnection connection = new StubConnection(transport, listener);
connections.add(connection);
return connection;
}
- public static Test suite() {
- return suite(FailoverTransportBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e666730/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
index 8155575..d64cc58 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collection;
-import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.Test;
public class FailoverTransportUriHandlingTest {