You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/10 08:51:08 UTC

[2/3] qpid-broker-j git commit: QPID-8158: [Broker-J] [System Tests] Refactor MultiNodeTest and TwoNodeTest to use QpidTestRunner and BrokerAdmin for running the tests

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
index b80f002..ad9df21 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java
@@ -19,20 +19,33 @@
  */
 package org.apache.qpid.server.store.berkeleydb.replication;
 
+import static junit.framework.TestCase.assertEquals;
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.File;
 import java.net.URI;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -48,11 +61,11 @@ import com.sleepycat.je.Durability;
 import com.sleepycat.je.EnvironmentConfig;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.ReplicationConfig;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.util.FileUtils;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode;
@@ -60,120 +73,94 @@ import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole;
 import org.apache.qpid.systests.ConnectionBuilder;
 import org.apache.qpid.systests.GenericConnectionListener;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.systests.Utils;
+import org.apache.qpid.test.utils.PortHelper;
 import org.apache.qpid.test.utils.TestUtils;
+import org.apache.qpid.tests.utils.ConfigItem;
+import org.apache.qpid.tests.utils.RunBrokerAdmin;
 
-public class MultiNodeTest extends QpidBrokerTestCase
+@RunBrokerAdmin(type = "BDB-HA")
+@GroupConfig(numberOfNodes = 3, groupName = "test")
+@ConfigItem(name = Broker.BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, value = "false")
+public class MultiNodeTest extends GroupJmsTestBase
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeTest.class);
 
-    private static final String VIRTUAL_HOST = "test";
-    private static final int NUMBER_OF_NODES = 3;
-    private static final int FAILOVER_COMPLETION_TIMEOUT = 60000;
-
-    private GroupCreator _groupCreator;
-
-    private FailoverAwaitingListener _failoverListener;
-
-    /** Used when expectation is client will (re)-connect */
-    private ConnectionBuilder _positiveFailoverBuilder;
-
-    /** Used when expectation is client will not (re)-connect */
-    private ConnectionBuilder _negativeFailoverBuilder;
-
-    @Override
-    protected void setUp() throws Exception
-    {
-        assertTrue(isJavaBroker());
-        assertTrue(isBrokerStorePersistent());
 
-        _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
-        _groupCreator.configureClusterNodes();
+    private FailoverAwaitingListener _failoverListener = new FailoverAwaitingListener();
 
-        _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes();
-        _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 2);
-
-        _groupCreator.startCluster();
-        _failoverListener = new FailoverAwaitingListener();
-
-        super.setUp();
-    }
-
-    @Override
-    public void startDefaultBroker() throws Exception
-    {
-        // Don't start default broker provided by QBTC.
-    }
+    private static final int FAILOVER_COMPLETION_TIMEOUT = 60000;
 
+    @Test
     public void testLossOfMasterNodeCausesClientToFailover() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port {}", activeBrokerPort);
+            final int masterPort = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("Active connection port {}", masterPort);
 
-        _groupCreator.stopNode(activeBrokerPort);
-        LOGGER.info("Node is stopped");
-        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
-        LOGGER.info("Listener has finished");
-        // any op to ensure connection remains
-        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            getBrokerAdmin().stopNode(masterPort);
+            LOGGER.info("Node is stopped");
+            _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+            LOGGER.info("Listener has finished");
+            // any op to ensure connection remains
+            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        }
+        finally
+        {
+            connection.close();
+        }
     }
 
+    @Test
     public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
-
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port {}", activeBrokerPort);
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
-        LOGGER.info("Stopping inactive broker on port {} ", inactiveBrokerPort);
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("Active connection port {}", activeBrokerPort);
 
-        _groupCreator.stopNode(inactiveBrokerPort);
+            final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+            LOGGER.info("Stopping inactive broker on port {} ", inactiveBrokerPort);
 
-        _failoverListener.assertNoFailoverCompletionWithin(2000);
+            getBrokerAdmin().stopNode(inactiveBrokerPort);
 
-        // any op to ensure connection remains
-        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    }
+            _failoverListener.assertNoFailoverCompletionWithin(2000);
 
-    public void testLossOfQuorumCausesClientDisconnection() throws Exception
-    {
-        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
-        {
-            // TODO - QPIDJMS-366 - there seems to be a client defect when a JMS operation is interrupted
-            // by a graceful connection close from the client side.
-            return;
+            // any op to ensure connection remains
+            connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         }
-
-        final Connection connection = _negativeFailoverBuilder.build();
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        Destination destination = session.createQueue(getTestQueueName());
-        getJmsProvider().createQueue(session, getTestQueueName());
-
-        Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
-
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        ports.remove(activeBrokerPort);
-
-        // Stop all other nodes
-        for (Integer p : ports)
+        finally
         {
-            _groupCreator.stopNode(p);
+            connection.close();
         }
+    }
 
+    @Test
+    public void testLossOfQuorumCausesClientDisconnection() throws Exception
+    {
+        final Connection connection = getConnectionBuilder().build();
         try
         {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+            Set<Integer> ports =
+                    Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            ports.remove(activeBrokerPort);
+
+            // Stop all other nodes
+            for (Integer p : ports)
+            {
+                getBrokerAdmin().stopNode(p);
+            }
 
-            sendMessage(session, destination, 1);
-            fail("Exception not thrown - sending message within a transaction should fail without quorum");
-        }
-        catch(JMSException jms)
-        {
-            // PASS
+            _failoverListener.awaitPreFailover(2000);
         }
         finally
         {
@@ -184,7 +171,10 @@ public class MultiNodeTest extends QpidBrokerTestCase
         // New connections should now fail as vhost will be unavailable
         try
         {
-            Connection unexpectedConnection = _negativeFailoverBuilder.build();
+            Connection unexpectedConnection = getConnectionBuilder()
+                    .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT)
+                    .setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY)
+                    .build();
             fail("Got unexpected connection to node in group without quorum " + unexpectedConnection);
         }
         catch (JMSException je)
@@ -198,442 +188,490 @@ public class MultiNodeTest extends QpidBrokerTestCase
      * test ensures that open messaging transactions are correctly rolled-back as quorum is lost,
      * and later the node rejoins the group in either master or replica role.
      */
+    @Test
     public void testQuorumLostAndRestored_OriginalMasterRejoinsTheGroup() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+            Destination dest = createTestQueue(connection);
 
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        Destination dest = session.createQueue(getTestQueueName());
-        session.close();
+            Set<Integer> ports =
+                    Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
 
-        Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            ports.remove(activeBrokerPort);
 
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        ports.remove(activeBrokerPort);
+            Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
 
-        Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED);
-        Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED);
+            session1.createConsumer(dest).close();
 
