You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/21 23:56:19 UTC

[63/68] [abbrv] activemq-artemis git commit: Fix thread leak in tests

Fix thread leak in tests


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e06ff394
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e06ff394
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e06ff394

Branch: refs/heads/refactor-openwire
Commit: e06ff394dfeab2e0946a2d762a0549f6d8ea318b
Parents: 8791d38
Author: Howard Gao <ho...@gmail.com>
Authored: Wed Mar 16 20:41:40 2016 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Mar 21 18:54:50 2016 -0400

----------------------------------------------------------------------
 .../artemiswrapper/OpenwireArtemisBaseTest.java |   2 -
 .../transport/failover/FailoverTimeoutTest.java |  57 +--
 .../failover/FailoverTransactionTest.java       | 373 ++++++++++---------
 .../failover/TwoBrokerFailoverClusterTest.java  |  12 +-
 4 files changed, 235 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
index 2f3a330..b523433 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
@@ -34,7 +34,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
 import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
-import org.apache.activemq.artemis.utils.uri.URISchema;
 import org.apache.activemq.artemis.utils.uri.URISupport;
 import org.apache.activemq.broker.BrokerService;
 import org.junit.Assert;
@@ -47,7 +46,6 @@ import javax.management.MBeanServerInvocationHandler;
 import javax.management.ObjectName;
 
 public class OpenwireArtemisBaseTest {
-
    @Rule
    public CleanupThreadRule cleanupRules = new CleanupThreadRule();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
index c5ee02f..72b8c43 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java
@@ -33,7 +33,6 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
 import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
-import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.junit.After;
 import org.junit.Before;
@@ -97,28 +96,35 @@ public class FailoverTimeoutTest extends OpenwireArtemisBaseTest {
       long timeout = 1000;
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout + "&useExponentialBackOff=false");
       Connection connection = cf.createConnection();
-      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
-      TextMessage message = session.createTextMessage("Test message");
-      producer.send(message);
-
-      server.stop();
-
       try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         TextMessage message = session.createTextMessage("Test message");
          producer.send(message);
-      }
-      catch (JMSException jmse) {
-         assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
-      }
 
-      Configuration config = createConfig(0);
-      server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
-      server.start();
+         server.stop();
 
-      producer.send(message);
+         try {
+            producer.send(message);
+         }
+         catch (JMSException jmse) {
+            assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage());
+         }
 
-      server.stop();
-      server = null;
+         Configuration config = createConfig(0);
+         server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl());
+         server.start();
+
+         producer.send(message);
+
+         server.stop();
+         server = null;
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
    }
 
    @Test
