You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/01 17:38:00 UTC
[04/52] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
index 2779f52..75c27d7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
@@ -22,6 +22,8 @@ import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -37,97 +39,105 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageTransformation;
import org.apache.activemq.ActiveMQSession;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.util.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Test;
// see https://issues.apache.org/activemq/browse/AMQ-2573
-public class FailoverConsumerUnconsumedTest {
-
+@RunWith(BMUnitRunner.class)
+public class FailoverConsumerUnconsumedTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverConsumerUnconsumedTest.class);
private static final String QUEUE_NAME = "FailoverWithUnconsumed";
- private static final String TRANSPORT_URI = "tcp://localhost:0";
- private String url;
+ private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+
+ private static int maxConsumers = 2;
+ private static AtomicInteger consumerCount = new AtomicInteger(0);
+ private static CountDownLatch brokerStopLatch = new CountDownLatch(1);
+ private static AtomicBoolean watchTopicAdvisories = new AtomicBoolean(false);
+
+ private String url = newURI(0);
final int prefetch = 10;
- BrokerService broker;
+ private static EmbeddedJMS broker;
@After
- public void stopBroker() throws Exception {
+ public void tearDown() throws Exception {
if (broker != null) {
broker.stop();
+ broker = null;
}
}
- public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup);
- broker.start();
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = new BrokerService();
- broker.addConnector(bindAddress);
- broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
- this.url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
- return broker;
+ @Before
+ public void setUp() throws Exception {
+ consumerCount.set(0);
}
@Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processAddConsumer",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker2(context)")
+ }
+ )
public void testFailoverConsumerDups() throws Exception {
+ watchTopicAdvisories.set(true);
doTestFailoverConsumerDups(true);
}
@Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processAddConsumer",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker2(context)")
+ }
+ )
public void testFailoverConsumerDupsNoAdvisoryWatch() throws Exception {
+ watchTopicAdvisories.set(false);
doTestFailoverConsumerDups(false);
}
@SuppressWarnings("unchecked")
@Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processAddConsumer",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverConsumerUnconsumedTest.holdResponseAndStopBroker(context)")
+ }
+ )
public void testFailoverClientAckMissingRedelivery() throws Exception {
-
- final int maxConsumers = 2;
- broker = createBroker(true);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- int consumerCount;
-
- // broker is killed on x create consumer
- @Override
- public Subscription addConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
- if (++consumerCount == maxConsumers) {
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker on consumer: " + info.getConsumerId());
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- return super.addConsumer(context, info);
- }
- }});
+ maxConsumers = 2;
+ brokerStopLatch = new CountDownLatch(1);
+ broker = createBroker();
broker.start();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
@@ -139,7 +149,9 @@ public class FailoverConsumerUnconsumedTest {
final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch=" + prefetch);
- final Vector<TestConsumer> testConsumers = new Vector<>();
+ doByteman.set(true);
+
+ final Vector<TestConsumer> testConsumers = new Vector<TestConsumer>();
TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection);
testConsumer.setMessageListener(new MessageListener() {
@Override
@@ -157,7 +169,6 @@ public class FailoverConsumerUnconsumedTest {
produceMessage(consumerSession, destination, maxConsumers * prefetch);
assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() {
- @Override
public boolean isSatisified() throws Exception {
int totalDelivered = 0;
for (TestConsumer testConsumer : testConsumers) {
@@ -172,7 +183,6 @@ public class FailoverConsumerUnconsumedTest {
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
try {
LOG.info("add last consumer...");
@@ -198,17 +208,16 @@ public class FailoverConsumerUnconsumedTest {
}
});
- // will be stopped by the plugin
- broker.waitUntilStopped();
+ brokerStopLatch.await();
+ doByteman.set(false);
- broker = createBroker(false, this.url);
+ broker = createBroker();
broker.start();
assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
// each should again get prefetch messages - all unacked deliveries should be rolledback
assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
- @Override
public boolean isSatisified() throws Exception {
int totalDelivered = 0;
for (TestConsumer testConsumer : testConsumers) {
@@ -220,55 +229,19 @@ public class FailoverConsumerUnconsumedTest {
}
}));
- assertTrue("after restart each got prefetch amount", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- for (TestConsumer testConsumer : testConsumers) {
- long delivered = testConsumer.deliveredSize();
- LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
- if (delivered != prefetch) {
- return false;
- }
- }
- return true;
- }
- }));
-
connection.close();
}
@SuppressWarnings("unchecked")
public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception {
- final int maxConsumers = 4;
- broker = createBroker(true);
-
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- int consumerCount;
-
- // broker is killed on x create consumer
- @Override
- public Subscription addConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception {
- if (++consumerCount == maxConsumers + (watchTopicAdvisories ? 1 : 0)) {
- context.setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker on consumer: " + info.getConsumerId());
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- return super.addConsumer(context, info);
- }
- }});
+ maxConsumers = 4;
+ broker = createBroker();
broker.start();
+ brokerStopLatch = new CountDownLatch(1);
+ doByteman.set(true);
+
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(watchTopicAdvisories);
@@ -283,10 +256,11 @@ public class FailoverConsumerUnconsumedTest {
testConsumers.add(new TestConsumer(consumerSession, destination, connection));
}
+ assureQueueMessages(0, new SimpleString("jms.queue." + QUEUE_NAME));
+
produceMessage(consumerSession, destination, maxConsumers * prefetch);
assertTrue("add messages are dispatched", Wait.waitFor(new Wait.Condition() {
- @Override
public boolean isSatisified() throws Exception {
int totalUnconsumed = 0;
for (TestConsumer testConsumer : testConsumers) {
@@ -301,7 +275,6 @@ public class FailoverConsumerUnconsumedTest {
final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
try {
LOG.info("add last consumer...");
@@ -315,12 +288,8 @@ public class FailoverConsumerUnconsumedTest {
}
});
- // will be stopped by the plugin
- broker.waitUntilStopped();
-
// verify interrupt
assertTrue("add messages dispatched and unconsumed are cleaned up", Wait.waitFor(new Wait.Condition() {
- @Override
public boolean isSatisified() throws Exception {
int totalUnconsumed = 0;
for (TestConsumer testConsumer : testConsumers) {
@@ -332,14 +301,16 @@ public class FailoverConsumerUnconsumedTest {
}
}));
- broker = createBroker(false, this.url);
+ brokerStopLatch.await();
+ doByteman.set(false);
+
+ broker = createBroker();
broker.start();
assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
// each should again get prefetch messages - all unconsumed deliveries should be rolledback
assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition() {
- @Override
public boolean isSatisified() throws Exception {
int totalUnconsumed = 0;
for (TestConsumer testConsumer : testConsumers) {
@@ -354,6 +325,11 @@ public class FailoverConsumerUnconsumedTest {
connection.close();
}
+   /**
+    * Asserts that the named queue on the embedded broker currently holds exactly
+    * {@code num} messages.
+    *
+    * @param num       expected message count
+    * @param queueName name used to look the queue up through the server's post office
+    */
+   private void assureQueueMessages(int num, SimpleString queueName) {
+      // fail with a readable message instead of a bare NullPointerException should no binding be registered
+      Assert.assertNotNull("no binding found for queue " + queueName, broker.getActiveMQServer().getPostOffice().getBinding(queueName));
+      QueueImpl queue = (QueueImpl) broker.getActiveMQServer().getPostOffice().getBinding(queueName).getBindable();
+      Assert.assertEquals("unexpected message count on queue " + queueName, num, queue.getMessageCount());
+   }
+
private void produceMessage(final Session producerSession, Queue destination, long count) throws JMSException {
MessageProducer producer = producerSession.createProducer(destination);
for (int i = 0; i < count; i++) {
@@ -385,4 +361,44 @@ public class FailoverConsumerUnconsumedTest {
idGen -= 5;
return idGen;
}
+
+   /**
+    * Byteman hook: the {@code @BMRule} on {@code testFailoverClientAckMissingRedelivery}
+    * calls this on entry to {@code OpenWireConnection.processAddConsumer}.
+    * <p>
+    * Does nothing unless {@code doByteman} is set. On the call that brings
+    * {@code consumerCount} up to {@code maxConsumers}, it flags the context with
+    * {@code setDontSendReponse(true)} and stops the broker on a separate thread.
+    *
+    * @param context connection context of the intercepted request
+    */
+   public static void holdResponseAndStopBroker(AMQConnectionContext context) {
+      if (doByteman.get() && consumerCount.incrementAndGet() == maxConsumers) {
+         context.setDontSendReponse(true);
+         Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+               try {
+                  broker.stop();
+               }
+               catch (Exception e) {
+                  LOG.error("failed to stop broker", e);
+               }
+               finally {
+                  // Always release the test thread: it awaits this latch without a
+                  // timeout, so skipping the count-down on a failed stop() would hang the test.
+                  brokerStopLatch.countDown();
+               }
+            }
+         });
+      }
+   }
+
+   /**
+    * Byteman hook: the {@code @BMRule}s on {@code testFailoverConsumerDups} and
+    * {@code testFailoverConsumerDupsNoAdvisoryWatch} call this on entry to
+    * {@code OpenWireConnection.processAddConsumer}.
+    * <p>
+    * Does nothing unless {@code doByteman} is set. The trigger count is
+    * {@code maxConsumers}, plus one when {@code watchTopicAdvisories} is set. On the
+    * call that reaches it, the context is flagged with {@code setDontSendReponse(true)}
+    * and the broker is stopped on a separate thread.
+    *
+    * @param context connection context of the intercepted request
+    */
+   public static void holdResponseAndStopBroker2(AMQConnectionContext context) {
+      if (doByteman.get() && consumerCount.incrementAndGet() == maxConsumers + (watchTopicAdvisories.get() ? 1 : 0)) {
+         context.setDontSendReponse(true);
+         Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+               try {
+                  broker.stop();
+                  // Sanity check that the stop is signalled only once. An AssertionError
+                  // is not an Exception, so it is not handled by the catch below.
+                  Assert.assertEquals(1, brokerStopLatch.getCount());
+               }
+               catch (Exception e) {
+                  LOG.error("failed to stop broker", e);
+               }
+               finally {
+                  // Always release the test thread: it awaits this latch without a
+                  // timeout, so skipping the count-down on a failed stop() would hang the test.
+                  brokerStopLatch.countDown();
+               }
+            }
+         });
+      }
+   }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
index cb15940..e801b3c 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
@@ -33,25 +33,35 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.TestSupport;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.util.Wait;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FailoverDuplicateTest extends TestSupport {
+@RunWith(BMUnitRunner.class)
+public class FailoverDuplicateTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverDuplicateTest.class);
private static final String QUEUE_NAME = "TestQueue";
- private static final String TRANSPORT_URI = "tcp://localhost:0";
- private String url;
- BrokerService broker;
- @Override
+ private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+ private static final AtomicBoolean first = new AtomicBoolean(false);
+ private static final CountDownLatch gotMessageLatch = new CountDownLatch(1);
+ private static final CountDownLatch producersDone = new CountDownLatch(1);
+
+ private String url = newURI(0);
+ EmbeddedJMS broker;
+
+ @After
public void tearDown() throws Exception {
stopBroker();
}
@@ -63,72 +73,38 @@ public class FailoverDuplicateTest extends TestSupport {
}
public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup);
+ broker = createBroker();
broker.start();
}
- public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup, bindAddress);
+ public void startBroker() throws Exception {
+ broker = createBroker();
broker.start();
}
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setAdvisorySupport(false);
- broker.addConnector(bindAddress);
- broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
- url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
- return broker;
- }
-
public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
factory.setAuditMaximumProducerNumber(2048);
factory.setOptimizeAcknowledge(true);
}
@SuppressWarnings("unchecked")
+ @Test
+ @BMRules(
+ rules = {
+ @BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processMessage",
+ targetLocation = "EXIT",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverDuplicateTest.holdResponseAndStopConn(context)")
+ }
+ )
public void testFailoverSendReplyLost() throws Exception {
- broker = createBroker(true);
- setDefaultPersistenceAdapter(broker);
-
- final CountDownLatch gotMessageLatch = new CountDownLatch(1);
- final CountDownLatch producersDone = new CountDownLatch(1);
- final AtomicBoolean first = new AtomicBoolean(false);
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public void send(final ProducerBrokerExchange producerExchange,
- org.apache.activemq.command.Message messageSend) throws Exception {
- // so send will hang as if reply is lost
- super.send(producerExchange, messageSend);
- if (first.compareAndSet(false, true)) {
- producerExchange.getConnectionContext().setDontSendReponse(true);
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- try {
- LOG.info("Waiting for recepit");
- assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
- assertTrue("new producers done on time", producersDone.await(120, TimeUnit.SECONDS));
- LOG.info("Stopping connection post send and receive and multiple producers");
- producerExchange.getConnectionContext().getConnection().stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- }
- }
- }});
+ broker = createBroker();
broker.start();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false");
configureConnectionFactory(cf);
@@ -164,14 +140,14 @@ public class FailoverDuplicateTest extends TestSupport {
}
catch (JMSException e) {
LOG.error("got send exception: ", e);
- fail("got unexpected send exception" + e);
+ Assert.fail("got unexpected send exception" + e);
}
sendDoneLatch.countDown();
LOG.info("done async send");
}
});
- assertTrue("one message got through on time", gotMessageLatch.await(20, TimeUnit.SECONDS));
+ Assert.assertTrue("one message got through on time", gotMessageLatch.await(20, TimeUnit.SECONDS));
// send more messages, blow producer audit
final int numProducers = 1050;
final int numPerProducer = 2;
@@ -186,7 +162,7 @@ public class FailoverDuplicateTest extends TestSupport {
}
}
- assertTrue("message sent complete through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("message sent complete through failover", sendDoneLatch.await(30, TimeUnit.SECONDS));
Wait.waitFor(new Wait.Condition() {
@Override
@@ -195,29 +171,16 @@ public class FailoverDuplicateTest extends TestSupport {
return totalSent <= receivedCount.get();
}
});
- assertEquals("we got all produced messages", totalSent, receivedCount.get());
+ Assert.assertEquals("we got all produced messages", totalSent, receivedCount.get());
sendConnection.close();
receiveConnection.close();
- // verify stats
- assertEquals("expect all messages are dequeued with one duplicate to dlq", totalSent + 2, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
-
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- LOG.info("dequeues : " + ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
- return totalSent + 1 <= ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount();
- }
- });
- assertEquals("dequeue correct, including duplicate dispatch poisoned", totalSent + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
-
// ensure no dangling messages with fresh broker etc
broker.stop();
- broker.waitUntilStopped();
+ doByteman.set(false);
LOG.info("Checking for remaining/hung messages with second restart..");
- broker = createBroker(false, url);
- setDefaultPersistenceAdapter(broker);
+ broker = createBroker();
broker.start();
// after restart, ensure no dangling messages
@@ -231,7 +194,7 @@ public class FailoverDuplicateTest extends TestSupport {
if (msg == null) {
msg = consumer.receive(5000);
}
- assertNull("no messges left dangling but got: " + msg, msg);
+ Assert.assertNull("no messges left dangling but got: " + msg, msg);
sendConnection.close();
}
@@ -247,4 +210,28 @@ public class FailoverDuplicateTest extends TestSupport {
}
producer.close();
}
+
+   /**
+    * Byteman hook: the {@code @BMRule} on {@code testFailoverSendReplyLost} calls this
+    * on exit from {@code OpenWireConnection.processMessage}.
+    * <p>
+    * Acts only while {@code doByteman} is set, and only for the first intercepted call
+    * (guarded by {@code first}). It flags the context with
+    * {@code setDontSendReponse(true)}, then on a separate thread waits for
+    * {@code gotMessageLatch} and {@code producersDone} before failing the connection.
+    *
+    * @param context connection context of the intercepted request
+    */
+   public static void holdResponseAndStopConn(final AMQConnectionContext context) {
+      if (!doByteman.get() || !first.compareAndSet(false, true)) {
+         return;
+      }
+      context.setDontSendReponse(true);
+      Executors.newSingleThreadExecutor().execute(new Runnable() {
+         @Override
+         public void run() {
+            try {
+               LOG.info("Waiting for recepit");
+               Assert.assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
+               Assert.assertTrue("new producers done on time", producersDone.await(120, TimeUnit.SECONDS));
+               LOG.info("Stopping connection post send and receive and multiple producers");
+               context.getConnection().fail(null, "test Failoverduplicatetest");
+            }
+            catch (InterruptedException e) {
+               // keep the interrupt visible to whoever owns this worker thread
+               Thread.currentThread().interrupt();
+               LOG.error("interrupted while waiting to fail the connection", e);
+            }
+            catch (Exception e) {
+               LOG.error("unable to fail the connection", e);
+            }
+         }
+      });
+   }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
index 57899ba..fcb60e5 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPrefetchZeroTest.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,6 @@
*/
package org.apache.activemq.transport.failover;
-import java.util.Vector;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -27,31 +23,42 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerPluginSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.command.MessagePull;
-import org.apache.activemq.command.Response;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.After;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
// see: https://issues.apache.org/activemq/browse/AMQ-2877
-public class FailoverPrefetchZeroTest {
+@RunWith(BMUnitRunner.class)
+public class FailoverPrefetchZeroTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverPrefetchZeroTest.class);
private static final String QUEUE_NAME = "FailoverPrefetchZero";
- private static final String TRANSPORT_URI = "tcp://localhost:0";
- private String url;
+
+ private static final AtomicBoolean doByteman = new AtomicBoolean(false);
+ private static final CountDownLatch pullDone = new CountDownLatch(1);
+ private static CountDownLatch brokerStopLatch = new CountDownLatch(1);
+
+ private String url = newURI(0);
final int prefetch = 0;
- BrokerService broker;
+ private static EmbeddedJMS broker;
@After
public void stopBroker() throws Exception {
@@ -60,52 +67,25 @@ public class FailoverPrefetchZeroTest {
}
}
- public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- broker = createBroker(deleteAllMessagesOnStartup);
+ public void startBroker() throws Exception {
+ broker = createBroker();
broker.start();
}
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
- return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI);
- }
-
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- broker = new BrokerService();
- broker.addConnector(bindAddress);
- broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);
-
- url = broker.getTransportConnectors().get(0).getConnectUri().toString();
-
- return broker;
- }
-
@SuppressWarnings("unchecked")
@Test
+ @BMRules(
+ rules = {@BMRule(
+ name = "set no return response and stop the broker",
+ targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection",
+ targetMethod = "processMessagePull",
+ targetLocation = "ENTRY",
+ binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()",
+ action = "org.apache.activemq.transport.failover.FailoverPrefetchZeroTest.holdResponseAndStopBroker(context)")})
public void testPrefetchZeroConsumerThroughRestart() throws Exception {
- broker = createBroker(true);
-
- final CountDownLatch pullDone = new CountDownLatch(1);
- broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() {
- @Override
- public Response messagePull(ConnectionContext context, final MessagePull pull) throws Exception {
- context.setDontSendReponse(true);
- pullDone.countDown();
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Stopping broker on pull: " + pull);
- try {
- broker.stop();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
- return null;
- }
- }});
+ broker = createBroker();
broker.start();
+ doByteman.set(true);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
cf.setWatchTopicAdvisories(false);
@@ -122,7 +102,6 @@ public class FailoverPrefetchZeroTest {
final CountDownLatch receiveDone = new CountDownLatch(1);
final Vector<Message> received = new Vector<>();
Executors.newSingleThreadExecutor().execute(new Runnable() {
- @Override
public void run() {
try {
LOG.info("receive one...");
@@ -141,8 +120,9 @@ public class FailoverPrefetchZeroTest {
// will be stopped by the plugin
assertTrue("pull completed on broker", pullDone.await(30, TimeUnit.SECONDS));
- broker.waitUntilStopped();
- broker = createBroker(false, url);
+ brokerStopLatch.await();
+ doByteman.set(false);
+ broker = createBroker();
broker.start();
assertTrue("receive completed through failover", receiveDone.await(30, TimeUnit.SECONDS));
@@ -160,4 +140,25 @@ public class FailoverPrefetchZeroTest {
}
producer.close();
}
+
+   /**
+    * Byteman hook: the {@code @BMRule} on {@code testPrefetchZeroConsumerThroughRestart}
+    * calls this on entry to {@code OpenWireConnection.processMessagePull}.
+    * <p>
+    * While {@code doByteman} is set, it flags the context with
+    * {@code setDontSendReponse(true)}, counts down {@code pullDone}, and stops the
+    * broker on a separate thread. {@code brokerStopLatch} is counted down once the
+    * stop attempt finishes, whether or not it succeeded.
+    *
+    * @param context connection context of the intercepted request
+    */
+   public static void holdResponseAndStopBroker(final AMQConnectionContext context) {
+      if (!doByteman.get()) {
+         return;
+      }
+      context.setDontSendReponse(true);
+      pullDone.countDown();
+      final Runnable stopBrokerTask = new Runnable() {
+         public void run() {
+            try {
+               broker.stop();
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+            }
+            finally {
+               // signal the waiting test regardless of how the stop went
+               brokerStopLatch.countDown();
+            }
+         }
+      };
+      Executors.newSingleThreadExecutor().execute(stopBrokerTask);
+   }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
index b8860a7..6e559e7 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
@@ -16,58 +16,106 @@
*/
package org.apache.activemq.transport.failover;
+import java.util.ArrayList;
import java.util.HashMap;
-
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FailoverPriorityTest extends FailoverClusterTestSupport {
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+public class FailoverPriorityTest extends OpenwireArtemisBaseTest {
protected final Logger LOG = LoggerFactory.getLogger(getClass());
private static final String BROKER_A_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_B_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61617";
private static final String BROKER_C_CLIENT_TC_ADDRESS = "tcp://127.0.0.1:61618";
- private final HashMap<String, String> urls = new HashMap<>();
+ private final HashMap<Integer, String> urls = new HashMap<>();
+
+ private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+ private EmbeddedJMS[] servers = new EmbeddedJMS[3];
+ private String clientUrl;
- @Override
+ @Before
public void setUp() throws Exception {
- super.setUp();
- urls.put(BROKER_A_NAME, BROKER_A_CLIENT_TC_ADDRESS);
- urls.put(BROKER_B_NAME, BROKER_B_CLIENT_TC_ADDRESS);
+ urls.put(0, BROKER_A_CLIENT_TC_ADDRESS);
+ urls.put(1, BROKER_B_CLIENT_TC_ADDRESS);
}
- private static final String BROKER_A_NAME = "BROKERA";
- private static final String BROKER_B_NAME = "BROKERB";
- private static final String BROKER_C_NAME = "BROKERC";
+ @After
+ public void tearDown() throws Exception {
+ shutdownClients();
+ for (EmbeddedJMS server : servers) {
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+ @Test
public void testPriorityBackup() throws Exception {
- createBrokerA();
- createBrokerB();
- getBroker(BROKER_B_NAME).waitUntilStarted();
+ Configuration config0 = createConfig("127.0.0.1", 0);
+ Configuration config1 = createConfig("127.0.0.1", 1);
+
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+
+ servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[0].start();
+ servers[1].start();
+
+ Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
Thread.sleep(1000);
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
createClients(5);
- assertAllConnectedTo(urls.get(BROKER_A_NAME));
+ assertAllConnectedTo(urls.get(0));
- restart(false, BROKER_A_NAME, BROKER_B_NAME);
+ restart(false, 0, 1);
for (int i = 0; i < 3; i++) {
- restart(true, BROKER_A_NAME, BROKER_B_NAME);
+ restart(true, 0, 1);
}
Thread.sleep(5000);
- restart(false, BROKER_A_NAME, BROKER_B_NAME);
+ restart(false, 0, 1);
}
+ @Test
public void testPriorityBackupList() throws Exception {
- createBrokerA();
- createBrokerB();
- getBroker(BROKER_B_NAME).waitUntilStarted();
+ Configuration config0 = createConfig("127.0.0.1", 0);
+ Configuration config1 = createConfig("127.0.0.1", 1);
+
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+
+ servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[0].start();
+ servers[1].start();
+
+ Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
Thread.sleep(1000);
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&priorityURIs=tcp://127.0.0.1:61617&initialReconnectDelay=1000&useExponentialBackOff=false");
@@ -75,154 +123,166 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
Thread.sleep(3000);
- assertAllConnectedTo(urls.get(BROKER_B_NAME));
+ assertAllConnectedTo(urls.get(1));
- restart(false, BROKER_B_NAME, BROKER_A_NAME);
+ restart(false, 1, 0);
for (int i = 0; i < 3; i++) {
- restart(true, BROKER_B_NAME, BROKER_A_NAME);
+ restart(true, 1, 0);
}
- restart(false, BROKER_B_NAME, BROKER_A_NAME);
-
+ restart(false, 1, 0);
}
+ @Test
public void testThreeBrokers() throws Exception {
- // Broker A
- addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_A_NAME).start();
-
- // Broker B
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_C_Bridge", "static://(" + BROKER_C_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_B_NAME).start();
-
- // Broker C
- addBroker(BROKER_C_NAME, createBroker(BROKER_C_NAME));
- addTransportConnector(getBroker(BROKER_C_NAME), "openwire", BROKER_C_CLIENT_TC_ADDRESS, false);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- addNetworkBridge(getBroker(BROKER_C_NAME), "C_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_C_NAME).start();
-
- getBroker(BROKER_C_NAME).waitUntilStarted();
+ commonSetup();
Thread.sleep(1000);
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + "," + BROKER_C_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
createClients(5);
- assertAllConnectedTo(urls.get(BROKER_A_NAME));
-
- restart(true, BROKER_A_NAME, BROKER_B_NAME);
+ assertAllConnectedTo(urls.get(0));
+ restart(true, 0, 1, 3);
}
+ @Test
public void testPriorityBackupAndUpdateClients() throws Exception {
- // Broker A
- addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, true);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_A_NAME).start();
-
- // Broker B
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, true);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_B_NAME).start();
-
- getBroker(BROKER_B_NAME).waitUntilStarted();
+ Configuration config0 = createConfig("127.0.0.1", 0);
+ Configuration config1 = createConfig("127.0.0.1", 1);
+
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+
+ servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[0].start();
+ servers[1].start();
+
+ Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+
Thread.sleep(1000);
setClientUrl("failover:(" + BROKER_A_CLIENT_TC_ADDRESS + "," + BROKER_B_CLIENT_TC_ADDRESS + ")?randomize=false&priorityBackup=true&initialReconnectDelay=1000&useExponentialBackOff=false");
- LOG.info("Client URI will be: " + getClientUrl());
-
createClients(5);
// Let's wait a little bit longer just in case it takes a while to realize that the
// Broker A is the one with higher priority.
Thread.sleep(5000);
- assertAllConnectedTo(urls.get(BROKER_A_NAME));
+ assertAllConnectedTo(urls.get(0));
}
- private void restart(boolean primary, String primaryName, String secondaryName) throws Exception {
+ private void restart(boolean primary, int primaryID, int secondaryID) throws Exception {
+ restart(primary, primaryID, secondaryID, 2);
+ }
+
+ private void restart(boolean primary, int primaryID, int secondaryID, int total) throws Exception {
Thread.sleep(1000);
if (primary) {
- LOG.info("Stopping " + primaryName);
- stopBroker(primaryName);
+ LOG.info("Stopping " + primaryID);
+ stopBroker(primaryID);
+ Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total - 1));
}
else {
- LOG.info("Stopping " + secondaryName);
- stopBroker(secondaryName);
+ LOG.info("Stopping " + secondaryID);
+ stopBroker(secondaryID);
+ Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total - 1));
}
Thread.sleep(5000);
if (primary) {
- assertAllConnectedTo(urls.get(secondaryName));
+ assertAllConnectedTo(urls.get(secondaryID));
}
else {
- assertAllConnectedTo(urls.get(primaryName));
+ assertAllConnectedTo(urls.get(primaryID));
}
if (primary) {
- LOG.info("Starting " + primaryName);
- createBrokerByName(primaryName);
- getBroker(primaryName).waitUntilStarted();
+ Configuration config = createConfig("127.0.0.1", primaryID);
+
+ deployClusterConfiguration(config, secondaryID);
+
+ servers[primaryID] = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[primaryID].start();
+
+ Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
+ Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
}
else {
- LOG.info("Starting " + secondaryName);
- createBrokerByName(secondaryName);
- getBroker(secondaryName).waitUntilStarted();
+ Configuration config = createConfig("127.0.0.1", secondaryID);
+
+ deployClusterConfiguration(config, primaryID);
+
+ servers[secondaryID] = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[secondaryID].start();
+
+ Assert.assertTrue(servers[primaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
+ Assert.assertTrue(servers[secondaryID].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, total));
}
Thread.sleep(5000);
- assertAllConnectedTo(urls.get(primaryName));
+ assertAllConnectedTo(urls.get(primaryID));
}
- private void createBrokerByName(String name) throws Exception {
- if (name.equals(BROKER_A_NAME)) {
- createBrokerA();
- }
- else if (name.equals(BROKER_B_NAME)) {
- createBrokerB();
- }
- else {
- throw new Exception("Unknown broker " + name);
+ private void stopBroker(int serverID) throws Exception {
+ servers[serverID].stop();
+ }
+
+ public void setClientUrl(String clientUrl) {
+ this.clientUrl = clientUrl;
+ }
+
+ protected void createClients(int numOfClients) throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+ for (int i = 0; i < numOfClients; i++) {
+ ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
+ c.start();
+ connections.add(c);
}
}
- private void createBrokerA() throws Exception {
- if (getBroker(BROKER_A_NAME) == null) {
- addBroker(BROKER_A_NAME, createBroker(BROKER_A_NAME));
- addTransportConnector(getBroker(BROKER_A_NAME), "openwire", BROKER_A_CLIENT_TC_ADDRESS, false);
- addNetworkBridge(getBroker(BROKER_A_NAME), "A_2_B_Bridge", "static://(" + BROKER_B_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_A_NAME).start();
+ protected void shutdownClients() throws JMSException {
+ for (Connection c : connections) {
+ c.close();
}
}
- private void createBrokerB() throws Exception {
- if (getBroker(BROKER_B_NAME) == null) {
- addBroker(BROKER_B_NAME, createBroker(BROKER_B_NAME));
- addTransportConnector(getBroker(BROKER_B_NAME), "openwire", BROKER_B_CLIENT_TC_ADDRESS, false);
- addNetworkBridge(getBroker(BROKER_B_NAME), "B_2_A_Bridge", "static://(" + BROKER_A_CLIENT_TC_ADDRESS + ")?useExponentialBackOff=false", false, null);
- getBroker(BROKER_B_NAME).start();
+ protected void assertAllConnectedTo(String url) throws Exception {
+ for (ActiveMQConnection c : connections) {
+ Assert.assertEquals(url, c.getTransportChannel().getRemoteAddress());
}
}
- @Override
- protected void tearDown() throws Exception {
- shutdownClients();
- destroyBrokerCluster();
+ //three-broker cluster setup: configures and starts servers 0-2, then waits for the cluster to form
+ private void commonSetup() throws Exception {
+ Configuration config0 = createConfig("127.0.0.1", 0);
+ Configuration config1 = createConfig("127.0.0.1", 1);
+ Configuration config2 = createConfig("127.0.0.1", 2);
+
+ deployClusterConfiguration(config0, 1, 2);
+ deployClusterConfiguration(config1, 0, 2);
+ deployClusterConfiguration(config2, 0, 1);
+
+ servers[0] = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[1] = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+ servers[2] = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl());
+
+ servers[0].start();
+ servers[1].start();
+ servers[2].start();
+
+ Assert.assertTrue(servers[0].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+ Assert.assertTrue(servers[1].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
+ Assert.assertTrue(servers[2].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 3));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
index 54dd3e3..80f83db 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRandomTest.java
@@ -17,47 +17,59 @@
package org.apache.activemq.transport.failover;
-import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
-public class FailoverRandomTest extends TestCase {
+public class FailoverRandomTest extends OpenwireArtemisBaseTest {
- BrokerService brokerA, brokerB;
+ private EmbeddedJMS server0, server1;
- @Override
+ @Before
public void setUp() throws Exception {
- brokerA = createBroker("A");
- brokerB = createBroker("B");
- }
+ Configuration config0 = createConfig(0);
+ Configuration config1 = createConfig(1);
- @Override
- public void tearDown() throws Exception {
- brokerA.stop();
- brokerB.stop();
+ deployClusterConfiguration(config0, 1);
+ deployClusterConfiguration(config1, 0);
+
+ server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl());
+
+ server0.start();
+ server1.start();
+
+ server0.getActiveMQServer().setIdentity("BrokerA");
+ server1.getActiveMQServer().setIdentity("BrokerB");
+
+ Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
+ Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2));
}
- private BrokerService createBroker(String name) throws Exception {
- BrokerService broker = new BrokerService();
- broker.setBrokerName("Broker" + name);
- broker.addConnector("tcp://localhost:0");
- broker.getManagementContext().setCreateConnector(false);
- broker.setPersistent(false);
- broker.setUseJmx(false);
- broker.start();
- return broker;
+ @After
+ public void tearDown() throws Exception {
+ server0.stop();
+ server1.stop();
}
+ @Test
public void testRandomConnections() throws Exception {
- String failoverUrl = "failover:(" + brokerA.getTransportConnectors().get(0).getConnectUri() + "," + brokerB.getTransportConnectors().get(0).getConnectUri() + ")";
+ String failoverUrl = "failover:(" + newURI(0) + "," + newURI(1) + ")";
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(failoverUrl);
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start();
String brokerName1 = connection.getBrokerName();
- assertNotNull(brokerName1);
+ Assert.assertNotNull(brokerName1);
connection.close();
String brokerName2 = brokerName1;
@@ -66,9 +78,9 @@ public class FailoverRandomTest extends TestCase {
connection = (ActiveMQConnection) cf.createConnection();
connection.start();
brokerName2 = connection.getBrokerName();
- assertNotNull(brokerName2);
+ Assert.assertNotNull(brokerName2);
connection.close();
}
- assertTrue(brokerName1 + "!=" + brokerName2, !brokerName1.equals(brokerName2));
+ Assert.assertTrue(brokerName1 + "!=" + brokerName2, !brokerName1.equals(brokerName2));
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
index 6b7a2bb..3be2593 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverRedeliveryTransactionTest.java
@@ -16,19 +16,15 @@
*/
package org.apache.activemq.transport.failover;
-import junit.framework.Test;
-
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.junit.Test;
public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
- public static Test suite() {
- return suite(FailoverRedeliveryTransactionTest.class);
- }
-
@Override
public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
super.configureConnectionFactory(factory);
@@ -36,26 +32,24 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
}
@Override
- public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
- BrokerService brokerService = super.createBroker(deleteAllMessagesOnStartup, bindAddress);
+ public EmbeddedJMS createBroker() throws Exception {
+ EmbeddedJMS brokerService = super.createBroker();
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setPersistJMSRedelivered(true);
policyMap.setDefaultEntry(defaultEntry);
- brokerService.setDestinationPolicy(policyMap);
+ //revisit: do we support something like persistJMSRedelivered?
+ //brokerService.setDestinationPolicy(policyMap);
return brokerService;
}
// no point rerunning these
@Override
+ @Test
public void testFailoverProducerCloseBeforeTransaction() throws Exception {
}
@Override
- public void initCombosForTestFailoverCommitReplyLost() {
- }
-
- @Override
public void testFailoverCommitReplyLost() throws Exception {
}
@@ -64,18 +58,10 @@ public class FailoverRedeliveryTransactionTest extends FailoverTransactionTest {
}
@Override
- public void initCombosForTestFailoverSendReplyLost() {
- }
-
- @Override
public void testFailoverSendReplyLost() throws Exception {
}
@Override
- public void initCombosForTestFailoverConnectionSendReplyLost() {
- }
-
- @Override
public void testFailoverConnectionSendReplyLost() throws Exception {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b036fc2c/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index 07a8436..c5ee02f 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -30,40 +30,43 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class FailoverTimeoutTest {
+public class FailoverTimeoutTest extends OpenwireArtemisBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(FailoverTimeoutTest.class);
private static final String QUEUE_NAME = "test.failovertimeout";
- BrokerService bs;
+ EmbeddedJMS server;
URI tcpUri;
@Before
public void setUp() throws Exception {
- bs = new BrokerService();
- bs.setUseJmx(false);
- bs.addConnector("tcp://localhost:0");
- bs.start();
- tcpUri = bs.getTransportConnectors().get(0).getConnectUri();
+ Configuration config = createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+ server.start();
+ tcpUri = new URI(newURI(0));
}
@After
public void tearDown() throws Exception {
- if (bs != null) {
- bs.stop();
+ if (server != null) {
+ server.stop();
}
}
@Test
public void testTimoutDoesNotFailConnectionAttempts() throws Exception {
- bs.stop();
+ server.stop();
long timeout = 1000;
long startTime = System.currentTimeMillis();
@@ -99,7 +102,7 @@ public class FailoverTimeoutTest {
TextMessage message = session.createTextMessage("Test message");
producer.send(message);
- bs.stop();
+ server.stop();
try {
producer.send(message);
@@ -108,15 +111,14 @@ public class FailoverTimeoutTest {
assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
}
- bs = new BrokerService();
- bs.setUseJmx(false);
- bs.addConnector(tcpUri);
- bs.start();
- bs.waitUntilStarted();
+ Configuration config = createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+ server.start();
producer.send(message);
- bs.stop();
+ server.stop();
+ server = null;
}
@Test