-        session1.createConsumer(dest).close();
+            MessageProducer producer1 = session1.createProducer(dest);
+            producer1.send(session1.createMessage());
+            MessageProducer producer2 = session2.createProducer(dest);
+            producer2.send(session2.createMessage());
 
-        MessageProducer producer1 = session1.createProducer(dest);
-        producer1.send(session1.createMessage());
-        MessageProducer producer2 = session2.createProducer(dest);
-        producer2.send(session2.createMessage());
+            // Leave transactions open, this will leave two store transactions open on the store
 
-        // Leave transactions open, this will leave two store transactions open on the store
+            // Stop all other nodes
+            for (Integer p : ports)
+            {
+                getBrokerAdmin().stopNode(p);
+            }
 
-        // Stop all other nodes
-        for (Integer p : ports)
-        {
-            _groupCreator.stopNode(p);
-        }
+            // Await the old master discovering that it is all alone
+            getBrokerAdmin().awaitNodeRole(activeBrokerPort, "WAITING");
 
-        // Await the old master discovering that it is all alone
-        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "WAITING");
+            // Restart all other nodes
+            for (Integer p : ports)
+            {
+                getBrokerAdmin().startNode(p);
+            }
 
-        // Restart all other nodes
-        for (Integer p : ports)
+            _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+
+            getBrokerAdmin().awaitNodeRole(activeBrokerPort, "MASTER", "REPLICA");
+        }
+        finally
         {
-            _groupCreator.startNode(p);
+            connection.close();
         }
-
-        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
-
-        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "MASTER", "REPLICA");
     }
 
+    @Test
     public void testPersistentMessagesAvailableAfterFailover() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+            Destination queue = createTestQueue(connection);
 
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        Destination queue = session.createQueue(getTestQueueName());
-        session.close();
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
 
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
+            Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Utils.sendMessages(producingSession, queue, 10);
 
-        Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-        sendMessage(producingSession, queue, 10);
+            getBrokerAdmin().stopNode(activeBrokerPort);
+            LOGGER.info("Old master (broker port {}) is stopped", activeBrokerPort);
 
-        _groupCreator.stopNode(activeBrokerPort);
-        LOGGER.info("Old master (broker port {}) is stopped", activeBrokerPort);
+            _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+            LOGGER.info("Failover has finished");
 
-        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
-        LOGGER.info("Failover has finished");
+            final int activeBrokerPortAfterFailover = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("New master (broker port {}) after failover", activeBrokerPortAfterFailover);
 
-        final int activeBrokerPortAfterFailover = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("New master (broker port {}) after failover", activeBrokerPortAfterFailover);
+            Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageConsumer consumer = consumingSession.createConsumer(queue);
 
-        Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED);
-        MessageConsumer consumer = consumingSession.createConsumer(queue);
-
-        connection.start();
-        for(int i = 0; i < 10; i++)
+            connection.start();
+            for (int i = 0; i < 10; i++)
+            {
+                Message m = consumer.receive(getReceiveTimeout());
+                assertNotNull("Message " + i + "  is not received", m);
+                assertEquals("Unexpected message received", i, m.getIntProperty(INDEX));
+            }
+            consumingSession.commit();
+        }
+        finally
         {
-            Message m = consumer.receive(getReceiveTimeout());
-            assertNotNull("Message " + i + "  is not received", m);
-            assertEquals("Unexpected message received", i, m.getIntProperty(INDEX));
+            connection.close();
         }
-        consumingSession.commit();
     }
 
+    @Test
     public void testTransferMasterFromLocalNode() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        session.close();
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            Destination queue = createTestQueue(connection);
 
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port {}", activeBrokerPort);
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("Active connection port {}", activeBrokerPort);
 
-        final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
-        LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
+            final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+            LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
 
-        // transfer mastership 3 times in order to verify
-        // that repeated mastership transfer to the same node works, See QPID-6996
-        transferMasterFromLocalNode(connection, inactiveBrokerPort, activeBrokerPort);
-        transferMasterFromLocalNode(connection, activeBrokerPort, inactiveBrokerPort);
-        transferMasterFromLocalNode(connection, inactiveBrokerPort, activeBrokerPort);
+            // transfer mastership 3 times in order to verify
+            // that repeated mastership transfer to the same node works, See QPID-6996
+            transferMasterFromLocalNode(connection, queue, inactiveBrokerPort, activeBrokerPort);
+            transferMasterFromLocalNode(connection, queue, activeBrokerPort, inactiveBrokerPort);
+            transferMasterFromLocalNode(connection, queue, inactiveBrokerPort, activeBrokerPort);
+        }
+        finally
+        {
+            connection.close();
+        }
     }
 
     private void transferMasterFromLocalNode(final Connection connection,
+                                             final Destination queue,
                                              final int inactiveBrokerPort,
                                              final int activeBrokerPort) throws Exception
     {
         _failoverListener = new FailoverAwaitingListener();
         getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        Map<String, Object> attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+        Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
         assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
-        _groupCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+        getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+                                           Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
 
         _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Listener has finished");
 
-        attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+        attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
         assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        assertProducingConsuming(connection);
+        assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
 
-        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+        getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
     }
 
+    @Test
     public void testTransferMasterFromRemoteNode() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        session.close();
