You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/31 04:30:45 UTC
[03/69] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index eb5bc61..c129791 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -16,29 +16,23 @@
*/
package org.apache.activemq.transport.failover;
-import junit.framework.Test;
-
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.AutoFailTestSupport;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.ConsumerBrokerExchange;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.util.DestinationPathSeparatorBroker;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.SocketProxy;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,28 +63,30 @@ import java.util.concurrent.atomic.AtomicBoolean;
// see https://issues.apache.org/activemq/browse/AMQ-2473
// https://issues.apache.org/activemq/browse/AMQ-2590
-public class FailoverTransactionTest extends TestSupport {
+@RunWith(BMUnitRunner.class)
+public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class);
private static final String QUEUE_NAME = "Failover.WithTx";
- private static final String TRANSPORT_URI = "tcp://localhost:0";
- private String url;
- BrokerService broker;
+ private String url = newURI(0);
- public static Test suite() {
- return suite(FailoverTransactionTest.class);
- }
+ private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+ private static CountDownLatch brokerStopLatch;
+
+ private static SocketProxy proxy;
+ private static boolean firstSend;
+ private static int count;
+
+ private static EmbeddedJMS broker;
- @Override
+ @Before
public void setUp() throws Exception {
- super.setMaxTestTime(2 * 60 * 1000); // some boxes can be real slow
- super.setAutoFail(true);
- super.setUp();
+ doByteman.set(false);
+ brokerStopLatch = new CountDownLatch(1);
}
- @Override
+ @After
public void tearDown() throws Exception {
- super.tearDown();
stopBroker();
}
@@ -101,39 +97,19 @@ public class FailoverTransactionTest extends TestSupport {
}
private void startCleanBroker() throws Exception {
- startBroker(true);
- }
-
- public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup);
- broker.start();
+ startBroker();
}
- public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
+ public void startBroker() throws Exception {
+ broker = createBroker();
broker.start();
}
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setAdvisorySupport(false);
- broker.addConnector(bindAddress);
- broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
- url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
- return broker;
- }
-
public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
// nothing to do
}
+ @Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -148,55 +124,31 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit
broker.stop();
- startBroker(false, url);
+ startBroker();
session.commit();
- assertNotNull("we got the message", consumer.receive(20000));
+ Assert.assertNotNull("we got the message", consumer.receive(20000));
session.commit();
connection.close();
}
- public void initCombosForTestFailoverCommitReplyLost() {
- String osName = System.getProperty("os.name");
- Object[] persistenceAdapters;
- if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) {
- persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC};
- }
- else {
- persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC};
- }
- addCombinationValues("defaultPersistenceAdapter", persistenceAdapters);
- }
-
- @SuppressWarnings("unchecked")
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processCommitTransactionOnePhase",
+ targetLocation = "EXIT",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)")
+ }
+ )
public void testFailoverCommitReplyLost() throws Exception {
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid,
- boolean onePhase) throws Exception {
- super.commitTransaction(context, xid, onePhase);
- // so commit will hang as if reply is lost
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker post commit...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
+ broker = createBroker();
+ startBrokerWithDurableQueue();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -211,14 +163,13 @@ public class FailoverTransactionTest extends TestSupport {
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
// broker will die on commit reply so this will hang till restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
LOG.info("doing async commit...");
try {
session.commit();
}
catch (JMSException e) {
- assertTrue(e instanceof TransactionRolledBackException);
+ Assert.assertTrue(e instanceof TransactionRolledBackException);
LOG.info("got commit exception: ", e);
}
commitDoneLatch.countDown();
@@ -227,29 +178,27 @@ public class FailoverTransactionTest extends TestSupport {
});
// will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ brokerStopLatch.await();
+ doByteman.set(false);
+ broker = createBroker();
broker.start();
- assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
+ Assert.assertNotNull("we got the message", msg);
+ Assert.assertNull("we got just one message", consumer.receive(2000));
session.commit();
consumer.close();
connection.close();
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -264,152 +213,38 @@ public class FailoverTransactionTest extends TestSupport {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
@SuppressWarnings("unchecked")
+ @Test
public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception {
-
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker(), new BrokerPluginSupport() {
- @Override
- public void commitTransaction(ConnectionContext context,
- TransactionId xid,
- boolean onePhase) throws Exception {
- super.commitTransaction(context, xid, onePhase);
- // so commit will hang as if reply is lost
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker post commit...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
-
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- Connection connection = cf.createConnection();
- connection.start();
- final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(QUEUE_NAME.replace('.', '/') + "?consumer.prefetchSize=0");
-
- MessageConsumer consumer = session.createConsumer(destination);
- produceMessage(session, destination);
-
- final CountDownLatch commitDoneLatch = new CountDownLatch(1);
- // broker will die on commit reply so this will hang till restart
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("doing async commit...");
- try {
- session.commit();
- }
- catch (JMSException e) {
- assertTrue(e instanceof TransactionRolledBackException);
- LOG.info("got commit exception: ", e);
- }
- commitDoneLatch.countDown();
- LOG.info("done async commit");
- }
- });
-
- // will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
- broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
- broker.start();
-
- assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
-
- // new transaction
- Message msg = consumer.receive(20000);
- LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
- session.commit();
- consumer.close();
- connection.close();
-
- // ensure no dangling messages with fresh broker etc
- broker.stop();
- broker.waitUntilStopped();
-
- LOG.info("Checking for remaining/hung messages..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
- broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()});
- broker.start();
-
- // after restart, ensure no dangling messages
- cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- connection = cf.createConnection();
- connection.start();
- Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- consumer = session2.createConsumer(destination);
- msg = consumer.receive(1000);
- if (msg == null) {
- msg = consumer.receive(5000);
- }
- LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
- connection.close();
-
- ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations();
- for (ActiveMQDestination dest : destinations) {
- LOG.info("Destinations list: " + dest);
- }
- assertEquals("Only one destination", 1, broker.getRegionBroker().getDestinations().length);
- }
-
- public void initCombosForTestFailoverSendReplyLost() {
- addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC
- // not implemented for AMQ store or PersistenceAdapterChoice.LevelDB
- });
+ //the original test validates destinations using forward slash (/) as
+ //separators instead of dot (.). The broker internally uses a plugin
+ //called DestinationPathSeparatorBroker to convert every occurrence of
+ // "/" into "." inside the server.
+ //Artemis doesn't support "/" so far and this test doesn't make sense therefore.
}
@SuppressWarnings("unchecked")
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processMessage",
+ targetLocation = "EXIT",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)")
+ }
+ )
public void testFailoverSendReplyLost() throws Exception {
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void send(ProducerBrokerExchange producerExchange,
- org.apache.activemq.command.Message messageSend) throws Exception {
- // so send will hang as if reply is lost
- super.send(producerExchange, messageSend);
- producerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker post send...");
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
- broker.start();
+ broker = createBroker();
+ startBrokerWithDurableQueue();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
@@ -422,7 +257,6 @@ public class FailoverTransactionTest extends TestSupport {
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// broker will die on send reply so this will hang till restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
LOG.info("doing async send...");
try {
@@ -431,7 +265,7 @@ public class FailoverTransactionTest extends TestSupport {
catch (JMSException e) {
//assertTrue(e instanceof TransactionRolledBackException);
LOG.error("got send exception: ", e);
- fail("got unexpected send exception" + e);
+ Assert.fail("got unexpected send exception" + e);
}
sendDoneLatch.countDown();
LOG.info("done async send");
@@ -439,33 +273,27 @@ public class FailoverTransactionTest extends TestSupport {
});
// will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ brokerStopLatch.await();
+ doByteman.set(false);
+ broker = createBroker();
LOG.info("restarting....");
broker.start();
- assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
// new transaction
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
+ Assert.assertNotNull("we got the message", msg);
+ Assert.assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
- // verify stats
- assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
- assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
-
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with second restart..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -480,64 +308,33 @@ public class FailoverTransactionTest extends TestSupport {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
- public void initCombosForTestFailoverConnectionSendReplyLost() {
- addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC
- // last producer message id store feature not implemented for AMQ store
- // or PersistenceAdapterChoice.LevelDB
- });
- }
-
@SuppressWarnings("unchecked")
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processMessage",
+ targetLocation = "EXIT",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopProxyOnFirstSend(context)")
+ }
+ )
public void testFailoverConnectionSendReplyLost() throws Exception {
- broker = createBroker(true);
- PersistenceAdapter store = setDefaultPersistenceAdapter(broker);
- if (store instanceof KahaDBPersistenceAdapter) {
- // duplicate checker not updated on canceled tasks, even it
- // it was, recovery of the audit would fail as the message is
- // not recorded in the store and the audit may not be up to date.
- // So if duplicate messages are an absolute no no after restarts,
- // ConcurrentStoreAndDispatchQueues must be disabled
- ((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false);
- }
-
- final SocketProxy proxy = new SocketProxy();
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- private boolean firstSend = true;
-
- @Override
- public void send(ProducerBrokerExchange producerExchange,
- org.apache.activemq.command.Message messageSend) throws Exception {
- // so send will hang as if reply is lost
- super.send(producerExchange, messageSend);
- if (firstSend) {
- firstSend = false;
-
- producerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping connection post send...");
- try {
- proxy.close();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- }});
- broker.start();
+ broker = createBroker();
+ proxy = new SocketProxy();
+ firstSend = true;
+ startBrokerWithDurableQueue();
proxy.setTarget(new URI(url));
proxy.open();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
@@ -550,7 +347,6 @@ public class FailoverTransactionTest extends TestSupport {
final CountDownLatch sendDoneLatch = new CountDownLatch(1);
// proxy connection will die on send reply so this will hang on failover reconnect till open
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
LOG.info("doing async send...");
try {
@@ -566,29 +362,24 @@ public class FailoverTransactionTest extends TestSupport {
});
// will be closed by the plugin
- assertTrue("proxy was closed", proxy.waitUntilClosed(30));
+ Assert.assertTrue("proxy was closed", proxy.waitUntilClosed(30));
LOG.info("restarting proxy");
proxy.open();
- assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
Message msg = consumer.receive(20000);
LOG.info("Received: " + msg);
- assertNotNull("we got the message", msg);
- assertNull("we got just one message", consumer.receive(2000));
+ Assert.assertNotNull("we got the message", msg);
+ Assert.assertNull("we got just one message", consumer.receive(2000));
consumer.close();
connection.close();
- // verify stats, connection dup suppression means dups don't get to broker
- assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages with restart..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -603,10 +394,11 @@ public class FailoverTransactionTest extends TestSupport {
msg = consumer.receive(5000);
}
LOG.info("Received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
}
+ @Test
public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false");
@@ -621,16 +413,17 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit
broker.stop();
- startBroker(false, url);
+ startBroker();
session.commit();
// without tracking producers, message will not be replayed on recovery
- assertNull("we got the message", consumer.receive(5000));
+ Assert.assertNull("we got the message", consumer.receive(5000));
session.commit();
connection.close();
}
+ @Test
public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -653,17 +446,18 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit
broker.stop();
- startBroker(false, url);
+ startBroker();
session.commit();
for (int i = 0; i < count; i++) {
- assertNotNull("we got all the message: " + count, consumer.receive(20000));
+ Assert.assertNotNull("we got all the message: " + count, consumer.receive(20000));
}
session.commit();
connection.close();
}
// https://issues.apache.org/activemq/browse/AMQ-2772
+ @Test
public void testFailoverWithConnectionConsumer() throws Exception {
startCleanBroker();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -677,15 +471,12 @@ public class FailoverTransactionTest extends TestSupport {
final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1);
final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.createConnectionConsumer(destination, null, new ServerSessionPool() {
- @Override
public ServerSession getServerSession() throws JMSException {
return new ServerSession() {
- @Override
public Session getSession() throws JMSException {
return poolSession;
}
- @Override
public void start() throws JMSException {
connectionConsumerGotOne.countDown();
poolSession.run();
@@ -707,18 +498,30 @@ public class FailoverTransactionTest extends TestSupport {
// restart to force failover and connection state recovery before the commit
broker.stop();
- startBroker(false, url);
+ startBroker();
session.commit();
for (int i = 0; i < count - 1; i++) {
- assertNotNull("Failed to get message: " + count, consumer.receive(20000));
+ Assert.assertNotNull("Failed to get message: " + count, consumer.receive(20000));
}
session.commit();
connection.close();
- assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
+ Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS));
}
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processMessageAck",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)")
+ }
+ )
public void testFailoverConsumerAckLost() throws Exception {
// as failure depends on hash order of state tracker recovery, do a few times
for (int i = 0; i < 3; i++) {
@@ -734,31 +537,10 @@ public class FailoverTransactionTest extends TestSupport {
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
-
- // broker is killed on delivered ack as prefetch is 1
- @Override
- public void acknowledge(ConsumerBrokerExchange consumerExchange, final MessageAck ack) throws Exception {
-
- consumerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker on ack: " + ack);
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }});
+ broker = createBroker();
broker.start();
+ brokerStopLatch = new CountDownLatch(1);
+ doByteman.set(true);
Vector<Connection> connections = new Vector<>();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -789,7 +571,6 @@ public class FailoverTransactionTest extends TestSupport {
final CountDownLatch commitDoneLatch = new CountDownLatch(1);
final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
LOG.info("doing async commit after consume...");
try {
@@ -839,12 +620,12 @@ public class FailoverTransactionTest extends TestSupport {
});
// will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ brokerStopLatch.await();
+ broker = createBroker();
broker.start();
+ doByteman.set(false);
- assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
LOG.info("received message count: " + receivedMessages.size());
@@ -852,10 +633,10 @@ public class FailoverTransactionTest extends TestSupport {
Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
LOG.info("post: from consumer1 received: " + msg);
if (gotTransactionRolledBackException.get()) {
- assertNotNull("should be available again after commit rollback ex", msg);
+ Assert.assertNotNull("should be available again after commit rollback ex", msg);
}
else {
- assertNull("should be nothing left for consumer as receive should have committed", msg);
+ Assert.assertNull("should be nothing left for consumer as receive should have committed", msg);
}
consumerSession1.commit();
@@ -864,7 +645,7 @@ public class FailoverTransactionTest extends TestSupport {
// consumer2 should get other message
msg = consumer2.receive(10000);
LOG.info("post: from consumer2 received: " + msg);
- assertNotNull("got second message on consumer2", msg);
+ Assert.assertNotNull("got second message on consumer2", msg);
consumerSession2.commit();
}
@@ -874,11 +655,9 @@ public class FailoverTransactionTest extends TestSupport {
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
LOG.info("Checking for remaining/hung messages..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -893,36 +672,29 @@ public class FailoverTransactionTest extends TestSupport {
msg = sweeper.receive(5000);
}
LOG.info("Sweep received: " + msg);
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
connection.close();
+
+ broker.stop();
}
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processRemoveConsumer",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverTransactionTest.stopBrokerOnCounter(context)")
+ }
+ )
public void testPoolingNConsumesAfterReconnect() throws Exception {
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
+ startBrokerWithDurableQueue();
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- int count = 0;
-
- @Override
- public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
- if (count++ == 1) {
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker on removeConsumer: " + info);
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- }});
- broker.start();
+ doByteman.set(true);
Vector<Connection> connections = new Vector<>();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -951,6 +723,7 @@ public class FailoverTransactionTest extends TestSupport {
for (int i = 0; i < consumerCount; i++) {
consumers.push(consumerSession.createConsumer(destination));
}
+
final ExecutorService executorService = Executors.newCachedThreadPool();
final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
@@ -973,7 +746,6 @@ public class FailoverTransactionTest extends TestSupport {
for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
executorService.execute(new Runnable() {
- @Override
public void run() {
MessageConsumer localConsumer = null;
try {
@@ -1011,9 +783,9 @@ public class FailoverTransactionTest extends TestSupport {
consumer.close();
// will be stopped by the plugin
- broker.waitUntilStopped();
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ brokerStopLatch.await();
+ doByteman.set(false);
+ broker = createBroker();
broker.start();
consumer = consumerSession.createConsumer(destination);
@@ -1023,8 +795,9 @@ public class FailoverTransactionTest extends TestSupport {
for (int i = 0; i < 4 && msg == null; i++) {
msg = consumer.receive(1000);
}
+
LOG.info("post: from consumer1 received: " + msg);
- assertNotNull("got message after failover", msg);
+ Assert.assertNotNull("got message after failover", msg);
msg.acknowledge();
for (Connection c : connections) {
@@ -1032,8 +805,15 @@ public class FailoverTransactionTest extends TestSupport {
}
}
+ private void startBrokerWithDurableQueue() throws Exception {
+ broker.start();
+ //auto created queue can't survive a restart, so we need this
+ broker.getJMSServerManager().createQueue(false, QUEUE_NAME, null, true, QUEUE_NAME);
+ }
+
+ @Test
public void testAutoRollbackWithMissingRedeliveries() throws Exception {
- broker = createBroker(true);
+ broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
configureConnectionFactory(cf);
@@ -1047,32 +827,32 @@ public class FailoverTransactionTest extends TestSupport {
produceMessage(producerSession, destination);
Message msg = consumer.receive(20000);
- assertNotNull(msg);
+ Assert.assertNotNull(msg);
broker.stop();
- broker = createBroker(false, url);
+ broker = createBroker();
// use empty jdbc store so that default wait(0) for redeliveries will timeout after failover
- setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
broker.start();
try {
consumerSession.commit();
- fail("expected transaciton rolledback ex");
+ Assert.fail("expected transaciton rolledback ex");
}
catch (TransactionRolledBackException expected) {
}
broker.stop();
- broker = createBroker(false, url);
+ broker = createBroker();
broker.start();
- assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
+ Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000));
connection.close();
}
+ @Test
public void testWaitForMissingRedeliveries() throws Exception {
LOG.info("testWaitForMissingRedeliveries()");
- broker = createBroker(true);
+ broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000");
configureConnectionFactory(cf);
@@ -1088,18 +868,15 @@ public class FailoverTransactionTest extends TestSupport {
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
- assertNotNull("got message just produced", msg);
+ Assert.assertNotNull("got message just produced", msg);
broker.stop();
- broker = createBroker(false, url);
- // use empty jdbc store so that wait for re-deliveries occur when failover resumes
- setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC);
+ broker = createBroker();
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
// will block pending re-deliveries
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
LOG.info("doing async commit...");
try {
@@ -1112,18 +889,19 @@ public class FailoverTransactionTest extends TestSupport {
});
broker.stop();
- broker = createBroker(false, url);
+ broker = createBroker();
broker.start();
- assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS));
- assertNull("should not get committed message", consumer.receive(5000));
+ Assert.assertNull("should not get committed message", consumer.receive(5000));
connection.close();
}
+ @Test
public void testReDeliveryWhilePending() throws Exception {
LOG.info("testReDeliveryWhilePending()");
- broker = createBroker(true);
+ broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
configureConnectionFactory(cf);
@@ -1139,13 +917,13 @@ public class FailoverTransactionTest extends TestSupport {
if (msg == null) {
AutoFailTestSupport.dumpAllThreads("missing-");
}
- assertNotNull("got message just produced", msg);
+ Assert.assertNotNull("got message just produced", msg);
// add another consumer into the mix that may get the message after restart
MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"));
broker.stop();
- broker = createBroker(false, url);
+ broker = createBroker();
broker.start();
final CountDownLatch commitDone = new CountDownLatch(1);
@@ -1154,7 +932,6 @@ public class FailoverTransactionTest extends TestSupport {
// commit may fail if other consumer gets the message on restart
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
LOG.info("doing async commit...");
try {
@@ -1169,24 +946,24 @@ public class FailoverTransactionTest extends TestSupport {
}
});
- assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
+ Assert.assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
// either message redelivered in existing tx or consumed by consumer2
// should not be available again in any event
- assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
+ Assert.assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000));
// consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases
if (exceptions.isEmpty()) {
LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine");
- assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
+ Assert.assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000));
}
else {
LOG.info("commit failed, consumer2 should get it", exceptions.get(0));
- assertNotNull("consumer2 got message", consumer2.receive(2000));
+ Assert.assertNotNull("consumer2 got message", consumer2.receive(2000));
consumerSession.commit();
// no message should be in dlq
MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
- assertNull("nothing in the dlq", dlqConsumer.receive(5000));
+ Assert.assertNull("nothing in the dlq", dlqConsumer.receive(5000));
}
connection.close();
}
@@ -1198,4 +975,63 @@ public class FailoverTransactionTest extends TestSupport {
producer.close();
}
+ public static void holdResponseAndStopBroker(final AMQConnectionContext context) {
+ if (doByteman.get()) {
+ context.setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping broker post commit...");
+ try {
+ broker.stop();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ brokerStopLatch.countDown();
+ }
+ }
+ });
+ }
+ }
+
+ public static void holdResponseAndStopProxyOnFirstSend(final AMQConnectionContext context) {
+ if (doByteman.get()) {
+ if (firstSend) {
+ firstSend = false;
+ context.setDontSendReponse(true);
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ LOG.info("Stopping connection post send...");
+ try {
+ proxy.close();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+ }
+
+ public static void stopBrokerOnCounter(final AMQConnectionContext context) {
+ if (doByteman.get()) {
+ if (count++ == 1) {
+ Executors.newSingleThreadExecutor().execute(new Runnable() {
+ public void run() {
+ try {
+ broker.stop();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ brokerStopLatch.countDown();
+ }
+ }
+ });
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
index 0ba3939..149af92 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
@@ -23,7 +23,8 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@@ -34,7 +35,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FailoverTransportBackupsTest {
+public class FailoverTransportBackupsTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class);
@@ -43,23 +44,11 @@ public class FailoverTransportBackupsTest {
private int transportInterruptions;
private int transportResumptions;
- BrokerService broker1;
- BrokerService broker2;
- BrokerService broker3;
+ EmbeddedJMS[] servers = new EmbeddedJMS[3];
@Before
public void setUp() throws Exception {
- broker1 = createBroker("1");
- broker2 = createBroker("2");
- broker3 = createBroker("3");
-
- broker1.start();
- broker2.start();
- broker3.start();
-
- broker1.waitUntilStarted();
- broker2.waitUntilStarted();
- broker3.waitUntilStarted();
+ setUpClusterServers(servers);
// Reset stats
transportInterruptions = 0;
@@ -71,13 +60,7 @@ public class FailoverTransportBackupsTest {
if (transport != null) {
transport.stop();
}
-
- broker1.stop();
- broker1.waitUntilStopped();
- broker2.stop();
- broker2.waitUntilStopped();
- broker3.stop();
- broker3.waitUntilStopped();
+ shutDownClusterServers(servers);
}
@Test
@@ -111,7 +94,7 @@ public class FailoverTransportBackupsTest {
}
}));
- broker1.stop();
+ servers[0].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -124,7 +107,7 @@ public class FailoverTransportBackupsTest {
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
- broker2.stop();
+ servers[1].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -153,7 +136,7 @@ public class FailoverTransportBackupsTest {
}
}));
- broker1.stop();
+ servers[0].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -163,7 +146,7 @@ public class FailoverTransportBackupsTest {
}
}));
- broker2.stop();
+ servers[1].stop();
assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
@@ -174,20 +157,11 @@ public class FailoverTransportBackupsTest {
}));
}
- private BrokerService createBroker(String name) throws Exception {
- BrokerService bs = new BrokerService();
- bs.setBrokerName(name);
- bs.setUseJmx(false);
- bs.setPersistent(false);
- bs.addConnector("tcp://localhost:0");
- return bs;
- }
-
protected Transport createTransport(int backups) throws Exception {
String connectionUri = "failover://(" +
- broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
- broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," +
- broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")";
+ newURI(0) + "," +
+ newURI(1) + "," +
+ newURI(2) + ")";
if (backups > 0) {
connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
index 806faca..15d28d3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java
@@ -18,41 +18,205 @@ package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import javax.jms.DeliveryMode;
+import javax.jms.MessageNotWriteableException;
-import junit.framework.Test;
-
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.StubConnection;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.network.NetworkTestSupport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FailoverTransportBrokerTest extends NetworkTestSupport {
+@RunWith(Parameterized.class)
+public class FailoverTransportBrokerTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBrokerTest.class);
+ protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>();
+ protected long idGenerator;
+ protected int msgIdGenerator;
+ protected int maxWait = 10000;
+ public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true");
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getParams()
+ {
+ return Arrays.asList(new Object[][] {
+ {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")},
+ {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")},
+ {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")},
+ {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")}
+ });
+ }
+
+ private EmbeddedJMS server;
+ private EmbeddedJMS remoteServer;
public ActiveMQDestination destination;
public int deliveryMode;
- public void initCombosForTestPublisherFailsOver() {
- addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
- addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")});
+ public FailoverTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) {
+ this.deliveryMode = deliveryMode;
+ this.destination = destination;
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ Configuration config0 = createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ Configuration config1 = createConfig(1);
+ remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server.start();
+ remoteServer.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ for (StubConnection conn : connections) {
+ try {
+ conn.stop();
+ }
+ catch (Exception e) {
+ }
+ }
+ try {
+ remoteServer.stop();
+ }
+ catch (Exception e) {
+ }
+ try {
+ server.stop();
+ }
+ catch (Exception e) {
+ }
+ }
+
+ protected StubConnection createConnection() throws Exception {
+ Transport transport = TransportFactory.connect(new URI(newURI(0)));
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ protected StubConnection createRemoteConnection() throws Exception {
+ Transport transport = TransportFactory.connect(new URI(newURI(1)));
+ StubConnection connection = new StubConnection(transport);
+ connections.add(connection);
+ return connection;
+ }
+
+ protected ConnectionInfo createConnectionInfo() throws Exception {
+ ConnectionInfo info = new ConnectionInfo();
+ info.setConnectionId(new ConnectionId("connection:" + (++idGenerator)));
+ info.setClientId(info.getConnectionId().getValue());
+ return info;
+ }
+
+ protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception {
+ SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator);
+ return info;
+ }
+
+ protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo,
+ ActiveMQDestination destination) throws Exception {
+ ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator);
+ info.setBrowser(false);
+ info.setDestination(destination);
+ info.setPrefetchSize(1000);
+ info.setDispatchAsync(false);
+ return info;
+ }
+
+ protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception {
+ ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator);
+ return info;
}
+ protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) {
+ Message message = createMessage(producerInfo, destination);
+ message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT);
+ return message;
+ }
+
+ protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator));
+ message.setDestination(destination);
+ message.setPersistent(false);
+ try {
+ message.setText("Test Message Payload.");
+ }
+ catch (MessageNotWriteableException e) {
+ }
+ return message;
+ }
+
+ public Message receiveMessage(StubConnection connection) throws InterruptedException {
+ return receiveMessage(connection, maxWait);
+ }
+
+ public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException {
+ while (true) {
+ Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS);
+
+ if (o == null) {
+ return null;
+ }
+ if (o instanceof MessageDispatch) {
+
+ MessageDispatch dispatch = (MessageDispatch) o;
+ if (dispatch.getMessage() == null) {
+ return null;
+ }
+ dispatch.setMessage(dispatch.getMessage().copy());
+ dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter());
+ return dispatch.getMessage();
+ }
+ }
+ }
+
+ protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException {
+ long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait;
+ while (true) {
+ Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS);
+ if (o == null) {
+ return;
+ }
+ if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) {
+ Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId());
+ }
+ }
+ }
+
+ @Test
public void testPublisherFailsOver() throws Exception {
// Start a normal consumer on the local broker
@@ -92,19 +256,22 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
// See which broker we were connected to.
StubConnection connectionA;
StubConnection connectionB;
- TransportConnector serverA;
- if (connector.getServer().getConnectURI().equals(ft.getConnectedTransportURI())) {
+
+
+ EmbeddedJMS serverA;
+
+ if (new URI(newURI(0)).equals(ft.getConnectedTransportURI())) {
connectionA = connection1;
connectionB = connection2;
- serverA = connector;
+ serverA = server;
}
else {
connectionA = connection2;
connectionB = connection1;
- serverA = remoteConnector;
+ serverA = remoteServer;
}
- assertNotNull(receiveMessage(connectionA));
+ Assert.assertNotNull(receiveMessage(connectionA));
assertNoMessagesLeft(connectionB);
// Dispose the server so that it fails over to the other server.
@@ -113,7 +280,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
connection3.request(createMessage(producerInfo3, destination, deliveryMode));
- assertNotNull(receiveMessage(connectionB));
+ Assert.assertNotNull(receiveMessage(connectionB));
assertNoMessagesLeft(connectionA);
}
@@ -150,34 +317,16 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport {
while (count++ < 20 && info[0] == null) {
TimeUnit.SECONDS.sleep(1);
}
- assertNotNull("got a valid brokerInfo after 20 secs", info[0]);
- assertNull("no peer brokers present", info[0].getPeerBrokerInfos());
- }
-
- @Override
- protected String getLocalURI() {
- return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
- }
-
- @Override
- protected String getRemoteURI() {
- return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true";
+ Assert.assertNotNull("got a valid brokerInfo after 20 secs", info[0]);
+ Assert.assertNull("no peer brokers present", info[0].getPeerBrokerInfos());
}
protected StubConnection createFailoverConnection(TransportListener listener) throws Exception {
- URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + "");
+ URI failoverURI = new URI("failover://" + newURI(0) + "," + newURI(1) + "");
Transport transport = TransportFactory.connect(failoverURI);
StubConnection connection = new StubConnection(transport, listener);
connections.add(connection);
return connection;
}
- public static Test suite() {
- return suite(FailoverTransportBrokerTest.class);
- }
-
- public static void main(String[] args) {
- junit.textui.TestRunner.run(suite());
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
index 8155575..d64cc58 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java
@@ -22,7 +22,6 @@ import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collection;
-import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.Test;
public class FailoverTransportUriHandlingTest {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
index e792228..002a788 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java
@@ -30,38 +30,46 @@ import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.network.NetworkConnector;
import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
-public class FailoverUpdateURIsTest extends TestCase {
+public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest {
private static final String QUEUE_NAME = "test.failoverupdateuris";
private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class);
- String firstTcpUri = "tcp://localhost:61616";
- String secondTcpUri = "tcp://localhost:61626";
+ String firstTcpUri = newURI(0);
+ String secondTcpUri = newURI(10);
Connection connection = null;
- BrokerService bs1 = null;
- BrokerService bs2 = null;
+ EmbeddedJMS server0 = null;
+ EmbeddedJMS server1 = null;
- @Override
+ @After
public void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
- if (bs1 != null) {
- bs1.stop();
+ if (server0 != null) {
+ server0.stop();
}
- if (bs2 != null) {
- bs2.stop();
+ if (server1 != null) {
+ server1.stop();
}
}
+ @Test
public void testUpdateURIsViaFile() throws Exception {
- String targetDir = "target/" + getName();
+ String targetDir = "target/testUpdateURIsViaFile";
new File(targetDir).mkdir();
File updateFile = new File(targetDir + "/updateURIsFile.txt");
LOG.info(updateFile);
@@ -72,8 +80,9 @@ public class FailoverUpdateURIsTest extends TestCase {
out.write(firstTcpUri.getBytes());
out.close();
- bs1 = createBroker("bs1", firstTcpUri);
- bs1.start();
+ Configuration config0 = createConfig(0);
+ server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ server0.start();
// no failover uri's to start with, must be read from file...
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile());
@@ -86,14 +95,14 @@ public class FailoverUpdateURIsTest extends TestCase {
Message message = session.createTextMessage("Test message");
producer.send(message);
Message msg = consumer.receive(2000);
- assertNotNull(msg);
+ Assert.assertNotNull(msg);
- bs1.stop();
- bs1.waitUntilStopped();
- bs1 = null;
+ server0.stop();
+ server0 = null;
- bs2 = createBroker("bs2", secondTcpUri);
- bs2.start();
+ Configuration config1 = createConfig(10);
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server1.start();
// add the transport uri for broker number 2
out = new FileOutputStream(updateFile, true);
@@ -103,25 +112,16 @@ public class FailoverUpdateURIsTest extends TestCase {
producer.send(message);
msg = consumer.receive(2000);
- assertNotNull(msg);
- }
-
- private BrokerService createBroker(String name, String tcpUri) throws Exception {
- BrokerService bs = new BrokerService();
- bs.setBrokerName(name);
- bs.setUseJmx(false);
- bs.setPersistent(false);
- bs.addConnector(tcpUri);
- return bs;
+ Assert.assertNotNull(msg);
}
+ @Test
public void testAutoUpdateURIs() throws Exception {
-
- bs1 = new BrokerService();
- bs1.setUseJmx(false);
- TransportConnector transportConnector = bs1.addConnector(firstTcpUri);
- transportConnector.setUpdateClusterClients(true);
- bs1.start();
+ Configuration config0 = createConfig(0);
+ deployClusterConfiguration(config0, 10);
+ server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ server0.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1));
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")");
connection = cf.createConnection();
@@ -133,24 +133,23 @@ public class FailoverUpdateURIsTest extends TestCase {
Message message = session.createTextMessage("Test message");
producer.send(message);
Message msg = consumer.receive(4000);
- assertNotNull(msg);
+ Assert.assertNotNull(msg);
- bs2 = createBroker("bs2", secondTcpUri);
- NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")");
- networkConnector.setDuplex(true);
- bs2.start();
- LOG.info("started brokerService 2");
- bs2.waitUntilStarted();
+ Configuration config1 = createConfig(10);
+ deployClusterConfiguration(config1, 0);
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server1.start();
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
TimeUnit.SECONDS.sleep(4);
LOG.info("stopping brokerService 1");
- bs1.stop();
- bs1.waitUntilStopped();
- bs1 = null;
+ server0.stop();
+ server0 = null;
producer.send(message);
msg = consumer.receive(4000);
- assertNotNull(msg);
+ Assert.assertNotNull(msg);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
index ae637ef..a028832 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java
@@ -43,4 +43,5 @@ public class FailoverUriTest extends TransportUriTest {
public static Test suite() {
return suite(FailoverUriTest.class);
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
index 34e7333..dad241c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.failover;
import java.io.IOException;
import java.util.Date;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Message;
@@ -26,9 +27,13 @@ import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.transport.TransportListener;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -36,19 +41,20 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
-public class InitalReconnectDelayTest {
+public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest {
private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class);
- protected BrokerService broker1;
- protected BrokerService broker2;
+ protected EmbeddedJMS server1;
+ protected EmbeddedJMS server2;
+
+// protected BrokerService broker1;
+// protected BrokerService broker2;
@Test
public void testInitialReconnectDelay() throws Exception {
- String uriString = "failover://(tcp://localhost:" +
- broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
- ",tcp://localhost:" +
- broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+ String uriString = "failover://(" + newURI(1) +
+ "," + newURI(2) +
")?randomize=false&initialReconnectDelay=15000";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -67,7 +73,7 @@ public class InitalReconnectDelayTest {
//Halt the broker1...
LOG.info("Stopping the Broker1...");
start = (new Date()).getTime();
- broker1.stop();
+ server1.stop();
LOG.info("Attempting to send... failover should kick in...");
producer.send(session.createTextMessage("TEST"));
@@ -81,10 +87,8 @@ public class InitalReconnectDelayTest {
@Test
public void testNoSuspendedCallbackOnNoReconnect() throws Exception {
- String uriString = "failover://(tcp://localhost:" +
- broker1.getTransportConnectors().get(0).getConnectUri().getPort() +
- ",tcp://localhost:" +
- broker2.getTransportConnectors().get(0).getConnectUri().getPort() +
+ String uriString = "failover://(" + newURI(1) +
+ "," + newURI(2) +
")?randomize=false&maxReconnectAttempts=0";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString);
@@ -124,7 +128,7 @@ public class InitalReconnectDelayTest {
calls.set(0);
LOG.info("Stopping the Broker1...");
- broker1.stop();
+ server1.stop();
LOG.info("Attempting to send... failover should throw on disconnect");
try {
@@ -140,25 +144,19 @@ public class InitalReconnectDelayTest {
@Before
public void setUp() throws Exception {
- final String dataDir = "target/data/shared";
+ Configuration config1 = createConfig(1);
+ Configuration config2 = createConfig(2);
- broker1 = new BrokerService();
+ deployClusterConfiguration(config1, 2);
+ deployClusterConfiguration(config2, 1);
- broker1.setBrokerName("broker1");
- broker1.setDeleteAllMessagesOnStartup(true);
- broker1.setDataDirectory(dataDir);
- broker1.addConnector("tcp://localhost:0");
- broker1.setUseJmx(false);
- broker1.start();
- broker1.waitUntilStarted();
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
- broker2 = new BrokerService();
- broker2.setBrokerName("broker2");
- broker2.setDataDirectory(dataDir);
- broker2.setUseJmx(false);
- broker2.addConnector("tcp://localhost:0");
- broker2.start();
- broker2.waitUntilStarted();
+ server1.start();
+ server2.start();
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
}
@@ -172,16 +170,8 @@ public class InitalReconnectDelayTest {
@After
public void tearDown() throws Exception {
-
- if (broker1.isStarted()) {
- broker1.stop();
- broker1.waitUntilStopped();
- }
-
- if (broker2.isStarted()) {
- broker2.stop();
- broker2.waitUntilStopped();
- }
+ server1.stop();
+ server2.stop();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
index 4ba5516..83d43af 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java
@@ -28,29 +28,33 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-
-import junit.framework.TestCase;
+import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.mock.MockTransport;
-import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ReconnectTest extends TestCase {
+public class ReconnectTest extends OpenwireArtemisBaseTest {
public static final int MESSAGES_PER_ITTERATION = 10;
public static final int WORKER_COUNT = 10;
private static final Logger LOG = LoggerFactory.getLogger(ReconnectTest.class);
- private BrokerService bs;
+ private EmbeddedJMS bs;
private URI tcpUri;
private final AtomicInteger resumedCount = new AtomicInteger();
private final AtomicInteger interruptedCount = new AtomicInteger();
@@ -102,7 +106,7 @@ public class ReconnectTest extends TestCase {
}
public void start() {
- new Thread(this).start();
+ new Thread(this, name).start();
}
public void stop() {
@@ -129,13 +133,19 @@ public class ReconnectTest extends TestCase {
MessageConsumer consumer = session.createConsumer(queue);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
while (!stop.get()) {
+
for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
- producer.send(session.createTextMessage("TEST:" + i));
+ TextMessage text = session.createTextMessage(name + " TEST:" + i);
+ text.setStringProperty("myprop", name + " TEST:" + i);
+ producer.send(text);
}
+
for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) {
- consumer.receive();
+ TextMessage m = (TextMessage) consumer.receive();
}
+
iterations.incrementAndGet();
}
session.close();
@@ -159,11 +169,12 @@ public class ReconnectTest extends TestCase {
public synchronized void assertNoErrors() {
if (error != null) {
error.printStackTrace();
- fail("Worker " + name + " got Exception: " + error);
+ Assert.fail("Worker " + name + " got Exception: " + error);
}
}
}
+ @Test
public void testReconnects() throws Exception {
for (int k = 1; k < 10; k++) {
@@ -181,7 +192,7 @@ public class ReconnectTest extends TestCase {
LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration.");
Thread.sleep(1000);
}
- assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
+ Assert.assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0);
workers[i].assertNoErrors();
}
@@ -192,7 +203,7 @@ public class ReconnectTest extends TestCase {
workers[i].failConnection();
}
- assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
+ Assert.assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get());
@@ -201,7 +212,7 @@ public class ReconnectTest extends TestCase {
}, TimeUnit.SECONDS.toMillis(60)));
// Wait for the connections to re-establish...
- assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
+ Assert.assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get());
@@ -220,26 +231,25 @@ public class ReconnectTest extends TestCase {
}
}
- @Override
- protected void setUp() throws Exception {
- bs = new BrokerService();
- bs.setPersistent(false);
- bs.setUseJmx(true);
- TransportConnector connector = bs.addConnector("tcp://localhost:0");
+ @Before
+ public void setUp() throws Exception {
+ Configuration config = createConfig(0);
+ bs = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
bs.start();
- tcpUri = connector.getConnectUri();
+ tcpUri = new URI(newURI(0));
+
workers = new Worker[WORKER_COUNT];
for (int i = 0; i < WORKER_COUNT; i++) {
- workers[i] = new Worker("" + i);
+ workers[i] = new Worker("worker-" + i);
workers[i].start();
}
}
- @Override
- protected void tearDown() throws Exception {
+ @After
+ public void tearDown() throws Exception {
for (int i = 0; i < WORKER_COUNT; i++) {
workers[i].stop();
}
- new ServiceStopper().stop(bs);
+ bs.stop();
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/95a6df23/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
index 3a55473..ed6040d 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java
@@ -24,15 +24,16 @@ import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.net.ServerSocketFactory;
-import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.Wait;
+import org.junit.Assert;
+import org.junit.Test;
-public class SlowConnectionTest extends TestCase {
+public class SlowConnectionTest {
private CountDownLatch socketReadyLatch = new CountDownLatch(1);
+ @Test
public void testSlowConnection() throws Exception {
MockBroker broker = new MockBroker();
@@ -57,7 +58,7 @@ public class SlowConnectionTest extends TestCase {
}).start();
int count = 0;
- assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
+ Assert.assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
int count = 0;