@@ -126,10 +132,17 @@ public class FailoverTimeoutTest extends OpenwireArtemisBaseTest {
 
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?useExponentialBackOff=false");
       ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
-      connection.start();
-      FailoverTransport failoverTransport = connection.getTransport().narrow(FailoverTransport.class);
+      try {
+         connection.start();
+         FailoverTransport failoverTransport = connection.getTransport().narrow(FailoverTransport.class);
 
-      URI[] bunchOfUnknownAndOneKnown = new URI[]{new URI("tcp://unknownHost:" + tcpUri.getPort()), new URI("tcp://unknownHost2:" + tcpUri.getPort()), new URI("tcp://localhost:2222")};
-      failoverTransport.add(false, bunchOfUnknownAndOneKnown);
+         URI[] bunchOfUnknownAndOneKnown = new URI[]{new URI("tcp://unknownHost:" + tcpUri.getPort()), new URI("tcp://unknownHost2:" + tcpUri.getPort()), new URI("tcp://localhost:2222")};
+         failoverTransport.add(false, bunchOfUnknownAndOneKnown);
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
index 6cd6942..4aaec57 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
@@ -21,7 +21,6 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.AutoFailTestSupport;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
 import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest;
 import org.apache.activemq.transport.TransportListener;
@@ -541,114 +540,120 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
       doByteman.set(true);
 
       Vector<Connection> connections = new Vector<>();
+      Connection connection = null;
+      Message msg = null;
+      Queue destination = null;
       ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-      configureConnectionFactory(cf);
-      Connection connection = cf.createConnection();
-      connection.start();
-      connections.add(connection);
-      final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-
-      connection = cf.createConnection();
-      connection.start();
-      connections.add(connection);
-      final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
-      connection = cf.createConnection();
-      connection.start();
-      connections.add(connection);
-      final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
-      final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
-      final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
-
-      produceMessage(producerSession, destination);
-      produceMessage(producerSession, destination);
-
-      final Vector<Message> receivedMessages = new Vector<>();
-      final CountDownLatch commitDoneLatch = new CountDownLatch(1);
-      final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
-      new Thread() {
-         public void run() {
-            LOG.info("doing async commit after consume...");
-            try {
-               Message msg = consumer1.receive(20000);
-               LOG.info("consumer1 first attempt got message: " + msg);
-               receivedMessages.add(msg);
-
-               // give some variance to the runs
-               TimeUnit.SECONDS.sleep(pauseSeconds * 2);
-
-               // should not get a second message as there are two messages and two consumers
-               // and prefetch=1, but with failover and unordered connection restore it can get the second
-               // message.
-
-               // For the transaction to complete it needs to get the same one or two messages
-               // again so that the acks line up.
-               // If redelivery order is different, the commit should fail with an ex
-               //
-               msg = consumer1.receive(5000);
-               LOG.info("consumer1 second attempt got message: " + msg);
-               if (msg != null) {
+      try {
+         configureConnectionFactory(cf);
+         connection = cf.createConnection();
+         connection.start();
+         connections.add(connection);
+         final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+         connection = cf.createConnection();
+         connection.start();
+         connections.add(connection);
+         final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+         connection = cf.createConnection();
+         connection.start();
+         connections.add(connection);
+         final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+         final MessageConsumer consumer1 = consumerSession1.createConsumer(destination);
+         final MessageConsumer consumer2 = consumerSession2.createConsumer(destination);
+
+         produceMessage(producerSession, destination);
+         produceMessage(producerSession, destination);
+
+         final Vector<Message> receivedMessages = new Vector<>();
+         final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+         final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
+         new Thread() {
+            public void run() {
+               LOG.info("doing async commit after consume...");
+               try {
+                  Message msg = consumer1.receive(20000);
+                  LOG.info("consumer1 first attempt got message: " + msg);
                   receivedMessages.add(msg);
-               }
 
-               LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
-               try {
-                  consumerSession1.commit();
-               }
-               catch (JMSException expectedSometimes) {
-                  LOG.info("got exception ex on commit", expectedSometimes);
-                  if (expectedSometimes instanceof TransactionRolledBackException) {
-                     gotTransactionRolledBackException.set(true);
-                     // ok, message one was not replayed so we expect the rollback
+                  // give some variance to the runs
+                  TimeUnit.SECONDS.sleep(pauseSeconds * 2);
+
+                  // should not get a second message as there are two messages and two consumers
+                  // and prefetch=1, but with failover and unordered connection restore it can get the second
+                  // message.
+
+                  // For the transaction to complete it needs to get the same one or two messages
+                  // again so that the acks line up.
+                  // If redelivery order is different, the commit should fail with an ex
+                  //
+                  msg = consumer1.receive(5000);
+                  LOG.info("consumer1 second attempt got message: " + msg);
+                  if (msg != null) {
+                     receivedMessages.add(msg);
                   }
-                  else {
-                     throw expectedSometimes;
+
+                  LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)");
+                  try {
+                     consumerSession1.commit();
                   }
+                  catch (JMSException expectedSometimes) {
+                     LOG.info("got exception ex on commit", expectedSometimes);
+                     if (expectedSometimes instanceof TransactionRolledBackException) {
+                        gotTransactionRolledBackException.set(true);
+                        // ok, message one was not replayed so we expect the rollback
+                     }
+                     else {
+                        throw expectedSometimes;
+                     }
 
+                  }
+                  commitDoneLatch.countDown();
+                  LOG.info("done async commit");
+               }
+               catch (Exception e) {
+                  e.printStackTrace();
                }
-               commitDoneLatch.countDown();
-               LOG.info("done async commit");
-            }
-            catch (Exception e) {
-               e.printStackTrace();
             }
-         }
-      }.start();
+         }.start();
 
-      // will be stopped by the plugin
-      brokerStopLatch.await();
-      broker = createBroker();
-      broker.start();
-      doByteman.set(false);
+         // will be stopped by the plugin
+         brokerStopLatch.await();
+         broker = createBroker();
+         broker.start();
+         doByteman.set(false);
 
-      Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+         Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
 
-      LOG.info("received message count: " + receivedMessages.size());
+         LOG.info("received message count: " + receivedMessages.size());
 
-      // new transaction
-      Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
-      LOG.info("post: from consumer1 received: " + msg);
-      if (gotTransactionRolledBackException.get()) {
-         Assert.assertNotNull("should be available again after commit rollback ex", msg);
-      }
-      else {
-         Assert.assertNull("should be nothing left for consumer as receive should have committed", msg);
-      }
-      consumerSession1.commit();
-
-      if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
-         // just one message successfully consumed or none consumed
-         // consumer2 should get other message
-         msg = consumer2.receive(10000);
-         LOG.info("post: from consumer2 received: " + msg);
-         Assert.assertNotNull("got second message on consumer2", msg);
-         consumerSession2.commit();
+         // new transaction
+         msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000);
+         LOG.info("post: from consumer1 received: " + msg);
+         if (gotTransactionRolledBackException.get()) {
+            Assert.assertNotNull("should be available again after commit rollback ex", msg);
+         }
+         else {
+            Assert.assertNull("should be nothing left for consumer as receive should have committed", msg);
+         }
+         consumerSession1.commit();
+
+         if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) {
+            // just one message successfully consumed or none consumed
+            // consumer2 should get other message
+            msg = consumer2.receive(10000);
+            LOG.info("post: from consumer2 received: " + msg);
+            Assert.assertNotNull("got second message on consumer2", msg);
+            consumerSession2.commit();
+         }
       }
-
-      for (Connection c : connections) {
-         c.close();
+      finally {
+         for (Connection c : connections) {
+            c.close();
+         }
       }
 
       // ensure no dangling messages with fresh broker etc
@@ -694,111 +699,115 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest {
       doByteman.set(true);
 
       Vector<Connection> connections = new Vector<>();
-      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
-      configureConnectionFactory(cf);
-      Connection connection = cf.createConnection();
-      connection.start();
-      Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
-
-      produceMessage(producerSession, destination);
-      connection.close();
-
-      connection = cf.createConnection();
-      connection.start();
-      connections.add(connection);
-      final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
-      final int sessionCount = 10;
-      final Stack<Session> sessions = new Stack<>();
-      for (int i = 0; i < sessionCount; i++) {
-         sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
-      }
-
-      final int consumerCount = 1000;
-      final Deque<MessageConsumer> consumers = new ArrayDeque<>();
-      for (int i = 0; i < consumerCount; i++) {
-         consumers.push(consumerSession.createConsumer(destination));
-      }
-
       final ExecutorService executorService = Executors.newCachedThreadPool();
 
-      final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
-      final TransportListener delegate = failoverTransport.getTransportListener();
-      failoverTransport.setTransportListener(new TransportListener() {
-         @Override
-         public void onCommand(Object command) {
-            delegate.onCommand(command);
+      try {
+         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
+         configureConnectionFactory(cf);
+         Connection connection = cf.createConnection();
+         connection.start();
+         Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
+
+         produceMessage(producerSession, destination);
+         connection.close();
+
+         connection = cf.createConnection();
+         connection.start();
+         connections.add(connection);
+         final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         final int sessionCount = 10;
+         final Stack<Session> sessions = new Stack<>();
+         for (int i = 0; i < sessionCount; i++) {
+            sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
          }
 
-         @Override
-         public void onException(IOException error) {
-            delegate.onException(error);
+         final int consumerCount = 1000;
+         final Deque<MessageConsumer> consumers = new ArrayDeque<>();
+         for (int i = 0; i < consumerCount; i++) {
+            consumers.push(consumerSession.createConsumer(destination));
          }
 
-         @Override
-         public void transportInterupted() {
+         final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class);
+         final TransportListener delegate = failoverTransport.getTransportListener();
+         failoverTransport.setTransportListener(new TransportListener() {
+            @Override
+            public void onCommand(Object command) {
+               delegate.onCommand(command);
+            }
 
-            LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
-            for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
+            @Override
+            public void onException(IOException error) {
+               delegate.onException(error);
+            }
 
-               executorService.execute(new Runnable() {
-                  public void run() {
-                     MessageConsumer localConsumer = null;
-                     try {
-                        synchronized (delegate) {
-                           localConsumer = consumers.pop();
-                        }
-                        localConsumer.receive(1);
+            @Override
+            public void transportInterupted() {
 
-                        LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
-                        localConsumer.close();
-                     }
-                     catch (NoSuchElementException nse) {
-                     }
-                     catch (Exception ignored) {
-                        LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
+               LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE"));
+               for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) {
+
+                  executorService.execute(new Runnable() {
+                     public void run() {
+                        MessageConsumer localConsumer = null;
+                        try {
+                           synchronized (delegate) {
+                              localConsumer = consumers.pop();
+                           }
+                           localConsumer.receive(1);
+
+                           LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId());
+                           localConsumer.close();
+                        }
+                        catch (NoSuchElementException nse) {
+                        }
+                        catch (Exception ignored) {
+                           LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored);
+                        }
                      }
-                  }
-               });
+                  });
+               }
+
+               delegate.transportInterupted();
             }
 
-            delegate.transportInterupted();
-         }
+            @Override
+            public void transportResumed() {
+               delegate.transportResumed();
+            }
+         });
 
-         @Override
-         public void transportResumed() {
-            delegate.transportResumed();
+         MessageConsumer consumer = null;
+         synchronized (delegate) {
+            consumer = consumers.pop();
          }
-      });
+         LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+         consumer.close();
 
-      MessageConsumer consumer = null;
-      synchronized (delegate) {
-         consumer = consumers.pop();
-      }
-      LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
-      consumer.close();
+         // will be stopped by the plugin
+         brokerStopLatch.await();
+         doByteman.set(false);
+         broker = createBroker();
+         broker.start();
 
-      // will be stopped by the plugin
-      brokerStopLatch.await();
-      doByteman.set(false);
-      broker = createBroker();
-      broker.start();
+         consumer = consumerSession.createConsumer(destination);
+         LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
 
-      consumer = consumerSession.createConsumer(destination);
-      LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId());
+         Message msg = null;
+         for (int i = 0; i < 4 && msg == null; i++) {
+            msg = consumer.receive(1000);
+         }
 
-      Message msg = null;
-      for (int i = 0; i < 4 && msg == null; i++) {
-         msg = consumer.receive(1000);
+         LOG.info("post: from consumer1 received: " + msg);
+         Assert.assertNotNull("got message after failover", msg);
+         msg.acknowledge();
       }
-
-      LOG.info("post: from consumer1 received: " + msg);
-      Assert.assertNotNull("got message after failover", msg);
-      msg.acknowledge();
-
-      for (Connection c : connections) {
-         c.close();
+      finally {
+         executorService.shutdown();
+         for (Connection c : connections) {
+            c.close();
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e06ff394/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
index dc91873..5759547 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java
@@ -27,7 +27,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import javax.jms.Connection;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -80,8 +79,12 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
 
    @Before
    public void setUp() throws Exception {
-      Configuration config0 = createConfig("127.0.0.1", 0);
-      Configuration config1 = createConfig("127.0.0.1", 1);
+      HashMap<String, String> map = new HashMap<>();
+      map.put("rebalanceClusterClients", "true");
+      map.put("updateClusterClients", "true");
+      map.put("updateClusterClientsOnRemove", "true");
+      Configuration config0 = createConfig("127.0.0.1", 0, map);
+      Configuration config1 = createConfig("127.0.0.1", 1, map);
 
       deployClusterConfiguration(config0, 1);
       deployClusterConfiguration(config1, 0);
@@ -99,6 +102,9 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest {
 
    @After
    public void tearDown() throws Exception {
+      for (ActiveMQConnection conn : connections) {
+         conn.close();
+      }
       server0.stop();
       server1.stop();
    }