-
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port {}", activeBrokerPort);
-
-        final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
-        LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
-
-        // transfer mastership 3 times in order to verify
-        // that repeated mastership transfer to the same node works, See QPID-6996
-        transferMasterFromRemoteNode(connection, activeBrokerPort, inactiveBrokerPort);
-        transferMasterFromRemoteNode(connection, inactiveBrokerPort, activeBrokerPort);
-        transferMasterFromRemoteNode(connection, activeBrokerPort, inactiveBrokerPort);
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            Destination queue = createTestQueue(connection);
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("Active connection port {}", activeBrokerPort);
+
+            final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+            LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
+
+            // transfer mastership 3 times in order to verify
+            // that repeated mastership transfer to the same node works, See QPID-6996
+            transferMasterFromRemoteNode(connection, queue, activeBrokerPort, inactiveBrokerPort);
+            transferMasterFromRemoteNode(connection, queue, inactiveBrokerPort, activeBrokerPort);
+            transferMasterFromRemoteNode(connection, queue, activeBrokerPort, inactiveBrokerPort);
+        }
+        finally
+        {
+            connection.close();
+        }
     }
 
     private void transferMasterFromRemoteNode(final Connection connection,
+                                              final Destination queue,
                                               final int activeBrokerPort,
                                               final int inactiveBrokerPort) throws Exception
     {
         _failoverListener = new FailoverAwaitingListener();
         getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
 
-        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
-        Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
+        getBrokerAdmin().awaitRemoteNodeRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
+        Map<String, Object> attributes = getBrokerAdmin().getRemoteNodeAttributes(activeBrokerPort, inactiveBrokerPort);
         assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+        getBrokerAdmin().setRemoteNodeAttributes(activeBrokerPort,
+                                                 inactiveBrokerPort,
+                                                 Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
 
         _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
         LOGGER.info("Listener has finished");
 
-        attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
+        attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
         assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        assertProducingConsuming(connection);
+        assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
 
-        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+        getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
     }
 
+
+    @Test
     public void testTransferMasterWhilstMessagesInFlight() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        final Destination destination = session.createQueue(getTestQueueName());
-
-        final AtomicBoolean masterTransferred = new AtomicBoolean(false);
-        final AtomicBoolean keepRunning = new AtomicBoolean(true);
-        final AtomicReference<Exception> workerException = new AtomicReference<>();
-        final CountDownLatch producedOneBefore = new CountDownLatch(1);
-        final CountDownLatch producedOneAfter = new CountDownLatch(1);
-        final CountDownLatch workerShutdown = new CountDownLatch(1);
-
-        Runnable producer = () -> {
-            try
-            {
-                int count = 0;
-                MessageProducer producer1 = session.createProducer(destination);
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+
+            final Destination destination = createTestQueue(connection);
 
-                while (keepRunning.get())
+            final AtomicBoolean masterTransferred = new AtomicBoolean(false);
+            final AtomicBoolean keepRunning = new AtomicBoolean(true);
+            final AtomicReference<Exception> workerException = new AtomicReference<>();
+            final CountDownLatch producedOneBefore = new CountDownLatch(1);
+            final CountDownLatch producedOneAfter = new CountDownLatch(1);
+            final CountDownLatch workerShutdown = new CountDownLatch(1);
+
+            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+            Runnable producer = () -> {
+                try
                 {
-                    String messageText = "message" + count;
-                    try
+                    int count = 0;
+                    MessageProducer producer1 = session.createProducer(destination);
+
+                    while (keepRunning.get())
                     {
-                        Message message = session.createTextMessage(messageText);
-                        producer1.send(message);
-                        session.commit();
-                        LOGGER.debug("Sent message " + count);
+                        String messageText = "message" + count;
+                        try
+                        {
+                            Message message = session.createTextMessage(messageText);
+                            producer1.send(message);
+                            session.commit();
+                            LOGGER.debug("Sent message " + count);
 
-                        producedOneBefore.countDown();
+                            producedOneBefore.countDown();
 
-                        if (masterTransferred.get())
+                            if (masterTransferred.get())
+                            {
+                                producedOneAfter.countDown();
+                            }
+                            count++;
+                        }
+                        catch (javax.jms.IllegalStateException ise)
                         {
-                            producedOneAfter.countDown();
+                            throw ise;
+                        }
+                        catch (TransactionRolledBackException trbe)
+                        {
+                            // Pass - failover in prgoress
+                        }
+                        catch (JMSException je)
+                        {
+                            // Pass - failover in progress
                         }
-                        count++;
-                    }
-                    catch (javax.jms.IllegalStateException ise)
-                    {
-                        throw ise;
-                    }
-                    catch (TransactionRolledBackException trbe)
-                    {
-                        // Pass - failover in prgoress
-                    }
-                    catch(JMSException je)
-                    {
-                        // Pass - failover in progress
                     }
                 }
-            }
-            catch (Exception e)
-            {
-                workerException.set(e);
-            }
-            finally
-            {
-                workerShutdown.countDown();
-            }
-        };
+                catch (Exception e)
+                {
+                    workerException.set(e);
+                }
+                finally
+                {
+                    workerShutdown.countDown();
+                }
+            };
 
-        Thread backgroundWorker = new Thread(producer);
-        backgroundWorker.start();
+            Thread backgroundWorker = new Thread(producer);
+            backgroundWorker.start();
 
-        boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS);
-        assertTrue(workerRunning);
+            boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS);
+            assertTrue(workerRunning);
 
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port {}", activeBrokerPort);
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("Active connection port {}", activeBrokerPort);
 
-        final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection);
-        LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
+            final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort);
+            LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort);
 
-        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA");
-        Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort);
-        assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
+            getBrokerAdmin().awaitNodeRole(inactiveBrokerPort, "REPLICA");
+            Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+            assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
+            getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+                                               Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER"));
 
-        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
-        LOGGER.info("Failover has finished");
+            _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+            LOGGER.info("Failover has finished");
 
-        attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort);
-        assertEquals("New master has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+            attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort);
+            assertEquals("New master has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA");
+            getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA");
 
-        LOGGER.info("Master transfer known to have completed successfully.");
-        masterTransferred.set(true);
+            LOGGER.info("Master transfer known to have completed successfully.");
+            masterTransferred.set(true);
 
-        boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
-        assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
+            boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS);
+            assertTrue("Should have successfully produced at least one message after transfer complete", producedMore);
 
-        keepRunning.set(false);
-        boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS);
-        assertTrue("Worker thread should have shutdown", shutdown);
+            keepRunning.set(false);
+            boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS);
+            assertTrue("Worker thread should have shutdown", shutdown);
 
-        backgroundWorker.join(5000);
-        assertNull(workerException.get());
+            backgroundWorker.join(5000);
+            assertThat(workerException.get(), is(nullValue()));
 
-        assertNotNull(session.createTemporaryQueue());
+            assertNotNull(session.createTemporaryQueue());
+        }
+        finally
+        {
+            connection.close();
+        }
     }
 
