You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/21 23:56:19 UTC
[63/68] [abbrv] activemq-artemis git commit: Fix thread leak in tests
Fix thread leak in tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e06ff394
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e06ff394
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e06ff394
Branch: refs/heads/refactor-openwire
Commit: e06ff394dfeab2e0946a2d762a0549f6d8ea318b
Parents: 8791d38
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Mar 16 20:41:40 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 21 18:54:50 2016 -0400
----------------------------------------------------------------------
.../artemiswrapper/OpenwireArtemisBaseTest.java | 2 -
.../transport/failover/FailoverTimeoutTest.java | 57 +--
.../failover/FailoverTransactionTest.java | 373 ++++++++++---------
.../failover/TwoBrokerFailoverClusterTest.java | 12 +-
4 files changed, 235 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
index 2f3a330..b523433 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
@@ -34,7 +34,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
-import org.apache.activemq.artemis.utils.uri.URISchema;
import org.apache.activemq.artemis.utils.uri.URISupport;
import org.apache.activemq.broker.BrokerService;
import org.junit.Assert;
@@ -47,7 +46,6 @@ import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
public class OpenwireArtemisBaseTest {
-
@Rule
public CleanupThreadRule cleanupRules = new CleanupThreadRule();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index c5ee02f..72b8c43 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -33,7 +33,6 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
-import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.junit.After;
import org.junit.Before;
@@ -97,28 +96,35 @@ public class FailoverTimeoutTest extends OpenwireArtemisBaseTest {
long timeout = 1000;
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout + "&useExponentialBackOff=false");
Connection connection = cf.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
- TextMessage message = session.createTextMessage("Test message");
- producer.send(message);
-
- server.stop();
-
try {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+ TextMessage message = session.createTextMessage("Test message");
producer.send(message);
- }
- catch (JMSException jmse) {
- assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
- }
- Configuration config = createConfig(0);
- server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
- server.start();
+ server.stop();
- producer.send(message);
+ try {
+ producer.send(message);
+ }
+ catch (JMSException jmse) {
+ assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
+ }
- server.stop();
- server = null;
+ Configuration config = createConfig(0);
+ server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+ server.start();
+
+ producer.send(message);
+
+ server.stop();
+ server = null;
+ }
+ finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
}
@Test
@@ -126,10 +132,17 @@ public class FailoverTimeoutTest extends OpenwireArtemisBaseTest {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?useExponentialBackOff=false");
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
- connection.start();
- FailoverTransport failoverTransport = connection.getTransport().narrow(FailoverTransport.class);
+ try {
+ connection.start();
+ FailoverTransport failoverTransport = connection.getTransport().narrow(FailoverTransport.class);
- URI[] bunchOfUnknownAndOneKnown = new URI[]{new URI("tcp://unknownHost:" + tcpUri.getPort()), new URI("tcp://unknownHost2:" + tcpUri.getPort()), new URI("tcp://localhost:2222")};
- failoverTransport.add(false, bunchOfUnknownAndOneKnown);
+ URI[] bunchOfUnknownAndOneKnown = new URI[]{new URI("tcp://unknownHost:" + tcpUri.getPort()), new URI("tcp://unknownHost2:" + tcpUri.getPort()), new URI("tcp://localhost:2222")};
+ failoverTransport.add(false, bunchOfUnknownAndOneKnown);
+ }
+ finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 6cd6942..4aaec57 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -21,7 +21,6 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
import org.apache.activemq.transport.TransportListener;
@@ -541,114 +540,120 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
doByteman.set(true);
Vector<Connection> connections = new Vector<>();
+ Connection connection = null;
+ Message msg = null;
+ Queue destination = null;
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- Connection connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-
- connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
- final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
- final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
-
- produceMessage(producerSession, destination);
- produceMessage(producerSession, destination);
-
- final Vector<Message> receivedMessages = new Vector<>();
- final CountDownLatch commitDoneLatch = new CountDownLatch(1);
- final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
- new Thread() {
- public void run() {
- LOG.info("doing async commit after consume...");
- try {
- Message msg = consumer1.receive(20000);
- LOG.info("consumer1 first attempt got message: " + msg);
- receivedMessages.add(msg);
-
- // give some variance to the runs
- TimeUnit.SECONDS.sleep(pauseSeconds * 2);
-
- // should not get a second message as there are two messages and two consumers
- // and prefetch=1, but with failover and unordered connection restore it can get the second
- // message.
-
- // For the transaction to complete it needs to get the same one or two messages
- // again so that the acks line up.
- // If redelivery order is different, the commit should fail with an ex
- //
- msg = consumer1.receive(5000);
- LOG.info("consumer1 second attempt got message: " + msg);
- if (msg != null) {
+ try {
+ configureConnectionFactory(cf);
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+ final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
+ final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
+
+ produceMessage(producerSession, destination);
+ produceMessage(producerSession, destination);
+
+ final Vector<Message> receivedMessages = new Vector<>();
+ final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+ final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
+ new Thread() {
+ public void run() {
+ LOG.info("doing async commit after consume...");
+ try {
+ Message msg = consumer1.receive(20000);
+ LOG.info("consumer1 first attempt got message: " + msg);
receivedMessages.add(msg);
- }
- LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
- try {
- consumerSession1.commit();
- }
- catch (JMSException expectedSometimes) {
- LOG.info("got exception ex on commit", expectedSometimes);
- if (expectedSometimes instanceof TransactionRolledBackException) {
- gotTransactionRolledBackException.set(true);
- // ok, message one was not replayed so we expect the rollback
+ // give some variance to the runs
+ TimeUnit.SECONDS.sleep(pauseSeconds * 2);
+
+ // should not get a second message as there are two messages and two consumers
+ // and prefetch=1, but with failover and unordered connection restore it can get the second
+ // message.
+
+ // For the transaction to complete it needs to get the same one or two messages
+ // again so that the acks line up.
+ // If redelivery order is different, the commit should fail with an ex
+ //
+ msg = consumer1.receive(5000);
+ LOG.info("consumer1 second attempt got message: " + msg);
+ if (msg != null) {
+ receivedMessages.add(msg);
}
- else {
- throw expectedSometimes;
+
+ LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
+ try {
+ consumerSession1.commit();
}
+ catch (JMSException expectedSometimes) {
+ LOG.info("got exception ex on commit", expectedSometimes);
+ if (expectedSometimes instanceof TransactionRolledBackException) {
+ gotTransactionRolledBackException.set(true);
+ // ok, message one was not replayed so we expect the rollback
+ }
+ else {
+ throw expectedSometimes;
+ }
+ }
+ commitDoneLatch.countDown();
+ LOG.info("done async commit");
+ }
+ catch (Exception e) {
+ e.printStackTrace();
}
- commitDoneLatch.countDown();
- LOG.info("done async commit");
- }
- catch (Exception e) {
- e.printStackTrace();
}
- }
- }.start();
+ }.start();
- // will be stopped by the plugin
- brokerStopLatch.await();
- broker = createBroker();
- broker.start();
- doByteman.set(false);
+ // will be stopped by the plugin
+ brokerStopLatch.await();
+ broker = createBroker();
+ broker.start();
+ doByteman.set(false);
- Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
- LOG.info("received message count: " + receivedMessages.size());
+ LOG.info("received message count: " + receivedMessages.size());
- // new transaction
- Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
- LOG.info("post: from consumer1 received: " + msg);
- if (gotTransactionRolledBackException.get()) {
- Assert.assertNotNull("should be available again after commit rollback ex", msg);
- }
- else {
- Assert.assertNull("should be nothing left for consumer as receive should have committed", msg);
- }
- consumerSession1.commit();
-
- if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
- // just one message successfully consumed or none consumed
- // consumer2 should get other message
- msg = consumer2.receive(10000);
- LOG.info("post: from consumer2 received: " + msg);
- Assert.assertNotNull("got second message on consumer2", msg);
- consumerSession2.commit();
+ // new transaction
+ msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
+ LOG.info("post: from consumer1 received: " + msg);
+ if (gotTransactionRolledBackException.get()) {
+ Assert.assertNotNull("should be available again after commit rollback ex", msg);
+ }
+ else {
+ Assert.assertNull("should be nothing left for consumer as receive should have committed", msg);
+ }
+ consumerSession1.commit();
+
+ if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
+ // just one message successfully consumed or none consumed
+ // consumer2 should get other message
+ msg = consumer2.receive(10000);
+ LOG.info("post: from consumer2 received: " + msg);
+ Assert.assertNotNull("got second message on consumer2", msg);
+ consumerSession2.commit();
+ }
}
-
- for (Connection c : connections) {
- c.close();
+ finally {
+ for (Connection c : connections) {
+ c.close();
+ }
}
// ensure no dangling messages with fresh broker etc
@@ -694,111 +699,115 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
doByteman.set(true);
Vector<Connection> connections = new Vector<>();
- ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
- configureConnectionFactory(cf);
- Connection connection = cf.createConnection();
- connection.start();
- Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-
- produceMessage(producerSession, destination);
- connection.close();
-
- connection = cf.createConnection();
- connection.start();
- connections.add(connection);
- final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- final int sessionCount = 10;
- final Stack<Session> sessions = new Stack<>();
- for (int i = 0; i < sessionCount; i++) {
- sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
- }
-
- final int consumerCount = 1000;
- final Deque<MessageConsumer> consumers = new ArrayDeque<>();
- for (int i = 0; i < consumerCount; i++) {
- consumers.push(consumerSession.createConsumer(destination));
- }
-
final ExecutorService executorService = Executors.newCachedThreadPool();
- final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
- final TransportListener delegate = failoverTransport.getTransportListener();
- failoverTransport.setTransportListener(new TransportListener() {
- @Override
- public void onCommand(Object command) {
- delegate.onCommand(command);
+ try {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+ configureConnectionFactory(cf);
+ Connection connection = cf.createConnection();
+ connection.start();
+ Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+ produceMessage(producerSession, destination);
+ connection.close();
+
+ connection = cf.createConnection();
+ connection.start();
+ connections.add(connection);
+ final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ final int sessionCount = 10;
+ final Stack<Session> sessions = new Stack<>();
+ for (int i = 0; i < sessionCount; i++) {
+ sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
}
- @Override
- public void onException(IOException error) {
- delegate.onException(error);
+ final int consumerCount = 1000;
+ final Deque<MessageConsumer> consumers = new ArrayDeque<>();
+ for (int i = 0; i < consumerCount; i++) {
+ consumers.push(consumerSession.createConsumer(destination));
}
- @Override
- public void transportInterupted() {
+ final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
+ final TransportListener delegate = failoverTransport.getTransportListener();
+ failoverTransport.setTransportListener(new TransportListener() {
+ @Override
+ public void onCommand(Object command) {
+ delegate.onCommand(command);
+ }
- LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
- for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
+ @Override
+ public void onException(IOException error) {
+ delegate.onException(error);
+ }
- executorService.execute(new Runnable() {
- public void run() {
- MessageConsumer localConsumer = null;
- try {
- synchronized (delegate) {
- localConsumer = consumers.pop();
- }
- localConsumer.receive(1);
+ @Override
+ public void transportInterupted() {
- LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
- localConsumer.close();
- }
- catch (NoSuchElementException nse) {
- }
- catch (Exception ignored) {
- LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
+ LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
+ for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
+
+ executorService.execute(new Runnable() {
+ public void run() {
+ MessageConsumer localConsumer = null;
+ try {
+ synchronized (delegate) {
+ localConsumer = consumers.pop();
+ }
+ localConsumer.receive(1);
+
+ LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
+ localConsumer.close();
+ }
+ catch (NoSuchElementException nse) {
+ }
+ catch (Exception ignored) {
+ LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
+ }
}
- }
- });
+ });
+ }
+
+ delegate.transportInterupted();
}
- delegate.transportInterupted();
- }
+ @Override
+ public void transportResumed() {
+ delegate.transportResumed();
+ }
+ });
- @Override
- public void transportResumed() {
- delegate.transportResumed();
+ MessageConsumer consumer = null;
+ synchronized (delegate) {
+ consumer = consumers.pop();
}
- });
+ LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+ consumer.close();
- MessageConsumer consumer = null;
- synchronized (delegate) {
- consumer = consumers.pop();
- }
- LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
- consumer.close();
+ // will be stopped by the plugin
+ brokerStopLatch.await();
+ doByteman.set(false);
+ broker = createBroker();
+ broker.start();
- // will be stopped by the plugin
- brokerStopLatch.await();
- doByteman.set(false);
- broker = createBroker();
- broker.start();
+ consumer = consumerSession.createConsumer(destination);
+ LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
- consumer = consumerSession.createConsumer(destination);
- LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+ Message msg = null;
+ for (int i = 0; i < 4 && msg == null; i++) {
+ msg = consumer.receive(1000);
+ }
- Message msg = null;
- for (int i = 0; i < 4 && msg == null; i++) {
- msg = consumer.receive(1000);
+ LOG.info("post: from consumer1 received: " + msg);
+ Assert.assertNotNull("got message after failover", msg);
+ msg.acknowledge();
}
-
- LOG.info("post: from consumer1 received: " + msg);
- Assert.assertNotNull("got message after failover", msg);
- msg.acknowledge();
-
- for (Connection c : connections) {
- c.close();
+ finally {
+ executorService.shutdown();
+ for (Connection c : connections) {
+ c.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index dc91873..5759547 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -27,7 +27,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
@@ -80,8 +79,12 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
@Before
public void setUp() throws Exception {
- Configuration config0 = createConfig("127.0.0.1", 0);
- Configuration config1 = createConfig("127.0.0.1", 1);
+ HashMap<String, String> map = new HashMap<>();
+ map.put("rebalanceClusterClients", "true");
+ map.put("updateClusterClients", "true");
+ map.put("updateClusterClientsOnRemove", "true");
+ Configuration config0 = createConfig("127.0.0.1", 0, map);
+ Configuration config1 = createConfig("127.0.0.1", 1, map);
deployClusterConfiguration(config0, 1);
deployClusterConfiguration(config1, 0);
@@ -99,6 +102,9 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
@After
public void tearDown() throws Exception {
+ for (ActiveMQConnection conn : connections) {
+ conn.close();
+ }
server0.stop();
server1.stop();
}