+    @Test
     public void testInFlightTransactionsWhilstMajorityIsLost() throws Exception
     {
-        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
-        {
-            // TODO - QPIDJMS-366 - there seems to be a client defect when a JMS operation is interrupted
-            // by a graceful connection close from the client side.
-            return;
-        }
 
-        int connectionNumber = Integer.getInteger("MultiNodeTest.testInFlightTransactionsWhilstMajorityIsLost.numberOfConnections", 20);
-        ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + NUMBER_OF_NODES - 1);
+        int connectionNumber = Integer.getInteger(
+                "MultiNodeTest.testInFlightTransactionsWhilstMajorityIsLost.numberOfConnections",
+                20);
+        ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + 2);
         try
         {
-            final ConnectionBuilder consumerBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(100, 100);
-            final Connection consumerConnection = consumerBuilder.build();
-            Session s = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
-            getJmsProvider().createQueue(s, getTestQueueName());
-            s.close();
+            final ConnectionBuilder connectionBuilder =
+                    getConnectionBuilder().setFailoverReconnectDelay(100).setFailoverReconnectAttempts(100);
+            final Connection consumerConnection = connectionBuilder.build();
+            try
+            {
+                Destination destination = createTestQueue(consumerConnection);
+                consumerConnection.start();
 
-            consumerConnection.start();
+                final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                consumerSession.createConsumer(destination).setMessageListener(message -> {
+                    try
+                    {
+                        LOGGER.info("Message received: " + ((TextMessage) message).getText());
+                    }
+                    catch (JMSException e)
+                    {
+                        LOGGER.error("Failure to get message text", e);
+                    }
+                });
 
-            final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final Destination destination = consumerSession.createQueue(getTestQueueName());
-            consumerSession.createConsumer(destination).setMessageListener(message -> {
-                try
+                final Connection[] connections = new Connection[connectionNumber];
+                final Session[] sessions = new Session[connectionNumber];
+                for (int i = 0; i < sessions.length; i++)
                 {
-                    LOGGER.info("Message received: " + ((TextMessage) message).getText());
+                    connections[i] = connectionBuilder.setClientId("test-" + UUID.randomUUID()).build();
+                    sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED);
+                    LOGGER.info("Session {} is created", i);
                 }
-                catch (JMSException e)
+                try
                 {
-                    LOGGER.error("Failure to get message text", e);
-                }
-            });
-
-            final Connection[] connections = new Connection[connectionNumber];
-            final Session[] sessions = new Session[connectionNumber];
-            for (int i = 0; i < sessions.length; i++)
-            {
-                final ConnectionBuilder builder = _groupCreator.getConnectionBuilderForAllClusterNodes(100, 100);
-                connections[i] = builder.build();
-                sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED);
-                LOGGER.info("Session {} is created", i);
-            }
+                    Set<Integer> ports =
+                            Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
 
-            List<Integer> ports = new ArrayList<>(_groupCreator.getBrokerPortNumbersForNodes());
+                    int maxMessageSize = 10;
+                    StringBuilder sb = new StringBuilder();
+                    for (int i = 0; i < maxMessageSize - 2; i++)
+                    {
+                        sb.append("X");
+                    }
+                    String messageText = sb.toString();
+                    for (int n = 0; n < 3; n++)
+                    {
+                        LOGGER.info("Starting iteration {}", n);
 
-            int maxMessageSize = 10;
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0; i < maxMessageSize - 2; i++)
-            {
-                sb.append("X");
-            }
-            String messageText = sb.toString();
-            for (int n = 0; n < NUMBER_OF_NODES; n++)
-            {
-                LOGGER.info("Starting iteration {}", n);
+                        FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber);
 
-                FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber);
+                        for (int i = 0; i < sessions.length; i++)
+                        {
+                            Connection connection = connections[i];
+                            getJmsProvider().addGenericConnectionListener(connection, failoverListener);
 
-                for (int i = 0; i < sessions.length; i++)
-                {
-                    Connection connection = connections[i];
-                    getJmsProvider().addGenericConnectionListener(connection, failoverListener);
+                            MessageProducer producer = sessions[i].createProducer(destination);
+                            Message message = sessions[i].createTextMessage(messageText + "-" + i);
+                            producer.send(message);
+                        }
 
-                    MessageProducer producer = sessions[i].createProducer(destination);
-                    Message message = sessions[i].createTextMessage(messageText + "-" + i);
-                    producer.send(message);
-                }
+                        LOGGER.info("All publishing sessions have uncommitted transactions");
 
-                LOGGER.info("All publishing sessions have uncommitted transactions");
+                        final int activeBrokerPort = getJmsProvider().getConnectedURI(connections[0]).getPort();
+                        LOGGER.info("Active connection port {}", activeBrokerPort);
 
-                final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connections[0]);
-                LOGGER.info("Active connection port {}", activeBrokerPort);
+                        List<Integer> inactivePorts = new ArrayList<>(ports);
+                        inactivePorts.remove(new Integer(activeBrokerPort));
 
-                List<Integer> inactivePorts = new ArrayList<>(ports);
-                inactivePorts.remove(new Integer(activeBrokerPort));
+                        final CountDownLatch latch = new CountDownLatch(inactivePorts.size());
+                        for (int port : inactivePorts)
+                        {
+                            final int inactiveBrokerPort = port;
+                            LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort);
+
+                            executorService.submit(() -> {
+                                try
+                                {
+                                    getBrokerAdmin().setNodeAttributes(inactiveBrokerPort,
+                                                                       Collections.singletonMap(
+                                                                               BDBHAVirtualHostNode.DESIRED_STATE,
+                                                                               State.STOPPED.name()));
+                                }
+                                catch (Exception e)
+                                {
+                                    LOGGER.error("Failed to stop node on broker with port {}", inactiveBrokerPort, e);
+                                }
+                                finally
+                                {
+                                    latch.countDown();
+                                }
+                            });
+                        }
 
-                final CountDownLatch latch = new CountDownLatch(inactivePorts.size());
-                for (int port : inactivePorts)
-                {
-                    final int inactiveBrokerPort = port;
-                    LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort);
+                        latch.await(500, TimeUnit.MILLISECONDS);
 
-                    executorService.submit(() -> {
-                        try
+                        LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk");
+                        for (final Session session : sessions)
                         {
-                            _groupCreator.setNodeAttributes(inactiveBrokerPort,
-                                                            inactiveBrokerPort,
-                                                            Collections.singletonMap(
-                                                                    BDBHAVirtualHostNode.DESIRED_STATE,
-                                                                    State.STOPPED.name()));
+                            executorService.submit(() -> {
+                                try
+                                {
+                                    session.commit();
+                                }
+                                catch (JMSException e)
+                                {
+                                    // majority of commits might fail due to insufficient replicas
+                                }
+                            });
                         }
-                        catch (Exception e)
+
+                        LOGGER.info("Verify that stopped nodes are in detached role");
+                        for (int port : inactivePorts)
                         {
-                            LOGGER.error("Failed to stop node on broker with port {}", inactiveBrokerPort, e);
+                            getBrokerAdmin().awaitNodeRole(port, NodeRole.DETACHED.name());
                         }
-                        finally
+
+                        LOGGER.info("Start stopped nodes");
+                        for (int port : inactivePorts)
                         {
-                            latch.countDown();
+                            LOGGER.info("Starting node for inactive broker on port " + port);
+                            try
+                            {
+                                getBrokerAdmin().setNodeAttributes(port,
+                                                                   Collections.singletonMap(
+                                                                           BDBHAVirtualHostNode.DESIRED_STATE,
+                                                                           State.ACTIVE.name()));
+                            }
+                            catch (Exception e)
+                            {
+                                LOGGER.error("Failed to start node on broker with port " + port, e);
+                            }
                         }
-                    });
-                }
 
-                latch.await(500, TimeUnit.MILLISECONDS);
+                        for (int port : ports)
+                        {
+                            getBrokerAdmin().awaitNodeRole(port, "REPLICA", "MASTER");
+                        }
 
-                LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk");
-                for (final Session session : sessions)
-                {
-                    executorService.submit(() -> {
-                        try
+                        if (failoverListener.isFailoverStarted())
                         {
-                            session.commit();
+                            LOGGER.info("Waiting for failover completion");
+                            failoverListener.awaitFailoverCompletion(20000 * connectionNumber);
+                            LOGGER.info("Failover has finished");
                         }
-                        catch (JMSException e)
+                        else
                         {
-                            // majority of commits might fail due to insufficient replicas
+                            LOGGER.info("Failover never started");
                         }
-                    });
-                }
-
-                LOGGER.info("Verify that stopped nodes are in detached role");
-                for (int port : inactivePorts)
-                {
-                    _groupCreator.awaitNodeToAttainRole(port, NodeRole.DETACHED.name());
+                    }
                 }
-
-                LOGGER.info("Start stopped nodes");
-                for (int port : inactivePorts)
+                finally
                 {
-                    LOGGER.info("Starting node for inactive broker on port " + port);
-                    try
-                    {
-                        _groupCreator.setNodeAttributes(port,
-                                                        port,
-                                                        Collections.singletonMap(
-                                                                BDBHAVirtualHostNode.DESIRED_STATE,
-                                                                State.ACTIVE.name()));
-                    }
-                    catch (Exception e)
+                    for (Connection c: connections)
                     {
-                        LOGGER.error("Failed to start node on broker with port " + port, e);
+                        try
+                        {
+                            c.close();
+                        }
+                        finally
+                        {
+                            LOGGER.error("Unexpected exception on connection close");
+                        }
                     }
                 }
-
-                for (int port : ports)
-                {
-                    _groupCreator.awaitNodeToAttainRole(port, "REPLICA", "MASTER");
-                }
-
-                if (failoverListener.isFailoverStarted())
-                {
-                    LOGGER.info("Waiting for failover completion");
-                    failoverListener.awaitFailoverCompletion(20000 * connectionNumber);
-                    LOGGER.info("Failover has finished");
-                }
-                else
-                {
-                    LOGGER.info("Failover never started");
-                }
+            }
+            finally
+            {
+                consumerConnection.close();
             }
         }
         finally
@@ -646,127 +684,148 @@ public class MultiNodeTest extends QpidBrokerTestCase
      * Tests aims to demonstrate that in a disaster situation (where all nodes except the master is lost), that operation
      * can be continued from a single node using the QUORUM_OVERRIDE feature.
      */
+    @Test
     public void testQuorumOverride() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        session.close();
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+            Destination queue = createTestQueue(connection);
 
-        Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
+            Set<Integer> ports =
+                    Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
 
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        ports.remove(activeBrokerPort);
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            ports.remove(activeBrokerPort);
 
-        // Stop all other nodes
-        for (Integer p : ports)
-        {
-            _groupCreator.stopNode(p);
-        }
+            // Stop all other nodes
+            for (Integer p : ports)
+            {
+                getBrokerAdmin().stopNode(p);
+            }
 
-        LOGGER.info("Awaiting failover to start");
-        _failoverListener.awaitPreFailover(FAILOVER_COMPLETION_TIMEOUT);
-        LOGGER.info("Failover has begun");
+            LOGGER.info("Awaiting failover to start");
+            _failoverListener.awaitPreFailover(FAILOVER_COMPLETION_TIMEOUT);
+            LOGGER.info("Failover has begun");
 
-        Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
-        assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
-        _groupCreator.setNodeAttributes(activeBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
+            Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort);
+            assertEquals("Broker has unexpected quorum override",
+                         new Integer(0),
+                         attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+            getBrokerAdmin().setNodeAttributes(activeBrokerPort,
+                                               Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1));
 
-        attributes = _groupCreator.getNodeAttributes(activeBrokerPort);
-        assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
+            attributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort);
+            assertEquals("Broker has unexpected quorum override",
+                         new Integer(1),
+                         attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE));
 
-        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
-        LOGGER.info("Failover has finished");
+            _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+            LOGGER.info("Failover has finished");
 
-        assertProducingConsuming(connection);
+            assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
+        }
+        finally
+        {
+            connection.close();
+        }
     }
 
+    @Test
     public void testPriority() throws Exception
     {
-        final Connection connection = _positiveFailoverBuilder.build();
-        getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        session.close();
+        final Connection connection = getConnectionBuilder().build();
+        try
+        {
+            getJmsProvider().addGenericConnectionListener(connection, _failoverListener);
+            Destination queue = createTestQueue(connection);
 
-        final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection);
-        LOGGER.info("Active connection port {}", activeBrokerPort);
+            final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort();
+            LOGGER.info("Active connection port {}", activeBrokerPort);
 
-        int priority = 1;
-        Integer highestPriorityBrokerPort = null;
-        Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes();
-        for (Integer port : ports)
-        {
-            if (activeBrokerPort != port)
+            int priority = 1;
+            Integer highestPriorityBrokerPort = null;
+            Set<Integer> ports =
+                    Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
+            for (Integer port : ports)
             {
-                priority = priority + 1;
-                highestPriorityBrokerPort = port;
-                _groupCreator.setNodeAttributes(port, port, Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, priority));
-                Map<String, Object> attributes = _groupCreator.getNodeAttributes(port, port);
-                assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY));
+                if (activeBrokerPort != port)
+                {
+                    priority = priority + 1;
+                    highestPriorityBrokerPort = port;
+                    getBrokerAdmin().setNodeAttributes(port,
+                                                       Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY,
+                                                                                priority));
+                    Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(port);
+                    assertEquals("Broker has unexpected priority",
+                                 priority,
+                                 attributes.get(BDBHAVirtualHostNode.PRIORITY));
+                }
             }
-        }
 
-        LOGGER.info("Broker on port {} has the highest priority of {}", highestPriorityBrokerPort, priority);
+            LOGGER.info("Broker on port {} has the highest priority of {}", highestPriorityBrokerPort, priority);
 
-        // make sure all remote nodes are materialized on the master
-        // in order to make sure that DBPing is not invoked
-        for (Integer port : ports)
-        {
-            if (activeBrokerPort != port)
+            for (Integer port : ports)
             {
-                _groupCreator.awaitNodeToAttainAttributeValue(activeBrokerPort, port, BDBHARemoteReplicationNode.ROLE, "REPLICA");
+                if (activeBrokerPort != port)
+                {
+                    getBrokerAdmin().awaitNodeRole(port, BDBHARemoteReplicationNode.ROLE, "REPLICA");
+                }
             }
-        }
 
-        // do work on master
-        assertProducingConsuming(connection);
+            // do work on master
+            assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
 
-        Map<String, Object> masterNodeAttributes = _groupCreator.getNodeAttributes(activeBrokerPort);
+            Map<String, Object> masterNodeAttributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort);
 
-        Object lastTransactionId = masterNodeAttributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
-        assertTrue("Unexpected last transaction id: " + lastTransactionId, lastTransactionId instanceof Number);
+            Object lastTransactionId =
+                    masterNodeAttributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID);
+            assertTrue("Unexpected last transaction id: " + lastTransactionId, lastTransactionId instanceof Number);
 
-        // make sure all remote nodes have the same transaction id as master
-        for (Integer port : ports)
-        {
-            if (activeBrokerPort != port)
+            // make sure all remote nodes have the same transaction id as master
+            for (Integer port : ports)
             {
-                _groupCreator.awaitNodeToAttainAttributeValue(activeBrokerPort,
-                                                              port,
-                                                              BDBHARemoteReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID,
-                                                              String.valueOf(lastTransactionId));
+                if (activeBrokerPort != port)
+                {
+                    getBrokerAdmin().awaitNodeToAttainAttributeValue(activeBrokerPort,
+                                                                     BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID,
+                                                                     lastTransactionId);
+                }
             }
-        }
 
-        LOGGER.info("Shutting down the MASTER");
-        _groupCreator.stopNode(activeBrokerPort);
+            LOGGER.info("Shutting down the MASTER");
+            getBrokerAdmin().stopNode(activeBrokerPort);
 
-        _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
-        LOGGER.info("Listener has finished");
+            _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT);
+            LOGGER.info("Listener has finished");
 
-        Map<String, Object> attributes = _groupCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort);
-        assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
+            Map<String, Object> attributes =
+                    getBrokerAdmin().getNodeAttributes(highestPriorityBrokerPort);
+            assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE));
 
-        assertProducingConsuming(connection);
+            assertThat(Utils.produceConsume(connection, queue), is(equalTo(true)));
+        }
+        finally
+        {
+            connection.close();
+        }
     }
 
+    @Test
     public void testClusterCannotStartWithIntruder() throws Exception
     {
-        //set property explicitly as test requires broker to start to enable check for ERRORED nodes
-        setSystemProperty(Broker.BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, String.valueOf(Boolean.FALSE));
-
-        int intruderPort = getNextAvailable(Collections.max(_groupCreator.getBdbPortNumbers()) + 1);
+        int intruderPort =
+                new PortHelper().getNextAvailable(Arrays.stream(getBrokerAdmin().getBdbPorts()).max().getAsInt() + 1);
         String nodeName = "intruder";
-        String nodeHostPort = _groupCreator.getIpAddressOfBrokerHost() + ":" + intruderPort;
+        String nodeHostPort = getBrokerAdmin().getHost() + ":" + intruderPort;
         File environmentPathFile = Files.createTempDirectory("qpid-work-intruder").toFile();
         try
         {
             environmentPathFile.mkdirs();
             ReplicationConfig replicationConfig =
-                    new ReplicationConfig(_groupCreator.getGroupName(), nodeName, nodeHostPort);
-            replicationConfig.setHelperHosts(_groupCreator.getHelperHostPort());
+                    new ReplicationConfig("test", nodeName, nodeHostPort);
+            replicationConfig.setHelperHosts(getBrokerAdmin().getHelperHostPort());
             EnvironmentConfig envConfig = new EnvironmentConfig();
             envConfig.setAllowCreate(true);
             envConfig.setTransactional(true);
@@ -775,7 +834,9 @@ public class MultiNodeTest extends QpidBrokerTestCase
                                                    Durability.ReplicaAckPolicy.SIMPLE_MAJORITY));
 
             final String currentThreadName = Thread.currentThread().getName();
-            try(ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig))
+            try (ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile,
+                                                                            replicationConfig,
+                                                                            envConfig))
             {
                 LOGGER.debug("Intruder started");
             }
@@ -784,23 +845,24 @@ public class MultiNodeTest extends QpidBrokerTestCase
                 Thread.currentThread().setName(currentThreadName);
             }
 
-            for (int port : _groupCreator.getBrokerPortNumbersForNodes())
+            Set<Integer> ports =
+                    Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet());
+            for (int port : ports)
             {
-                _groupCreator.awaitNodeToAttainAttributeValue(port,
-                                                              port,
-                                                              BDBHAVirtualHostNode.STATE,
-                                                              State.ERRORED.name());
+                getBrokerAdmin().awaitNodeToAttainAttributeValue(port,
+                                                                 BDBHAVirtualHostNode.STATE,
+                                                                 State.ERRORED.name());
             }
 
-            _groupCreator.stopCluster();
-            _groupCreator.startCluster();
+            getBrokerAdmin().stop();
+            getBrokerAdmin().start(false);
 
-            for (int port : _groupCreator.getBrokerPortNumbersForNodes())
+            for (int port : ports)
             {
-                _groupCreator.awaitNodeToAttainAttributeValue(port,
-                                                              port,
-                                                              BDBHAVirtualHostNode.STATE,
-                                                              State.ERRORED.name());
+                getBrokerAdmin().awaitNodeToAttainAttributeValue(port,
+
+                                                                 BDBHAVirtualHostNode.STATE,
+                                                                 State.ERRORED.name());
             }
         }
         finally
@@ -844,10 +906,10 @@ public class MultiNodeTest extends QpidBrokerTestCase
             if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS))
             {
                 LOGGER.warn("Failover did not occur, dumping threads:\n\n" + TestUtils.dumpThreads() + "\n");
-                Map<Integer,String> threadDumps = _groupCreator.groupThreadumps();
-                for (Map.Entry<Integer,String> entry : threadDumps.entrySet())
+                Map<Integer, String> threadDumps = getBrokerAdmin().groupThreadDumps();
+                for (Map.Entry<Integer, String> entry : threadDumps.entrySet())
                 {
-                    LOGGER.warn("Broker {} thread dump:\n\n {}" , entry.getKey(), entry.getValue());
+                    LOGGER.warn("Broker {} thread dump:\n\n {}", entry.getKey(), entry.getValue());
                 }
             }
             assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount());
@@ -870,5 +932,4 @@ public class MultiNodeTest extends QpidBrokerTestCase
             return _failoverStarted;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
----------------------------------------------------------------------
diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
index 3e0783d..151592c 100644
--- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
+++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java
@@ -19,157 +19,151 @@
  */
 package org.apache.qpid.server.store.berkeleydb.replication;
 
-import java.util.Collections;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
 import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Queue;
+
+import org.junit.Test;
 
-import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode;
-import org.apache.qpid.systests.ConnectionBuilder;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.systests.Utils;
+import org.apache.qpid.tests.utils.RunBrokerAdmin;
 
-public class TwoNodeTest extends QpidBrokerTestCase
+@RunBrokerAdmin(type = "BDB-HA")
+@GroupConfig(numberOfNodes = 2, groupName = "test")
+public class TwoNodeTest extends GroupJmsTestBase
 {
-    private static final String VIRTUAL_HOST = "test";
-
-    private static final int NUMBER_OF_NODES = 2;
-
-    private GroupCreator _groupCreator;
 
-    /** Used when expectation is client will not (re)-connect */
-    private ConnectionBuilder _positiveFailoverBuilder;
-
-    /** Used when expectation is client will not (re)-connect */
-    private ConnectionBuilder _negativeFailoverBuilder;
-
-    @Override
-    protected void setUp() throws Exception
+    @Test
+    public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
     {
-        assertTrue(isJavaBroker());
-        assertTrue(isBrokerStorePersistent());
-
-        super.setUp();
-
-        _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
-    }
+        final Connection initialConnection = getConnectionBuilder().build();
+        int masterPort;
+        Queue queue;
+        try
+        {
+            queue = createTestQueue(initialConnection);
+            masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+            getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+        }
+        finally
+        {
+            initialConnection.close();
+        }
+        getBrokerAdmin().stop();
+        getBrokerAdmin().startNode(masterPort);
 
-    @Override
-    public void startDefaultBroker() throws Exception
-    {
-        // Don't start default broker provided by QBTC.
+        assertProduceConsume(queue);
     }
 
-    private void startCluster(boolean designedPrimary) throws Exception
+    @Test
+    public void testClusterRestartWithoutDesignatedPrimary() throws Exception
     {
-        _groupCreator.configureClusterNodes();
-        _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary);
-        _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes();
-        _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 2);
-        _groupCreator.startCluster();
-    }
+        Queue queue;
+        final Connection initialConnection = getConnectionBuilder().build();
+        try
+        {
+            queue = createTestQueue(initialConnection);
+            assertThat(Utils.produceConsume(initialConnection, queue), is(equalTo(true)));
+        }
+        finally
+        {
+            initialConnection.close();
+        }
 
-    public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception
-    {
-        startCluster(true);
-
-        final Connection initialConnection = _positiveFailoverBuilder.build();
-        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        session.close();
-
-        int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection);
-        assertProducingConsuming(initialConnection);
-        initialConnection.close();
-        _groupCreator.stopCluster();
-        _groupCreator.startNode(masterPort);
-        final Connection secondConnection = _positiveFailoverBuilder.build();
-        assertProducingConsuming(secondConnection);
-        secondConnection.close();
-    }
+        getBrokerAdmin().stop();
+        getBrokerAdmin().start();
 
-    public void testClusterRestartWithoutDesignatedPrimary() throws Exception
-    {
-        startCluster(false);
-
-        final Connection initialConnection = _positiveFailoverBuilder.build();
-        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        session.close();
-
-        assertProducingConsuming(initialConnection);
-        initialConnection.close();
-        _groupCreator.stopCluster();
-        _groupCreator.startClusterParallel();
-        final Connection secondConnection = _positiveFailoverBuilder.build();
-        assertProducingConsuming(secondConnection);
-        secondConnection.close();
+        assertProduceConsume(queue);
     }
 
+    @Test
     public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception
     {
-        startCluster(true);
-        _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+        int masterPort;
+        Queue queue;
+        final Connection initialConnection = getConnectionBuilder().build();
+        try
+        {
+            masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+            queue = createTestQueue(initialConnection);
+            getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+        }
+        finally
+        {
+            initialConnection.close();
+        }
 
-        final Connection connection = _positiveFailoverBuilder.build();
-        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        session.close();
+        int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
+        getBrokerAdmin().stopNode(replicaPort);
 
-        assertNotNull("Expected to get a valid connection to primary", connection);
-        assertProducingConsuming(connection);
+        assertProduceConsume(queue);
     }
 
+    @Test
     public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception
     {
-        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+        int masterPort;
+        Queue queue ;
+        final Connection initialConnection = getConnectionBuilder().build();
+        try
         {
-            // TODO - there seems to be a client defect when a JMS operation is interrupted
-            // by a graceful connection close from the client side.
-            return;
+            masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+            queue = createTestQueue(initialConnection);
+        }
+        finally
+        {
+            initialConnection.close();
         }
 
-        startCluster(false);
-
-        final Connection initialConnection = _negativeFailoverBuilder.build();
-        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        initialConnection.close();
+        int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
 
-        _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+        getBrokerAdmin().stopNode(replicaPort);
 
         try
         {
-
-            Connection connection = _negativeFailoverBuilder.build();
-            assertProducingConsuming(connection);
+            Connection connection = getConnectionBuilder().setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY)
+                                                          .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT)
+                                                          .build();
+            Utils.produceConsume(connection, queue);
             fail("Exception not thrown");
         }
         catch(JMSException e)
         {
-            // JMSException should be thrown either on getConnection, or produce/consume
+            // JMSException should be thrown either on connection open, or produce/consume
             // depending on whether the relative timing of the node discovering that the
             // secondary has gone.
         }
     }
 
+    @Test
     public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception
     {
-        if (getBrokerProtocol().equals(Protocol.AMQP_1_0))
+        int masterPort;
+        final Connection initialConnection = getConnectionBuilder().build();
+        try
         {
-            // TODO - there seems to be a client defect when a JMS operation is interrupted
-            // by a graceful connection close from the client side.
-            return;
+            masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+            getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+        }
+        finally
+        {
+            initialConnection.close();
         }
 
-        startCluster(true);
-        _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
-
+        getBrokerAdmin().stopNode(masterPort);
         try
         {
-            _negativeFailoverBuilder.build();
+            getConnectionBuilder().setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY)
+                                  .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT)
+                                  .build();
             fail("Connection not expected");
         }
         catch (JMSException e)
@@ -178,70 +172,92 @@ public class TwoNodeTest extends QpidBrokerTestCase
         }
     }
 
+    @Test
     public void testInitialDesignatedPrimaryStateOfNodes() throws Exception
     {
-        startCluster(true);
+        int masterPort;
+        final Connection initialConnection = getConnectionBuilder().build();
+        try
+        {
+            masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+            getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+        }
+        finally
+        {
+            initialConnection.close();
+        }
+
+        Map<String, Object>
+                primaryNodeAttributes = getBrokerAdmin().getNodeAttributes(masterPort);
+        assertThat("Expected primary node to be set as designated primary",
+                   primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(true)));
 
-        Map<String, Object> primaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary());
-        assertTrue("Expected primary node to be set as designated primary",
-                   (Boolean) primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+        int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
 
-        Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
-        assertFalse("Expected secondary node to NOT be set as designated primary",
-                    (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+        Map<String, Object> secondaryNodeAttributes = getBrokerAdmin().getNodeAttributes(replicaPort);
+        assertThat("Expected secondary node to NOT be set as designated primary",
+                   secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false)));
     }
 
+    @Test
     public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception
     {
-        startCluster(true);
-
-        final Connection initialConnection = _positiveFailoverBuilder.build();
-        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        initialConnection.close();
+        int masterPort;
+        Queue queue;
+        final Connection initialConnection = getConnectionBuilder().build();
+        try
+        {
+            masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+            getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+            queue = createTestQueue(initialConnection);
+        }
+        finally
+        {
+            initialConnection.close();
+        }
 
 
-        _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary());
+        getBrokerAdmin().stopNode(masterPort);
+        int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
 
-        Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
-        assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+        Map<String, Object> secondaryNodeAttributes = getBrokerAdmin().getNodeAttributes(replicaPort);
+        assertThat("Expected secondary node to NOT be set as designated primary",
+                   secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false)));
 
-        _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true));
+        getBrokerAdmin().setDesignatedPrimary(replicaPort, true);
+        getBrokerAdmin().awaitNodeRole(replicaPort, "MASTER");
 
-        int timeout = 5000;
-        long limit = System.currentTimeMillis() + timeout;
-        while( !((Boolean)secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)) && System.currentTimeMillis() < limit)
-        {
-            Thread.sleep(100);
-            secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode());
-        }
-        assertTrue("Expected secondary to transition to primary within " + timeout, (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
-
-        final Connection connection = _positiveFailoverBuilder.build();
-        assertNotNull("Expected to get a valid connection to new primary", connection);
-        assertProducingConsuming(connection);
+        assertProduceConsume(queue);
     }
 
+    @Test
     public void testSetDesignatedAfterReplicaBeingStopped() throws Exception
     {
-        startCluster(false);
-
-        final Connection initialConnection = _positiveFailoverBuilder.build();
-        Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED);
-        getJmsProvider().createQueue(session, getTestQueueName());
-        initialConnection.close();
+        final Connection initialConnection = getConnectionBuilder().build();
+        int masterPort;
+        Queue queue;
+        try
+        {
+            masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort();
+            queue = createTestQueue(initialConnection);
+        }
+        finally
+        {
+            initialConnection.close();
+        }
 
-        _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode());
+        int replicaPort = getBrokerAdmin().getAmqpPort(masterPort);
+        getBrokerAdmin().stopNode(replicaPort);
 
-        Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary());
-        assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY));
+        Map<String, Object>
+                primaryNodeAttributes = getBrokerAdmin().getNodeAttributes(masterPort);
+        assertThat("Expected node to NOT be set as designated primary",
+                   primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false)));
 
-        _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true));
-        _groupCreator.awaitNodeToAttainRole(_groupCreator.getBrokerPortNumberOfPrimary(), "MASTER" );
+        getBrokerAdmin().setDesignatedPrimary(masterPort, true);
+        getBrokerAdmin().awaitNodeRole(masterPort, "MASTER");
 
-        final Connection connection = _positiveFailoverBuilder.build();
-        assertNotNull("Expected to get a valid connection to primary", connection);
-        assertProducingConsuming(connection);
+        assertProduceConsume(queue);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2e8d623..7cd016a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -201,6 +201,7 @@
     <module>systests/protocol-tests-amqp-0-8</module>
     <module>systests/protocol-tests-amqp-0-10</module>
     <module>systests/protocol-tests-amqp-1-0</module>
+    <module>systests/qpid-systests-spawn-admin</module>
     <module>systests/end-to-end-conversion-tests</module>
     <module>perftests</module>
     <module>qpid-perftests-systests</module>
@@ -453,6 +454,12 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-systests-spawn-admin</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
       <!-- External dependencies -->
       <dependency>
         <groupId>org.apache.qpid</groupId>

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java b/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
index a41ee39..91255ef 100644
--- a/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
+++ b/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java
@@ -20,6 +20,8 @@
 
 package org.apache.qpid.tests.http;
 
+import static org.apache.qpid.systests.Utils.getJmsProvider;
+
 import java.net.InetSocketAddress;
 
 import javax.jms.Connection;
@@ -32,11 +34,9 @@ import org.junit.Rule;
 import org.junit.rules.TestName;
 
 import org.apache.qpid.server.model.Protocol;
-import org.apache.qpid.systests.AmqpManagementFacade;
 import org.apache.qpid.systests.ConnectionBuilder;
 import org.apache.qpid.systests.JmsProvider;
-import org.apache.qpid.systests.QpidJmsClient0xProvider;
-import org.apache.qpid.systests.QpidJmsClientProvider;
+import org.apache.qpid.systests.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -59,17 +59,7 @@ public abstract class HttpTestBase extends BrokerAdminUsingTestBase
         _helper = new HttpTestHelper(getBrokerAdmin(),
                                      config != null && config.useVirtualHostAsHost() ? getVirtualHost() : null);
 
-        Protocol protocol = getProtocol();
-        AmqpManagementFacade managementFacade = new AmqpManagementFacade(protocol);
-        if (protocol == Protocol.AMQP_1_0)
-        {
-            _jmsProvider = new QpidJmsClientProvider(managementFacade);
-        }
-        else
-        {
-            _jmsProvider = new QpidJmsClient0xProvider();
-        }
-
+        _jmsProvider = getJmsProvider();
     }
 
     @After
@@ -121,14 +111,12 @@ public abstract class HttpTestBase extends BrokerAdminUsingTestBase
 
     protected static long getReceiveTimeout()
     {
-        return Long.getLong("qpid.test_receive_timeout", 1000L);
+        return Utils.getReceiveTimeout();
     }
 
     protected static Protocol getProtocol()
     {
-        return Protocol.valueOf("AMQP_" + System.getProperty("broker.version", "0-9-1")
-                                                .replace('-', '_')
-                                                .replace('.', '_'));
+        return Utils.getProtocol();
     }
 
     protected String getTestName()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
index 647d34a..5ac57cf 100644
--- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
+++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java
@@ -492,10 +492,12 @@ public class AmqpManagementFacade
                     return new HashMap<>(bodyMap);
                 }
             }
-            throw new IllegalArgumentException("Management read failed : "
-                                               + response.getStringProperty("statusCode")
-                                               + " - "
-                                               + response.getStringProperty("statusDescription"));
+            throw new AmqpManagementFacade.OperationUnsuccessfulException("Management read failed : "
+                                                                          + response.getStringProperty("statusCode")
+                                                                          + " - "
+                                                                          + response.getStringProperty(
+                    "statusDescription"),
+                                                                          response.getIntProperty("statusCode"));
         }
         finally
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org